Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-873] Add offset look-back option in Kafka consumer #2721

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -90,7 +90,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final String LATEST_OFFSET = "latest";
public static final String EARLIEST_OFFSET = "earliest";
public static final String NEAREST_OFFSET = "nearest";
public static final String OFFSET_LOOKBACK = "offset_lookback";
public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
public static final String TOPICS_MOVE_TO_LATEST_OFFSET = "topics.move.to.latest.offset";
public static final String RESET_ON_OFFSET_OUT_OF_RANGE = "reset.on.offset.out.of.range";
Expand Down Expand Up @@ -440,8 +442,14 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
offsets.startAtLatestOffset();
} else if (previousOffsetNotFound) {

// When previous offset cannot be found, either start at earliest offset or latest offset, or skip the partition
// (no need to create an empty workunit in this case since there's no offset to persist).
/**
 * When the previous offset cannot be found, either start at the earliest offset, start at the
 * latest offset, start at (latest - lookback) — where lookback is a long value subtracted from
 * the latest offset to avoid data loss — or skip the partition (no need to create an empty
 * workunit in this case since there's no offset to persist).
 * When there is no previous state, OFFSET_LOOKBACK avoids both consuming a huge amount of data
 * (earliest offset) and data loss (latest offset). The lookback can be set to any long value, and
 * (latest - lookback) becomes the start offset for each partition. If the computed offset is out
 * of range, the partition will be consumed from the latest offset.
 **/
String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET)) {
Expand All @@ -451,7 +459,44 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
LOG.warn(
offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK , 0L);
long latestOffset = offsets.getLatestOffset();
long offset = latestOffset - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ] start offset: " + offset);
try {
offsets.startAt(offset);
} catch (StartOffsetOutOfRangeException e) {
// Increment counts, which will be reported as job metrics
if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
this.offsetTooEarlyCount.incrementAndGet();
} else {
this.offsetTooLateCount.incrementAndGet();
}

// When above computed offset (latest-lookback) is out of range, either start at earliest, latest or nearest offset, or skip the
// partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
offsetOption =
state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
&& offsets.getStartOffset() >= offsets.getLatestOffset())) {
LOG.warn(
offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
}
else {
LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
return null;
}
Expand Down
Expand Up @@ -43,6 +43,7 @@ source.kafka.json.schema=${kafka.schema.json}

topic.whitelist=${kafka.topics}
org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
# Accepted values: latest, earliest, or offset_lookback (starts at the latest offset minus the value of kafka.offset.lookback)
bootstrap.with.offset=earliest
kafka.workunit.packer.type=SINGLE_LEVEL
mr.job.max.mappers=10
Expand Down