8 changes: 8 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -568,11 +568,17 @@ public ProcessContinuation processElement(

@SuppressWarnings("rawtypes") // most straightforward way to create array with type parameter
UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
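// Instrumentation: count tryClaim() iterations and time the whole claim loop for the debug log below.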
Instant claimLoopStart = Instant.now();
long claimLoopCount = 0;
LOG.debug("beam-io-read: entering tracker.tryClaim(out) loop");
while (tracker.tryClaim(out) && out[0] != null) {
claimLoopCount++;
watermarkEstimator.setWatermark(out[0].getWatermark());
receiver.outputWithTimestamp(
new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
}
LOG.debug("beam-io-read: tracker.tryClaim(out) looped {} times in {}",
claimLoopCount, new Duration(claimLoopStart, Instant.now()));

UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction =
tracker.currentRestriction();
@@ -895,6 +901,7 @@ public boolean tryClaim(@Nullable UnboundedSourceValue<OutputT> @ArrayLen(1) []
}
checkStateNotNull(currentReader, "currentReader null after initialization");
if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
LOG.info("beam-io-read: reader changed into an EmptyUnboundedReader");
return false;
}
if (!readerHasBeenStarted) {
@@ -904,6 +911,7 @@
return true;
}
} else if (!currentReader.advance()) {
LOG.info("beam-io-read: currentReader.advance() returned false");
position[0] = null;
return true;
}
@@ -536,52 +536,123 @@ Instant updateAndGetWatermark() {
backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
}

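/**
 * Accumulates timing counters for consumerPollLoop(): consumer polls, queue offers, their
 * successful counterparts, and time spent committing checkpoints, so the loop can be summarized
 * in a single periodic log line.
 */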
private static class PollLoopStats {
long polls = 0;
Duration pollDuration = Duration.ZERO;
long successfulPolls = 0;
Duration successfulPollDuration = Duration.ZERO;
long offers = 0;
Duration offerDuration = Duration.ZERO;
long successfulOffers = 0;
Duration successfulOfferDuration = Duration.ZERO;
Duration checkpointDuration = Duration.ZERO;

public void pollComplete(Duration duration, boolean success) {
polls += 1;
if (success) {
successfulPolls += 1;
successfulPollDuration = successfulPollDuration.plus(duration);
}
pollDuration = pollDuration.plus(duration);
}

public void offerComplete(Duration duration, boolean success) {
offers += 1;
if (success) {
successfulOffers += 1;
successfulOfferDuration = successfulOfferDuration.plus(duration);
}
offerDuration = offerDuration.plus(duration);
}

public void checkpointComplete(Duration duration) {
checkpointDuration = checkpointDuration.plus(duration);
}

private Duration totalDuration() {
return pollDuration.plus(offerDuration).plus(checkpointDuration);
}

public void log() {
Duration avgSuccessfulPollDuration =
successfulPolls != 0 ? successfulPollDuration.dividedBy(successfulPolls) : Duration.ZERO;
Duration avgSuccessfulOfferDuration =
successfulOffers != 0 ? successfulOfferDuration.dividedBy(successfulOffers)
: Duration.ZERO;
LOG.debug(
"beam-io-unbounded-reader consumerPollLoop stats: polls -- {} total, {}ms total, " +
"{} ok, {}ms avg ok; offers -- {} total, {}ms total, {} ok, {}ms avg ok.",
polls, pollDuration.getMillis(), successfulPolls, avgSuccessfulPollDuration.getMillis(),
offers, offerDuration.getMillis(), successfulOffers,
avgSuccessfulOfferDuration.getMillis());
}
}

private void consumerPollLoop() {
LOG.debug("Starting consumerPollLoop()");
// Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue.
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);

try {
PollLoopStats stats = new PollLoopStats();
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
while (!closed.get()) {
try {
if (records.isEmpty()) {
Instant pollStart = Instant.now();
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
stats.pollComplete(new Duration(pollStart, Instant.now()), !records.isEmpty());
} else {
Instant offerStart = Instant.now();
boolean offerOk =
availableRecordsQueue.offer(
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
stats.offerComplete(new Duration(offerStart, Instant.now()), offerOk);
if (offerOk) {
records = ConsumerRecords.empty();
}
}
Instant checkpointStart = Instant.now();
if (commitCheckpointMark()) {
stats.checkpointComplete(new Duration(checkpointStart, Instant.now()));
}
// Flush stats once they accumulate 30s of measurements.
if (stats.totalDuration().getStandardSeconds() >= 30) {
stats.log();
stats = new PollLoopStats();
}
} catch (InterruptedException e) {
LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
break;
} catch (WakeupException e) {
break;
}
}
LOG.info("{}: Returning from consumer pool loop", this);
LOG.debug("{}: Returning from consumer pool loop", this);
stats.log();
} catch (Exception e) { // mostly an unrecoverable KafkaException.
LOG.error("{}: Exception while reading from Kafka", this, e);
consumerPollException.set(e);
throw e;
}
}

private void commitCheckpointMark() {
private boolean commitCheckpointMark() {
KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);

if (checkpointMark != null) {
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);

consumer.commitSync(
checkpointMark.getPartitions().stream()
.filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
.collect(
Collectors.toMap(
p -> new TopicPartition(p.getTopic(), p.getPartition()),
p -> new OffsetAndMetadata(p.getNextOffset()))));
if (checkpointMark == null) {
return false;
}
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);

consumer.commitSync(
checkpointMark.getPartitions().stream()
.filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
.collect(
Collectors.toMap(
p -> new TopicPartition(p.getTopic(), p.getPartition()),
p -> new OffsetAndMetadata(p.getNextOffset()))));
return true;
}

/**
@@ -60,6 +60,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -324,6 +325,57 @@ public OffsetRangeTracker restrictionTracker(
return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
}

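/**
 * Accumulates timing counters for the record-fetching loop in processElement(): Kafka polls, the
 * number of records they return, and tracker.tryClaim() calls, summarized in a periodic log line.
 */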
private static class PollLoopStats {
long polls = 0;
long count = 0;
Duration pollDuration = Duration.ZERO;
long successfulPolls = 0;
Duration successfulPollDuration = Duration.ZERO;
long claims = 0;
Duration claimDuration = Duration.ZERO;
long successfulClaims = 0;
Duration successfulClaimDuration = Duration.ZERO;

public void pollComplete(Duration duration, boolean success, int count) {
polls += 1;
if (success) {
successfulPolls += 1;
this.count += count;
successfulPollDuration = successfulPollDuration.plus(duration);
}
pollDuration = pollDuration.plus(duration);
}

public void claimComplete(Duration duration, boolean success) {
claims += 1;
if (success) {
successfulClaims += 1;
successfulClaimDuration = successfulClaimDuration.plus(duration);
}
claimDuration = claimDuration.plus(duration);
}

private Duration totalDuration() {
return pollDuration.plus(claimDuration);
}

public void log(String prefix) {
Duration avgSuccessfulPollDuration =
successfulPolls != 0 ? successfulPollDuration.dividedBy(successfulPolls) : Duration.ZERO;
long countPerPoll = successfulPolls != 0 ? count / successfulPolls : 0;
Duration avgSuccessfulClaimDuration =
successfulClaims != 0 ? successfulClaimDuration.dividedBy(successfulClaims)
: Duration.ZERO;
LOG.info(
"{} poll loop stats: polls -- {} total, {}ms total, {} ok, {}ms avg ok; " +
"claims -- {} total, {}ms total, {} ok, {}ms avg ok; " +
"count -- {}, {} avg per ok poll",
prefix, polls, pollDuration.getMillis(), successfulPolls, avgSuccessfulPollDuration.getMillis(),
claims, claimDuration.getMillis(), successfulClaims,
avgSuccessfulClaimDuration.getMillis(), count, countPerPoll);
}
}

@ProcessElement
public ProcessContinuation processElement(
@Element KafkaSourceDescriptor kafkaSourceDescriptor,
@@ -377,20 +429,29 @@ public ProcessContinuation processElement(
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();

LOG.info("beam-io-sdf: Enter pool loop");
PollLoopStats stats = new PollLoopStats();
while (true) {
Instant pollStart = Instant.now();
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
stats.pollComplete(new Duration(pollStart, Instant.now()), !rawRecords.isEmpty(), rawRecords.count());
// When there are no records available for the current TopicPartition, self-checkpoint
// and move to process the next element.
if (rawRecords.isEmpty()) {
stats.log("beam-io-sdf: resume, rawRecords.isEmpty()");
if (timestampPolicy != null) {
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
}
return ProcessContinuation.resume();
}
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
Instant claimStart = Instant.now();
if (!tracker.tryClaim(rawRecord.offset())) {
stats.claimComplete(new Duration(claimStart, Instant.now()), false);
stats.log("beam-io-sdf: stop, !tryClaim()");
return ProcessContinuation.stop();
}
stats.claimComplete(new Duration(claimStart, Instant.now()), true);
KafkaRecord<K, V> kafkaRecord =
new KafkaRecord<>(
rawRecord.topic(),
@@ -421,6 +482,10 @@
}
receiver.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
}
if (stats.totalDuration().getStandardSeconds() >= 30) {
stats.log("beam-io-sdf: periodic");
stats = new PollLoopStats();
}
}
}
}