/
ProducerRecords.scala
130 lines (117 loc) · 4.2 KB
/
ProducerRecords.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/
package fs2.kafka
import cats.{Foldable, Show, Traverse}
import cats.syntax.show._
import fs2.Chunk
import fs2.kafka.internal.syntax._
/**
  * A batch of zero or more `ProducerRecord`s paired with an arbitrary
  * passthrough value, suitable for use with [[KafkaProducer]].<br>
  * <br>
  * Instances are created through the companion object:<br>
  * <br>
  * - `ProducerRecords#apply` for zero or more records held in any
  *   container with a `Traverse` instance,<br>
  * - `ProducerRecords#one` for exactly one record,<br>
  * - `ProducerRecords#chunk` for records already held in a `Chunk`.<br>
  * <br>
  * Every constructor has a variant taking an explicit passthrough
  * value and a variant which uses `Unit` as the passthrough. Once the
  * records have been produced, a [[ProducerResult]] is emitted with
  * the results and the passthrough value.<br>
  * <br>
  * Both [[records]] and [[passthrough]] can be read back from an
  * existing instance.
  *
  * @tparam P the type of the passthrough value
  * @tparam K the type of the record keys
  * @tparam V the type of the record values
  */
sealed abstract class ProducerRecords[+P, +K, +V] {

  /** The records to produce; may be empty when only the passthrough matters. */
  def records: Chunk[ProducerRecord[K, V]]

  /** The value to emit after all [[records]] have been produced. */
  def passthrough: P
}
object ProducerRecords {

  /**
    * The only implementation of [[ProducerRecords]]. It is private, so
    * instances can only be obtained through the constructors below.
    */
  private[this] final class ProducerRecordsImpl[+P, +K, +V](
    override val records: Chunk[ProducerRecord[K, V]],
    override val passthrough: P
  ) extends ProducerRecords[P, K, V] {
    // Lists the records followed by the passthrough; `<empty>` stands in
    // for the record list when there are no records.
    override def toString: String =
      if (records.isEmpty) s"ProducerRecords(<empty>, $passthrough)"
      else records.mkString("ProducerRecords(", ", ", s", $passthrough)")
  }

  /**
    * Creates a new [[ProducerRecords]] for producing zero or more
    * `ProducerRecord`s, then emitting a [[ProducerResult]] with
    * the results and `Unit` passthrough value.
    *
    * @param records the records to produce, in any container with a `Traverse` instance
    * @see [[fs2.kafka.ProducerRecords#chunk(fs2.Chunk)]] if your `records` are already contained in an [[fs2.Chunk]]
    */
  def apply[F[+_], K, V](
    records: F[ProducerRecord[K, V]]
  )(
    implicit F: Traverse[F]
  ): ProducerRecords[Unit, K, V] =
    apply(records, ())

  /**
    * Creates a new [[ProducerRecords]] for producing zero or more
    * `ProducerRecord`s, then emitting a [[ProducerResult]] with
    * the results and specified passthrough value.
    *
    * @param records the records to produce, in any container with a `Traverse` instance
    * @param passthrough the value to emit once the records have been produced
    * @see [[fs2.kafka.ProducerRecords#chunk(fs2.Chunk, java.lang.Object)]] if your `records` are already contained in
    *      an [[fs2.Chunk]]
    */
  def apply[F[+_], P, K, V](
    records: F[ProducerRecord[K, V]],
    passthrough: P
  )(
    implicit F: Traverse[F]
  ): ProducerRecords[P, K, V] =
    // The container is viewed as an `Iterable` and copied into a `Chunk`.
    chunk(Chunk.iterable(Foldable[F].toIterable(records)), passthrough)

  /**
    * Creates a new [[ProducerRecords]] for producing exactly one
    * `ProducerRecord`, then emitting a [[ProducerResult]] with
    * the result and `Unit` passthrough value.
    *
    * @param record the single record to produce
    */
  def one[K, V](
    record: ProducerRecord[K, V]
  ): ProducerRecords[Unit, K, V] =
    one(record, ())

  /**
    * Creates a new [[ProducerRecords]] for producing exactly one
    * `ProducerRecord`, then emitting a [[ProducerResult]] with
    * the result and specified passthrough value.
    *
    * @param record the single record to produce
    * @param passthrough the value to emit once the record has been produced
    */
  def one[P, K, V](
    record: ProducerRecord[K, V],
    passthrough: P
  ): ProducerRecords[P, K, V] =
    // The record is already wrapped in a `Chunk`, so go straight to `chunk`
    // rather than through the generic `apply`, which would convert the
    // chunk to an `Iterable` and back again.
    chunk(Chunk.singleton(record), passthrough)

  /**
    * Creates a new [[ProducerRecords]] for producing zero or more
    * `ProducerRecord`s, then emitting a [[ProducerResult]] with
    * the results and `Unit` passthrough value.
    *
    * @param records the records to produce
    */
  def chunk[K, V](
    records: Chunk[ProducerRecord[K, V]]
  ): ProducerRecords[Unit, K, V] =
    chunk(records, ())

  /**
    * Creates a new [[ProducerRecords]] for producing zero or more
    * `ProducerRecord`s, then emitting a [[ProducerResult]] with
    * the results and specified passthrough value.
    *
    * @param records the records to produce
    * @param passthrough the value to emit once the records have been produced
    */
  def chunk[P, K, V](
    records: Chunk[ProducerRecord[K, V]],
    passthrough: P
  ): ProducerRecords[P, K, V] =
    new ProducerRecordsImpl(records, passthrough)

  /**
    * `Show` instance for [[ProducerRecords]]. The output has the same
    * shape as `toString`, but the passthrough is rendered with its
    * `Show` instance rather than `toString`, and the records are
    * rendered through `mkStringShow`.
    */
  implicit def producerRecordsShow[P, K, V](
    implicit
    K: Show[K],
    V: Show[V],
    P: Show[P]
  ): Show[ProducerRecords[P, K, V]] = Show.show { records =>
    if (records.records.isEmpty) show"ProducerRecords(<empty>, ${records.passthrough})"
    else
      records.records.mkStringShow(
        "ProducerRecords(",
        ", ",
        // `show` rather than `s`, so that `Show[P]` is used for the
        // passthrough here exactly as in the empty branch above.
        show", ${records.passthrough})"
      )
  }
}