-
Notifications
You must be signed in to change notification settings - Fork 97
/
CommittableOffset.scala
141 lines (119 loc) · 4.44 KB
/
CommittableOffset.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/
package fs2.kafka
import cats.{ApplicativeError, Eq, Show}
import cats.instances.string._
import cats.syntax.show._
import fs2.kafka.instances._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
/**
* [[CommittableOffset]] represents an [[offsetAndMetadata]] for a
* [[topicPartition]], along with the ability to commit that offset
* to Kafka with [[commit]]. Note that offsets are normally committed
* in batches for performance reasons. Pipes like [[commitBatchWithin]]
* use [[CommittableOffsetBatch]] to commit the offsets in batches.<br>
* <br>
* While normally not necessary, [[CommittableOffset#apply]] can be
* used to create a new instance.
*/
sealed abstract class CommittableOffset[F[_]] {
/**
* The topic and partition for which [[offsetAndMetadata]]
* can be committed using [[commit]].
*/
def topicPartition: TopicPartition
/**
* The offset and metadata for the [[topicPartition]], which
* can be committed using [[commit]].
*/
def offsetAndMetadata: OffsetAndMetadata
/**
* The consumer group ID of the consumer that fetched the
* [[offsetAndMetadata]] from the [[topicPartition]] from
* Kafka.<br>
* <br>
* Required for committing offsets within a transaction.
*/
def consumerGroupId: Option[String]
/**
* The [[topicPartition]] and [[offsetAndMetadata]] as a `Map`.
* This is provided for convenience and is always guaranteed to
* be equivalent to the following.
*
* {{{
* Map(topicPartition -> offsetAndMetadata)
* }}}
*/
def offsets: Map[TopicPartition, OffsetAndMetadata]
/**
* The [[CommittableOffset]] as a [[CommittableOffsetBatch]].
*/
def batch: CommittableOffsetBatch[F]
/**
* Commits the [[offsetAndMetadata]] for the [[topicPartition]] to
* Kafka. Note that offsets are normally committed in batches for
* performance reasons. Prefer pipes like [[commitBatchWithin]]
* or [[CommittableOffsetBatch]] for that reason.
*/
def commit: F[Unit]
/**
* The commit function we are using in [[commit]] to commit the
* [[offsetAndMetadata]] for the [[topicPartition]]. Is used to
* help achieve better performance when batching offsets.
*/
private[kafka] def commitOffsets: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
}
object CommittableOffset {
/**
* Creates a new [[CommittableOffset]] with the specified `topicPartition`
* and `offsetAndMetadata`, along with `commit`, describing how to commit
* an arbitrary `Map` of topic-partition offsets.
*/
def apply[F[_]](
topicPartition: TopicPartition,
offsetAndMetadata: OffsetAndMetadata,
consumerGroupId: Option[String],
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffset[F] = {
val _topicPartition = topicPartition
val _offsetAndMetadata = offsetAndMetadata
val _consumerGroupId = consumerGroupId
val _commit = commit
new CommittableOffset[F] {
override val topicPartition: TopicPartition =
_topicPartition
override val offsetAndMetadata: OffsetAndMetadata =
_offsetAndMetadata
override val consumerGroupId: Option[String] =
_consumerGroupId
override def offsets: Map[TopicPartition, OffsetAndMetadata] =
Map(_topicPartition -> _offsetAndMetadata)
override def batch: CommittableOffsetBatch[F] =
CommittableOffsetBatch(offsets, consumerGroupId.toSet, consumerGroupId.isEmpty, _commit)
override def commit: F[Unit] =
_commit(offsets)
override val commitOffsets: Map[TopicPartition, OffsetAndMetadata] => F[Unit] =
_commit
override def toString: String =
consumerGroupId match {
case Some(consumerGroupId) =>
show"CommittableOffset($topicPartition -> $offsetAndMetadata, $consumerGroupId)"
case None =>
show"CommittableOffset($topicPartition -> $offsetAndMetadata)"
}
}
}
implicit def committableOffsetShow[F[_]]: Show[CommittableOffset[F]] =
Show.fromToString
implicit def committableOffsetEq[F[_]]: Eq[CommittableOffset[F]] =
Eq.instance {
case (l, r) =>
l.topicPartition == r.topicPartition &&
l.offsetAndMetadata == r.offsetAndMetadata &&
l.consumerGroupId == r.consumerGroupId
}
}