-
Notifications
You must be signed in to change notification settings - Fork 28k
/
KafkaOffsetReader.scala
445 lines (403 loc) · 17.4 KB
/
KafkaOffsetReader.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{time => jt}
import java.{util => ju}
import java.util.concurrent.Executors
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.sql.types._
import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
/**
 * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets from Kafka.
 * The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
 * by this source. These strategies directly correspond to the different consumption options
 * of the Kafka consumer. This class is designed to return a configured [[KafkaConsumer]] that
 * is used by the [[KafkaSource]] to query for the offsets. See the docs on
 * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
 * for more details.
 *
 * Note: This class is not thread-safe.
 */
private[kafka010] class KafkaOffsetReader(
consumerStrategy: ConsumerStrategy,
val driverKafkaParams: ju.Map[String, Object],
readerOptions: Map[String, String],
driverGroupIdPrefix: String) extends Logging {
/**
 * Time budget, in milliseconds, for the assignment-polling loop in `getPartitions`.
 *
 * The reader option `KafkaSourceProvider.CONSUMER_POLL_TIMEOUT` wins when it is present;
 * otherwise the value of Spark's `NETWORK_TIMEOUT` setting multiplied by 1000 is used
 * (presumably a seconds-to-milliseconds conversion -- confirm against the config definition).
 */
private val pollTimeoutMs: Long =
  readerOptions
    .get(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT)
    .map(_.toLong)
    .getOrElse(SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L)
/**
 * Single-thread executor used to make sure fetch operations run in an
 * [[UninterruptibleThread]]. Its only worker is a daemon thread named "Kafka Offset Reader".
 */
val kafkaReaderThread = Executors.newSingleThreadExecutor(
  new java.util.concurrent.ThreadFactory {
    override def newThread(task: Runnable): Thread = {
      val worker = new UninterruptibleThread("Kafka Offset Reader") {
        override def run(): Unit = task.run()
      }
      worker.setDaemon(true)
      worker
    }
  })

/** Execution context backed by `kafkaReaderThread`; `runUninterruptibly` submits work to it. */
val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
/**
 * Mutable state behind `nextGroupId()`: [[groupId]] holds the most recently generated consumer
 * group id (null until the first one is generated) and [[nextId]] is the numeric suffix that
 * the next generated id will use.
 *
 * Both are declared here so that they are initialized before any consumer is
 * created -- see SPARK-19564.
 */
private var groupId: String = null
private var nextId = 0
/**
 * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
 * offsets and never commits them.
 *
 * The field is null until the first call to `consumer`, and is set back to null by
 * `resetConsumer()` so that the next access builds a fresh instance.
 */
@volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null

/**
 * Returns the current consumer, creating it through the [[ConsumerStrategy]] on first use.
 *
 * The consumer is built from a copy of `driverKafkaParams`. When those parameters carry no
 * `group.id`, a generated one (see `nextGroupId()`) is added to the copy. Must be called on an
 * [[UninterruptibleThread]].
 */
protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
  assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
  Option(_consumer).getOrElse {
    val params = new ju.HashMap[String, Object](driverKafkaParams)
    val hasUserGroupId = driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) != null
    if (!hasUserGroupId) {
      params.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
    }
    _consumer = consumerStrategy.createConsumer(params)
    _consumer
  }
}
/**
 * Upper bound on offset-fetch attempts, read from the reader option
 * `KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY`; 3 when the option is absent.
 */
private val maxOffsetFetchAttempts: Int =
  readerOptions
    .get(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY)
    .map(_.toInt)
    .getOrElse(3)

/**
 * Pause between offset-fetch attempts in milliseconds, read from the reader option
 * `KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS`; 1000 when the option is absent.
 */
private val offsetFetchAttemptIntervalMs: Long =
  readerOptions
    .get(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS)
    .map(_.toLong)
    .getOrElse(1000L)
