/
ProducerResult.scala
92 lines (83 loc) · 2.58 KB
/
ProducerResult.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/
package fs2.kafka
import cats.syntax.show._
import cats.Show
import fs2.Chunk
import fs2.kafka.instances._
import fs2.kafka.internal.syntax._
import org.apache.kafka.clients.producer.RecordMetadata
/**
* [[ProducerResult]] represents the result of having produced zero
* or more `ProducerRecord`s from a [[ProducerRecords]]. Finally, a
* passthrough value and `ProducerRecord`s along with respective
* `RecordMetadata` are emitted in a [[ProducerResult]].<br>
* <br>
* The [[passthrough]] and [[records]] can be retrieved from an
* existing [[ProducerResult]] instance.<br>
* <br>
* Use [[ProducerResult#apply]] to create a new [[ProducerResult]].
*/
sealed abstract class ProducerResult[+P, +K, +V] {
/**
* The records produced along with respective metadata.
* Can be empty for passthrough-only.
*/
def records: Chunk[(ProducerRecord[K, V], RecordMetadata)]
/** The passthrough value. */
def passthrough: P
}
object ProducerResult {
private[this] final class ProducerResultImpl[+P, +K, +V](
override val records: Chunk[(ProducerRecord[K, V], RecordMetadata)],
override val passthrough: P
) extends ProducerResult[P, K, V] {
override def toString: String =
if (records.isEmpty)
s"ProducerResult(<empty>, $passthrough)"
else
records.mkStringAppend {
case (append, (record, metadata)) =>
append(metadata.toString)
append(" -> ")
append(record.toString)
}(
start = "ProducerResult(",
sep = ", ",
end = s", $passthrough)"
)
}
/**
* Creates a new [[ProducerResult]] for having produced zero
* or more `ProducerRecord`s, finally emitting a passthrough
* value and the `ProducerRecord`s with `RecordMetadata`.
*/
def apply[P, K, V](
records: Chunk[(ProducerRecord[K, V], RecordMetadata)],
passthrough: P
): ProducerResult[P, K, V] =
new ProducerResultImpl(records, passthrough)
implicit def producerResultShow[P, K, V](
implicit
K: Show[K],
V: Show[V],
P: Show[P]
): Show[ProducerResult[P, K, V]] = Show.show { result =>
if (result.records.isEmpty)
show"ProducerResult(<empty>, ${result.passthrough})"
else
result.records.mkStringAppend {
case (append, (record, metadata)) =>
append(metadata.show)
append(" -> ")
append(record.show)
}(
start = "ProducerResult(",
sep = ", ",
end = show", ${result.passthrough})"
)
}
}