Skip to content

Commit

Permalink
Fixes based on review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
meupadhyay committed Aug 26, 2019
1 parent 508c129 commit 53f0841
Showing 1 changed file with 3 additions and 2 deletions.
Expand Up @@ -461,8 +461,9 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
offsets.startAtEarliestOffset();
} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK , 0L);
long offset = offsets.getLatestOffset() - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + offsets.getLatestOffset() + " - " + lookbackOffsetRange + " ] start offset: " + offset);
long latestOffset = offsets.getLatestOffset();
long offset = latestOffset - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ] start offset: " + offset);
try {
offsets.startAt(offset);
} catch (StartOffsetOutOfRangeException e) {
Expand Down

0 comments on commit 53f0841

Please sign in to comment.