/**
 * Generates the next consumer group id as `<driverGroupIdPrefix>-<nextId>`, stores it in
 * `groupId`, and advances `nextId` by one.
 *
 * @return the newly generated group id
 */
private def nextGroupId(): String = {
  val generated = s"$driverGroupIdPrefix-$nextId"
  groupId = generated
  nextId = nextId + 1
  generated
}
/** Describes this reader by the string form of its [[ConsumerStrategy]]. */
override def toString(): String = {
  consumerStrategy.toString
}
/**
 * Closes the connection to Kafka, and cleans up state.
 *
 * If a consumer has been created, it is closed via `runUninterruptibly`. The reader's executor
 * is shut down in a `finally` block so that the "Kafka Offset Reader" thread is released even
 * when closing the consumer throws; the exception, if any, still propagates to the caller.
 */
def close(): Unit = {
  try {
    if (_consumer != null) runUninterruptibly { stopConsumer() }
  } finally {
    kafkaReaderThread.shutdown()
  }
}
/**
 * Obtains the partitions currently assigned to the consumer (see `getPartitions`), pauses them
 * on the consumer, and hands them back as an immutable Scala set.
 *
 * @return the set of assigned TopicPartitions
 */
def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
  assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
  val assigned = getPartitions()
  consumer.pause(assigned)
  assigned.asScala.toSet
}
/**
 * Fetch the partition offsets for the topic partitions that are indicated
 * in the [[ConsumerStrategy]] and [[KafkaOffsetRangeLimit]].
 *
 * For the earliest/latest limits every assigned partition is mapped to the corresponding
 * sentinel (`KafkaOffsetRangeLimit.EARLIEST` / `KafkaOffsetRangeLimit.LATEST`); the sentinel is
 * not resolved to a concrete offset here. For specific offsets, the supplied map is returned
 * as-is after asserting that its keys are exactly the assigned partitions.
 *
 * @param offsetRangeLimit the requested range limit
 * @return a map from each partition to a sentinel or to the caller-specified offset
 */
def fetchPartitionOffsets(offsetRangeLimit: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = {
  val assigned = fetchTopicPartitions()

  // Maps every assigned partition to the same sentinel value (late binding).
  def allAt(sentinel: Long): Map[TopicPartition, Long] =
    assigned.iterator.map(tp => tp -> sentinel).toMap

  offsetRangeLimit match {
    case EarliestOffsetRangeLimit =>
      allAt(KafkaOffsetRangeLimit.EARLIEST)
    case LatestOffsetRangeLimit =>
      allAt(KafkaOffsetRangeLimit.LATEST)
    case SpecificOffsetRangeLimit(requested) =>
      assert(assigned == requested.keySet,
        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
          s"Specified: ${requested.keySet} Assigned: ${assigned}")
      logDebug(s"Partitions assigned to consumer: $assigned. Seeking to $requested")
      requested
  }
}
/**
 * Resolves the specific offsets based on Kafka seek positions.
 * This method resolves offset value -1 to the latest and -2 to the
 * earliest Kafka seek position.
 *
 * After seeking, the consumer's position is read back for every partition. For partitions that
 * were given a concrete offset, a position different from the requested one is reported through
 * `reportDataLoss`.
 *
 * @param partitionOffsets the specific offsets to resolve
 * @param reportDataLoss callback to either report or log data loss depending on setting
 * @return the resolved positions wrapped in a [[KafkaSourceOffset]]
 */
def fetchSpecificOffsets(
    partitionOffsets: Map[TopicPartition, Long],
    reportDataLoss: String => Unit): KafkaSourceOffset = {
  val resolved = runUninterruptibly {
    withRetriesWithoutInterrupt {
      val assigned = getPartitions()
      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
      // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
      // `poll(0)` may reset offsets that should have been set by another request.
      assigned.asScala.foreach(tp => consumer.position(tp))
      consumer.pause(assigned)
      assert(assigned.asScala == partitionOffsets.keySet,
        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
          s"Specified: ${partitionOffsets.keySet} Assigned: ${assigned.asScala}")
      logDebug(s"Seeking to $partitionOffsets")

      partitionOffsets.foreach { case (tp, requested) =>
        if (requested == KafkaOffsetRangeLimit.LATEST) {
          consumer.seekToEnd(ju.Arrays.asList(tp))
        } else if (requested == KafkaOffsetRangeLimit.EARLIEST) {
          consumer.seekToBeginning(ju.Arrays.asList(tp))
        } else {
          consumer.seek(tp, requested)
        }
      }

      partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) }
    }
  }

  partitionOffsets.foreach { case (tp, requested) =>
    // There is no real way to check that beginning or end is reasonable, so only concrete
    // offsets are compared against what the consumer ended up with.
    val isSentinel =
      requested == KafkaOffsetRangeLimit.LATEST || requested == KafkaOffsetRangeLimit.EARLIEST
    if (!isSentinel && resolved(tp) != requested) {
      reportDataLoss(
        s"startingOffsets for $tp was $requested but consumer reset to ${resolved(tp)}")
    }
  }

  KafkaSourceOffset(resolved)
}
/**
 * Fetch the earliest offsets for the topic partitions that are indicated
 * in the [[ConsumerStrategy]].
 *
 * The assigned partitions are paused, the consumer seeks to their beginning, and the resulting
 * position of each partition is returned. The whole sequence runs under
 * `withRetriesWithoutInterrupt`.
 *
 * @return a map from each assigned partition to its earliest position
 */
def fetchEarliestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
  withRetriesWithoutInterrupt {
    val assigned = getPartitions()
    consumer.pause(assigned)

    logDebug(s"Seeking to the beginning")
    consumer.seekToBeginning(assigned)

    val earliest = assigned.asScala.map(tp => tp -> consumer.position(tp)).toMap
    logDebug(s"Got earliest offsets for partition : $earliest")
    earliest
  }
}
/**
 * Fetch the latest offsets for the topic partitions that are indicated
 * in the [[ConsumerStrategy]].
 *
 * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
 * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
 * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
 *
 * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
 * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
 * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
 * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
 * distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying.
 *
 * @param knownOffsets previously observed offsets to audit against, if any
 * @return a map from each assigned partition to its latest position
 */
def fetchLatestOffsets(
    knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
  withRetriesWithoutInterrupt {
    val assigned = getPartitions()
    // Call `position` to wait until the potential offset request triggered by `poll(0)` is
    // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
    // `poll(0)` may reset offsets that should have been set by another request.
    assigned.asScala.foreach(tp => consumer.position(tp))
    consumer.pause(assigned)
    logDebug(s"Seeking to the end.")

    // Seeks every assigned partition to its end and reads back the resulting positions.
    def readEndOffsets(): PartitionOffsetMap = {
      consumer.seekToEnd(assigned)
      assigned.asScala.map(tp => tp -> consumer.position(tp)).toMap
    }

    knownOffsets match {
      case None =>
        readEndOffsets()

      case Some(known) =>
        /**
         * Compares `known` with `fetched` and returns, as (partition, known offset, fetched
         * offset), every partition whose known offset is greater than the fetched one.
         */
        def findIncorrectOffsets(
            fetched: PartitionOffsetMap): Seq[(TopicPartition, Long, Long)] = {
          val mismatches = ArrayBuffer.empty[(TopicPartition, Long, Long)]
          for {
            (tp, fetchedOffset) <- fetched
            knownOffset <- known.get(tp)
            if knownOffset > fetchedOffset
          } mismatches += ((tp, knownOffset, fetchedOffset))
          mismatches
        }

        // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
        // `withRetriesWithoutInterrupt` to retry because:
        //
        // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
        //   consumer has a much bigger chance to hit KAFKA-7703.
        // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
        var latest: PartitionOffsetMap = Map.empty
        var attempt = 0
        var keepTrying = true
        while (keepTrying) {
          latest = readEndOffsets()
          attempt += 1
          val incorrectOffsets = findIncorrectOffsets(latest)
          val mayRetry = attempt < maxOffsetFetchAttempts
          if (incorrectOffsets.nonEmpty) {
            logWarning("Found incorrect offsets in some partitions " +
              s"(partition, previous offset, fetched offset): $incorrectOffsets")
            if (mayRetry) {
              logWarning("Retrying to fetch latest offsets because of incorrect offsets")
              Thread.sleep(offsetFetchAttemptIntervalMs)
            }
          }
          keepTrying = incorrectOffsets.nonEmpty && mayRetry
        }

        logDebug(s"Got latest offsets for partition : $latest")
        latest
    }
  }
}
/**
 * Fetch the earliest offsets for specific topic partitions.
 * The return result may not contain some partitions if they are deleted.
 *
 * An empty `newPartitions` short-circuits to an empty map without touching the consumer.
 *
 * @param newPartitions the partitions whose earliest offsets are wanted
 * @return earliest positions for those of `newPartitions` that are currently assigned
 */
