/
CommittableConsumerRecord.scala
134 lines (118 loc) · 4.6 KB
/
CommittableConsumerRecord.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/
package fs2.kafka
import cats.syntax.bifoldable._
import cats.syntax.bitraverse._
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.syntax.show._
import cats.syntax.eq._
import cats.syntax.traverse._
import cats.{Applicative, Bitraverse, Eq, Eval, Show, Traverse}
/**
* [[CommittableConsumerRecord]] is a Kafka record along with an
* instance of [[CommittableOffset]], which can be used commit
* the record offset to Kafka. Offsets are normally committed in
* batches, either using [[CommittableOffsetBatch]] or via pipes,
* like [[commitBatchWithin]]. If you are not committing offsets
* to Kafka then you can use [[record]] to get the underlying
* record and also discard the [[offset]].<br>
* <br>
* While normally not necessary, [[CommittableConsumerRecord#apply]]
* can be used to create a new instance.
*/
sealed abstract class CommittableConsumerRecord[F[_], +K, +V] {
/**
* The Kafka record for the [[CommittableConsumerRecord]]. If you
* are not committing offsets to Kafka, simply use this to get the
* [[ConsumerRecord]] and discard the [[offset]].
*/
def record: ConsumerRecord[K, V]
/**
* A [[CommittableOffset]] instance, providing a way to commit the
* [[record]] offset to Kafka. This is normally done in batches as
* it achieves better performance. Pipes like [[commitBatchWithin]]
* use [[CommittableOffsetBatch]] to batch and commit offsets.
*/
def offset: CommittableOffset[F]
}
object CommittableConsumerRecord {
private[this] final class CommittableConsumerRecordImpl[F[_], +K, +V](
override val record: ConsumerRecord[K, V],
override val offset: CommittableOffset[F]
) extends CommittableConsumerRecord[F, K, V] {
override def toString: String =
s"CommittableConsumerRecord($record, $offset)"
}
/**
* Creates a new [[CommittableConsumerRecord]] using the specified
* Kafka [[ConsumerRecord]] and [[CommittableOffset]], which can
* be used to commit the record offset to Kafka.
*/
def apply[F[_], K, V](
record: ConsumerRecord[K, V],
offset: CommittableOffset[F]
): CommittableConsumerRecord[F, K, V] =
new CommittableConsumerRecordImpl(record, offset)
def unapply[F[_], K, V](
committable: CommittableConsumerRecord[F, K, V]
): Some[(ConsumerRecord[K, V], CommittableOffset[F])] =
Some((committable.record, committable.offset))
implicit def committableConsumerRecordShow[F[_], K, V](
implicit
K: Show[K],
V: Show[V]
): Show[CommittableConsumerRecord[F, K, V]] = Show.show { cm =>
show"CommittableConsumerRecord(${cm.record}, ${cm.offset})"
}
implicit def committableConsumerRecordEq[F[_], K: Eq, V: Eq]
: Eq[CommittableConsumerRecord[F, K, V]] =
Eq.instance {
case (l, r) =>
l.record === r.record && l.offset === r.offset
}
implicit def committableConsumerRecordBitraverse[F[_]]
: Bitraverse[CommittableConsumerRecord[F, *, *]] =
new Bitraverse[CommittableConsumerRecord[F, *, *]] {
override def bitraverse[G[_], A, B, C, D](
fab: CommittableConsumerRecord[F, A, B]
)(f: A => G[C], g: B => G[D])(
implicit G: Applicative[G]
): G[CommittableConsumerRecord[F, C, D]] =
fab.record.bitraverse(f, g).map { (cd: ConsumerRecord[C, D]) =>
CommittableConsumerRecord(cd, fab.offset)
}
override def bifoldLeft[A, B, C](
fab: CommittableConsumerRecord[F, A, B],
c: C
)(f: (C, A) => C, g: (C, B) => C): C =
fab.record.bifoldLeft(c)(f, g)
override def bifoldRight[A, B, C](
fab: CommittableConsumerRecord[F, A, B],
c: Eval[C]
)(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
fab.record.bifoldRight(c)(f, g)
}
implicit def committableConsumerRecordTraverse[F[_], K]
: Traverse[CommittableConsumerRecord[F, K, *]] =
new Traverse[CommittableConsumerRecord[F, K, *]] {
override def traverse[G[_], A, B](
fa: CommittableConsumerRecord[F, K, A]
)(f: A => G[B])(implicit G: Applicative[G]): G[CommittableConsumerRecord[F, K, B]] =
fa.record.traverse(f).map { (b: ConsumerRecord[K, B]) =>
CommittableConsumerRecord(b, fa.offset)
}
override def foldLeft[A, B](fa: CommittableConsumerRecord[F, K, A], b: B)(
f: (B, A) => B
): B =
fa.record.foldLeft(b)(f)
override def foldRight[A, B](
fa: CommittableConsumerRecord[F, K, A],
lb: Eval[B]
)(f: (A, Eval[B]) => Eval[B]): Eval[B] =
fa.record.foldRight(lb)(f)
}
}