From 3d199eb3f9ec66458ad4669d8ac4d465086431d7 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 11 Oct 2017 15:04:28 -0700 Subject: [PATCH 1/3] [BEAM-2979] Fix a race condition in getWatermark() in KafkaIO. Two fixes : - Don't set curRecord to null before updating. If user deserializers throw, ok to keep curRecord pointing to old one. - use atomic references for curRecord and curTimestamp since these are accessed in getWatermark() and getCurrentTimestamp(). These methods could be called concurrently with advance(). --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index af73a8d4f8e7..d3356d81b3ad 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; @@ -898,8 +899,9 @@ private static class UnboundedKafkaReader extends UnboundedReader consumer; private final List partitionStates; - private KafkaRecord curRecord; - private Instant curTimestamp; + // curRecord and curTimestamp are accessed outside advance(), which might be another thread. 
+ private AtomicReference> curRecord = new AtomicReference<>(); + private AtomicReference curTimestamp = new AtomicReference<>(); private Iterator curBatch = Collections.emptyIterator(); private Deserializer keyDeserializerInstance = null; @@ -1248,18 +1250,10 @@ public boolean advance() throws IOException { continue; } - long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled. - - if (curRecord == null) { - LOG.info("{}: first record offset {}", name, offset); - offsetGap = 0; - } - - curRecord = null; // user coders below might throw. - - // apply user deserializers. + // Apply user deserializers. User deserializers might throw, which will be propagated up. + // 'curRecord' remains unchanged. The runner should close this reader. // TODO: write records that can't be deserialized to a "dead-letter" additional output. - KafkaRecord record = new KafkaRecord( + KafkaRecord record = new KafkaRecord<>( rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), @@ -1267,10 +1261,19 @@ public boolean advance() throws IOException { keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()), valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value())); - curTimestamp = (source.spec.getTimestampFn() == null) - ? Instant.now() : source.spec.getTimestampFn().apply(record); - curRecord = record; + curTimestamp.set((source.spec.getTimestampFn() == null) + ? Instant.now() : source.spec.getTimestampFn().apply(record)); + + KafkaRecord prevRecord = curRecord.getAndSet(record); + long offsetGap; + if (prevRecord == null) { + LOG.info("{}: first record offset {}", name, offset); + offsetGap = 0; + } else { + offsetGap = offset - expected; + // could be > 0 when Kafka log compaction is enabled or when trasactions are aborted. + } int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 
0 : rawRecord.value().length); pState.recordConsumed(offset, recordSize, offsetGap); @@ -1331,13 +1334,14 @@ private void reportBacklog() { @Override public Instant getWatermark() { - if (curRecord == null) { + KafkaRecord record = curRecord.get(); + if (record == null) { LOG.debug("{}: getWatermark() : no records have been read yet.", name); return initialWatermark; } return source.spec.getWatermarkFn() != null - ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp; + ? source.spec.getWatermarkFn().apply(record) : curTimestamp.get(); } @Override @@ -1364,12 +1368,12 @@ public PartitionMark apply(PartitionState p) { @Override public KafkaRecord getCurrent() throws NoSuchElementException { // should we delay updating consumed offset till this point? Mostly not required. - return curRecord; + return curRecord.get(); } @Override public Instant getCurrentTimestamp() throws NoSuchElementException { - return curTimestamp; + return curTimestamp.get(); } From 31a57bcd64a4edebb085eb22eb4ec9296fccfce0 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Fri, 13 Oct 2017 15:20:13 -0700 Subject: [PATCH 2/3] Guard curTimestamp and curRecord with this and update them together. 
--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index d3356d81b3ad..9ba66af74aa8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -66,8 +66,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.AvroCoder; @@ -899,9 +899,10 @@ private static class UnboundedKafkaReader extends UnboundedReader consumer; private final List partitionStates; - // curRecord and curTimestamp are accessed outside advance(), which might be another thread. - private AtomicReference> curRecord = new AtomicReference<>(); - private AtomicReference curTimestamp = new AtomicReference<>(); + @GuardedBy("this") + private KafkaRecord curRecord = null; + @GuardedBy("this") + private Instant curTimestamp = null; private Iterator curBatch = Collections.emptyIterator(); private Deserializer keyDeserializerInstance = null; @@ -1261,19 +1262,26 @@ public boolean advance() throws IOException { keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()), valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value())); - curTimestamp.set((source.spec.getTimestampFn() == null) - ? Instant.now() : source.spec.getTimestampFn().apply(record)); + Instant timestamp = (source.spec.getTimestampFn() == null) + ? 
Instant.now() : source.spec.getTimestampFn().apply(record); - KafkaRecord prevRecord = curRecord.getAndSet(record); + // Update curRecord and curTimestamp under lock. + boolean isFirstRecord; + synchronized (this) { + isFirstRecord = curRecord == null; + curRecord = record; + curTimestamp = timestamp; + } long offsetGap; - if (prevRecord == null) { + if (isFirstRecord) { LOG.info("{}: first record offset {}", name, offset); offsetGap = 0; } else { - offsetGap = offset - expected; - // could be > 0 when Kafka log compaction is enabled or when trasactions are aborted. + offsetGap = offset - expected; + // Gap could be due to log compaction or aborted transactions. } + int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 0 : rawRecord.value().length); pState.recordConsumed(offset, recordSize, offsetGap); @@ -1334,14 +1342,20 @@ private void reportBacklog() { @Override public Instant getWatermark() { - KafkaRecord record = curRecord.get(); + KafkaRecord record; + Instant timestamp; + synchronized (this) { + record = curRecord; + timestamp = curTimestamp; + } + if (record == null) { LOG.debug("{}: getWatermark() : no records have been read yet.", name); return initialWatermark; } return source.spec.getWatermarkFn() != null - ? source.spec.getWatermarkFn().apply(record) : curTimestamp.get(); + ? source.spec.getWatermarkFn().apply(record) : timestamp; } @Override @@ -1366,17 +1380,16 @@ public PartitionMark apply(PartitionState p) { } @Override - public KafkaRecord getCurrent() throws NoSuchElementException { + public synchronized KafkaRecord getCurrent() throws NoSuchElementException { // should we delay updating consumed offset till this point? Mostly not required. 
- return curRecord.get(); + return curRecord; } @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return curTimestamp.get(); + public synchronized Instant getCurrentTimestamp() throws NoSuchElementException { + return curTimestamp; } - @Override public long getSplitBacklogBytes() { long backlogBytes = 0; From aec694e3f1724f5c4adc4b42c6df7ddda6b0e0a3 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Fri, 20 Oct 2017 10:58:18 -0700 Subject: [PATCH 3/3] Avoid setting curRecord to null. --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 55 ++++++------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 9ba66af74aa8..17e0e346c43b 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -67,7 +67,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.AvroCoder; @@ -899,10 +898,8 @@ private static class UnboundedKafkaReader extends UnboundedReader consumer; private final List partitionStates; - @GuardedBy("this") - private KafkaRecord curRecord = null; - @GuardedBy("this") - private Instant curTimestamp = null; + private KafkaRecord curRecord; + private Instant curTimestamp; private Iterator curBatch = Collections.emptyIterator(); private Deserializer keyDeserializerInstance = null; @@ -1251,8 +1248,15 @@ public boolean advance() throws IOException { continue; } - // Apply user deserializers. User deserializers might throw, which will be propagated up. - // 'curRecord' remains unchanged. 
The runner should close this reader. + long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled. + + if (curRecord == null) { + LOG.info("{}: first record offset {}", name, offset); + offsetGap = 0; + } + + // Apply user deserializers. User deserializers might throw, which will be propagated up + // and 'curRecord' remains unchanged. The runner should close this reader. // TODO: write records that can't be deserialized to a "dead-letter" additional output. KafkaRecord record = new KafkaRecord<>( rawRecord.topic(), @@ -1262,25 +1266,9 @@ public boolean advance() throws IOException { keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()), valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value())); - Instant timestamp = (source.spec.getTimestampFn() == null) - ? Instant.now() : source.spec.getTimestampFn().apply(record); - - // Update curRecord and curTimestamp under lock. - boolean isFirstRecord; - synchronized (this) { - isFirstRecord = curRecord == null; - curRecord = record; - curTimestamp = timestamp; - } - - long offsetGap; - if (isFirstRecord) { - LOG.info("{}: first record offset {}", name, offset); - offsetGap = 0; - } else { - offsetGap = offset - expected; - // Gap could be due to log compaction or aborted transactions. - } + curTimestamp = (source.spec.getTimestampFn() == null) + ? Instant.now() : source.spec.getTimestampFn().apply(record); + curRecord = record; int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 
0 : rawRecord.value().length); @@ -1342,20 +1330,13 @@ private void reportBacklog() { @Override public Instant getWatermark() { - KafkaRecord record; - Instant timestamp; - synchronized (this) { - record = curRecord; - timestamp = curTimestamp; - } - - if (record == null) { + if (curRecord == null) { LOG.debug("{}: getWatermark() : no records have been read yet.", name); return initialWatermark; } return source.spec.getWatermarkFn() != null - ? source.spec.getWatermarkFn().apply(record) : timestamp; + ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp; } @Override @@ -1380,13 +1361,13 @@ public PartitionMark apply(PartitionState p) { } @Override - public synchronized KafkaRecord getCurrent() throws NoSuchElementException { + public KafkaRecord getCurrent() throws NoSuchElementException { // should we delay updating consumed offset till this point? Mostly not required. return curRecord; } @Override - public synchronized Instant getCurrentTimestamp() throws NoSuchElementException { + public Instant getCurrentTimestamp() throws NoSuchElementException { return curTimestamp; }