def fetchEarliestOffsets(
    newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  if (newPartitions.nonEmpty) {
    runUninterruptibly {
      withRetriesWithoutInterrupt {
        val assigned = getPartitions()
        consumer.pause(assigned)
        // Get the earliest offset of each partition
        consumer.seekToBeginning(assigned)

        // When deleting topics happen at the same time, some partitions may not be in
        // `assigned`. So we need to ignore them
        val stillAssigned = newPartitions.filter(tp => assigned.contains(tp))
        val earliest = stillAssigned.map(tp => tp -> consumer.position(tp)).toMap
        logDebug(s"Got earliest offsets for new partitions: $earliest")
        earliest
      }
    }
  } else {
    Map.empty[TopicPartition, Long]
  }
}
/**
 * This method ensures that the closure is called in an [[UninterruptibleThread]].
 * This is required when communicating with the [[KafkaConsumer]]. In the case
 * of streaming queries, we are already running in an [[UninterruptibleThread]],
 * however for batch mode this is not the case.
 *
 * When the caller is already such a thread, `body` runs inline. Otherwise it is submitted to
 * `execContext` and the caller blocks, without a timeout, until the result is available.
 *
 * @param body the code to evaluate
 * @return whatever `body` evaluates to
 */
private def runUninterruptibly[T](body: => T): T = {
  Thread.currentThread match {
    case _: UninterruptibleThread =>
      body
    case _ =>
      val pending = Future(body)(execContext)
      ThreadUtils.awaitResult(pending, Duration.Inf)
  }
}
/**
 * Helper function that does multiple retries on a body of code that returns offsets.
 * Retries are needed to handle transient failures. For e.g. race conditions between getting
 * assignment and getting position while topics/partitions are deleted can cause NPEs.
 *
 * This method also makes sure `body` won't be interrupted to workaround a potential issue in
 * `KafkaConsumer.poll`. (KAFKA-1894)
 *
 * At most `maxOffsetFetchAttempts` attempts are made. After a failed attempt the consumer is
 * reset, and the method waits `offsetFetchAttemptIntervalMs` before the next attempt. No wait
 * happens after the final attempt, so a permanent failure is surfaced without an extra delay.
 *
 * @param body the offset-fetching code to evaluate; must run on an [[UninterruptibleThread]]
 * @return the offsets produced by the first successful evaluation of `body`
 * @throws InterruptedException if the thread's interrupt flag is set once the loop finishes
 * @throws Throwable the exception of the last failed attempt when every attempt failed
 */
