-
Notifications
You must be signed in to change notification settings - Fork 97
/
package.scala
124 lines (94 loc) · 3.95 KB
/
package.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
* Copyright 2018-2023 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/
package fs2
import cats.Traverse
import cats.effect._
import scala.concurrent.duration.FiniteDuration
import org.apache.kafka.clients.producer.RecordMetadata
package object kafka {

  /** Identity type constructor: `Id[A]` is just `A` itself. */
  type Id[+A] = A

  // ----- Aliases for the raw Java Kafka client types, specialised to byte arrays -----

  /** Alias for Java Kafka `Consumer[Array[Byte], Array[Byte]]`. */
  type KafkaByteConsumer =
    org.apache.kafka.clients.consumer.Consumer[Array[Byte], Array[Byte]]

  /** Alias for Java Kafka `Producer[Array[Byte], Array[Byte]]`. */
  type KafkaByteProducer =
    org.apache.kafka.clients.producer.Producer[Array[Byte], Array[Byte]]

  /** Alias for Java Kafka `ConsumerRecords[Array[Byte], Array[Byte]]`. */
  type KafkaByteConsumerRecords =
    org.apache.kafka.clients.consumer.ConsumerRecords[Array[Byte], Array[Byte]]

  /** Alias for Java Kafka `ConsumerRecord[Array[Byte], Array[Byte]]`. */
  type KafkaByteConsumerRecord =
    org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte], Array[Byte]]

  /** Alias for Java Kafka `ProducerRecord[Array[Byte], Array[Byte]]`. */
  type KafkaByteProducerRecord =
    org.apache.kafka.clients.producer.ProducerRecord[Array[Byte], Array[Byte]]

  /** Alias for Java Kafka `Deserializer[A]`. */
  type KafkaDeserializer[A] =
    org.apache.kafka.common.serialization.Deserializer[A]

  /** Alias for Java Kafka `Serializer[A]`. */
  type KafkaSerializer[A] =
    org.apache.kafka.common.serialization.Serializer[A]

  /** Alias for Java Kafka `Header`. */
  type KafkaHeader =
    org.apache.kafka.common.header.Header

  /** Alias for Java Kafka `Headers`. */
  type KafkaHeaders =
    org.apache.kafka.common.header.Headers

  // ----- Record collections used by the fs2-kafka producer APIs -----

  /** A batch of records to produce, represented as an fs2 `Chunk`. */
  type ProducerRecords[K, V] = Chunk[ProducerRecord[K, V]]

  /** A batch of committable records for the transactional producer. */
  type TransactionalProducerRecords[F[_], +K, +V] = Chunk[CommittableProducerRecords[F, K, V]]

  /** Result of producing a batch: each record paired with its broker metadata. */
  type ProducerResult[K, V] = Chunk[(ProducerRecord[K, V], RecordMetadata)]

  /**
   * Commits offsets in batches of every `n` offsets or time window
   * of length `d`, whichever happens first. If there are no offsets
   * to commit within a time window, no attempt will be made to commit
   * offsets for that time window.
   */
  def commitBatchWithin[F[_]](n: Int, d: FiniteDuration)(
    implicit F: Temporal[F]
  ): Pipe[F, CommittableOffset[F], Unit] =
    offsets =>
      offsets
        .groupWithin(n, d)
        .evalMap(batch => CommittableOffsetBatch.fromFoldable(batch).commit)

  // ----- Phantom-tagged (de)serializer aliases; see [[Key]], [[Value]], [[KeyOrValue]] -----

  /** Serializer usable for either keys or values. */
  type Serializer[F[_], A] = GenericSerializer[KeyOrValue, F, A]

  /** Serializer restricted to record keys. */
  type KeySerializer[F[_], A] = GenericSerializer[Key, F, A]

  /** Serializer restricted to record values. */
  type ValueSerializer[F[_], A] = GenericSerializer[Value, F, A]

  /** Companion alias so `Serializer.apply` etc. resolve to [[GenericSerializer]]. */
  val Serializer: GenericSerializer.type = GenericSerializer

  /** Deserializer usable for either keys or values. */
  type Deserializer[F[_], A] = GenericDeserializer[KeyOrValue, F, A]

  /** Deserializer restricted to record keys. */
  type KeyDeserializer[F[_], A] = GenericDeserializer[Key, F, A]

  /** Deserializer restricted to record values. */
  type ValueDeserializer[F[_], A] = GenericDeserializer[Value, F, A]

  /** Companion alias so `Deserializer.apply` etc. resolve to [[GenericDeserializer]]. */
  val Deserializer: GenericDeserializer.type = GenericDeserializer
}
package kafka {
/** Phantom types to indicate whether a [[Serializer]]/[[Deserializer]] if for keys, values, or both
*/
// Phantom types (never instantiated) indicating whether a [[Serializer]]/
// [[Deserializer]] is for keys, values, or both. `sealed` so the hierarchy
// is closed to these three members.
sealed trait KeyOrValue
sealed trait Key extends KeyOrValue
sealed trait Value extends KeyOrValue
}
package kafka {
import cats.Foldable
object ProducerRecords {

  /**
   * Creates a [[ProducerRecords]] from any `Traverse` container of
   * [[ProducerRecord]]s (e.g. `List`, `Vector`, `Option`), by folding
   * the container into an fs2 `Chunk`.
   */
  def apply[F[+_], K, V](
    records: F[ProducerRecord[K, V]]
  )(
    implicit F: Traverse[F]
  ): ProducerRecords[K, V] =
    // Traverse extends Foldable, so the instance in scope folds directly.
    Chunk.iterable(F.toIterable(records))

  /** Creates a [[ProducerRecords]] containing exactly one record. */
  def one[K, V](record: ProducerRecord[K, V]): ProducerRecords[K, V] =
    Chunk.singleton(record)
}
object TransactionalProducerRecords {

  /**
   * Kept for source compatibility: since `TransactionalProducerRecords`
   * became an alias for `Chunk[CommittableProducerRecords[F, K, V]]`,
   * this simply returns its argument unchanged.
   */
  @deprecated("this is now an identity operation", "3.0.0-M5")
  def apply[F[_], K, V](
    chunk: Chunk[CommittableProducerRecords[F, K, V]]
  ): Chunk[CommittableProducerRecords[F, K, V]] =
    chunk

  /**
   * Creates a new [[TransactionalProducerRecords]] for producing exactly
   * one [[CommittableProducerRecords]]
   */
  def one[F[_], K, V](
    record: CommittableProducerRecords[F, K, V]
  ): TransactionalProducerRecords[F, K, V] =
    Chunk(record)
}
}