private def withRetriesWithoutInterrupt(
    body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
  assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

  synchronized {
    var result: Option[Map[TopicPartition, Long]] = None
    var attempt = 1
    var lastException: Throwable = null
    while (result.isEmpty && attempt <= maxOffsetFetchAttempts
      && !Thread.currentThread().isInterrupted) {
      Thread.currentThread match {
        case ut: UninterruptibleThread =>
          // "KafkaConsumer.poll" may hang forever if the thread is interrupted (E.g., the query
          // is stopped)(KAFKA-1894). Hence, we just make sure we don't interrupt it.
          //
          // If the broker addresses are wrong, or Kafka cluster is down, "KafkaConsumer.poll" may
          // hang forever as well. This cannot be resolved in KafkaSource until Kafka fixes the
          // issue.
          ut.runUninterruptibly {
            try {
              result = Some(body)
            } catch {
              case NonFatal(e) =>
                lastException = e
                logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
                attempt += 1
                // Back off only when another attempt will follow; sleeping after the final
                // failure would merely delay reporting the error to the caller.
                if (attempt <= maxOffsetFetchAttempts) {
                  Thread.sleep(offsetFetchAttemptIntervalMs)
                }
                // Always discard the consumer that just failed so the next use gets a new one.
                resetConsumer()
            }
          }
        case _ =>
          throw new IllegalStateException(
            "Kafka APIs must be executed on a o.a.spark.util.UninterruptibleThread")
      }
    }
    // Clears the interrupt flag and converts a pending interrupt into an exception.
    if (Thread.interrupted()) {
      throw new InterruptedException()
    }
    result.getOrElse {
      assert(attempt > maxOffsetFetchAttempts)
      assert(lastException != null)
      throw lastException
    }
  }
}
/**
 * Closes the current consumer if one exists. The `_consumer` reference itself is left
 * untouched. Must be called on an [[UninterruptibleThread]].
 */
private def stopConsumer(): Unit = synchronized {
  assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
  Option(_consumer).foreach(_.close())
}
/**
 * Closes the current consumer and clears the `_consumer` reference, so that the next access
 * through `consumer` creates a new instance.
 */
private def resetConsumer(): Unit = this.synchronized {
  stopConsumer()
  // Clearing the reference makes `consumer` reinitialize it on its next call.
  _consumer = null
}
/**
 * Polls the consumer until it reports a non-empty partition assignment or `pollTimeoutMs`
 * has elapsed, and returns that assignment.
 *
 * Elapsed time is measured with `System.nanoTime`, so adjustments of the wall clock neither
 * shorten nor extend the wait.
 *
 * @return the non-empty set of partitions assigned to the consumer
 * @throws IllegalArgumentException if no partition was assigned within `pollTimeoutMs`
 */
private def getPartitions(): ju.Set[TopicPartition] = {
  var partitions = Set.empty[TopicPartition].asJava
  val timeoutNs = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(pollTimeoutMs)
  val startNs = System.nanoTime()
  while (partitions.isEmpty && System.nanoTime() - startNs < timeoutNs) {
    // Poll to get the latest assigned partitions
    consumer.poll(jt.Duration.ZERO)
    partitions = consumer.assignment()
  }
  require(!partitions.isEmpty,
    s"No partitions were assigned to the Kafka consumer for $consumerStrategy " +
      s"within $pollTimeoutMs ms")
  logDebug(s"Partitions assigned to consumer: $partitions")
  partitions
}
}
private[kafka010] object KafkaOffsetReader {

  /**
   * Builds a [[StructType]] with seven fields, in this order: `key` and `value` (binary),
   * `topic` (string), `partition` (integer), `offset` (long), `timestamp` (timestamp) and
   * `timestampType` (integer).
   */
  def kafkaSchema: StructType = {
    val columns: Seq[(String, DataType)] = Seq(
      "key" -> BinaryType,
      "value" -> BinaryType,
      "topic" -> StringType,
      "partition" -> IntegerType,
      "offset" -> LongType,
      "timestamp" -> TimestampType,
      "timestampType" -> IntegerType)
    StructType(columns.map { case (name, dataType) => StructField(name, dataType) })
  }
}