Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-873] Add offset look-back option in Kafka consumer #2721

Closed
wants to merge 3 commits into from

Conversation

MeghaUpadhyay
Copy link

@MeghaUpadhyay MeghaUpadhyay commented Aug 24, 2019

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Current kafka consumer supports only latest and easiest offset reset options if previous state is not available. This PR is to add a new option , offset look-back , through which offset can be reset to specific amount from the latest offset for each topic partition.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-io
Copy link

codecov-io commented Aug 24, 2019

Codecov Report

Merging #2721 into master will increase coverage by 0.94%.
The diff coverage is 0%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #2721      +/-   ##
============================================
+ Coverage     44.13%   45.08%   +0.94%     
- Complexity     8576     8745     +169     
============================================
  Files          1880     1880              
  Lines         70170    70197      +27     
  Branches       7700     7705       +5     
============================================
+ Hits          30973    31651     +678     
+ Misses        36312    35625     -687     
- Partials       2885     2921      +36
Impacted Files Coverage Δ Complexity Δ
...in/source/extractor/extract/kafka/KafkaSource.java 0% <0%> (ø) 0 <0> (ø) ⬇️
...in/java/org/apache/gobblin/cluster/HelixUtils.java 35.51% <0%> (-3.74%) 12% <0%> (-1%)
.../org/apache/gobblin/cluster/GobblinTaskRunner.java 65.72% <0%> (-0.47%) 29% <0%> (ø)
...src/main/java/org/apache/gobblin/runtime/Task.java 67.05% <0%> (+0.23%) 82% <0%> (+1%) ⬆️
.../org/apache/gobblin/runtime/SafeDatasetCommit.java 46.53% <0%> (+0.49%) 28% <0%> (ø) ⬇️
...rg/apache/gobblin/runtime/AbstractJobLauncher.java 58.33% <0%> (+0.83%) 31% <0%> (+2%) ⬆️
...ain/java/org/apache/gobblin/runtime/fork/Fork.java 75% <0%> (+0.83%) 68% <0%> (ø) ⬇️
...in/java/org/apache/gobblin/runtime/JobContext.java 76.11% <0%> (+2.22%) 50% <0%> (+1%) ⬆️
...he/gobblin/writer/FineGrainedWatermarkTracker.java 84.67% <0%> (+2.41%) 29% <0%> (+1%) ⬆️
...va/org/apache/gobblin/runtime/util/JobMetrics.java 73.68% <0%> (+2.63%) 13% <0%> (+1%) ⬆️
... and 38 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update bd4a09a...53f0841. Read the comment docs.

Copy link
Contributor

@shirshanka shirshanka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I think this is a useful feature.
I added some comments that need addressing.

@@ -451,7 +459,21 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
LOG.warn(
offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong("kafka.offset.lookback", 0L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor to static constant "kafka.offset.lookback"

String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can have race condition (latest offset can change) between this log message and the next startAtLatestOffsets call.

So get latest offset first.
Then log it and start consuming from it as a provided offset?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made changes as per the below comment from autumnust. I am reusing the functionality to check for "RESET_ON_OFFSET_OUT_OF_RANGE" and make decision accordingly

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using this functionality (with lookback) in our production pipeline since many days. It is going to be very useful where data retention is couple of days in kafka and wants to avoid using earliest. Lookback help us in going back to certain offset and start consuming from that point (when we lost previous state information).

bootstrap.with.offset=earliest
#bootstrap.with.offset=earliest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this line

@@ -90,6 +90,8 @@
public static final String LATEST_OFFSET = "latest";
public static final String EARLIEST_OFFSET = "earliest";
public static final String NEAREST_OFFSET = "nearest";
public static final String OFFSET_LOOKBACK = "offset_lookback";
public static final String TIMESTAMP_LOOKBACK = "timestamp_lookback";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like timestamp lookback is not being used. Do you plan to modify this PR with support for timestamp lookback as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka 0.10.1.0 consumer supports offsetsForTimes. I am exploring on how can I have same functionality for lower kafka versions. I will remove this from this PR and create a new PR with time based lookup. Could you please suggest on how should we incorporate support for new kafka client versions (kafka09+) ?

String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In KafkaSource there's a configuration RESET_ON_OFFSET_OUT_OF_RANGE which configures a specific behavior when offset is out of bound. Let's use that configuration here to determine the behavior for consistency.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure , I have made the changes to use same functionality.

} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK , 0L);
long offset = offsets.getLatestOffset() - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + offsets.getLatestOffset() + " - " + lookbackOffsetRange + " ] start offset: " + offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could have an incorrect log line due to a race condition: offsets.getLatestOffset() here could be different from the result on line 464. I'd suggest caching the value in a variable (on line 464) and using it in subsequent code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure Thanks .. I have updated this line to store offsets.getLatestOffset() in a variable and use that in subsequent code. But I am finding same issue in multiple places like here https://github.com/apache/incubator-gobblin/pull/2721/files#diff-1f786615c11702cd74b076ab157da963R479 ... Which might lead to incorrect logging and using different offset in "offsets.startAtLatestOffset();". Please suggest something here.

@MeghaUpadhyay
Copy link
Author

@shirshanka @autumnust Please review latest changes.

Copy link
Contributor

@shirshanka shirshanka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@MeghaUpadhyay
Copy link
Author

@autumnust Can you please review latest changes. Thanks !

@shirshanka
Copy link
Contributor

@MeghaUpadhyay : could you update this PR in line with the Gobblin PR requirements?

Please go through the PR checklist and make sure you check them all off (jira, description, testing, subject line)
I have created a JIRA for you to link to: https://issues.apache.org/jira/browse/GOBBLIN-873

Check other PR-s for examples: Here is a good example of what we want in the Pull request body:
#2701

I'll merge this in after you've taken care of these!

@MeghaUpadhyay
Copy link
Author

MeghaUpadhyay commented Sep 8, 2019

@MeghaUpadhyay : could you update this PR in line with the Gobblin PR requirements?

Please go through the PR checklist and make sure you check them all off (jira, description, testing, subject line)
I have created a JIRA for you to link to: https://issues.apache.org/jira/browse/GOBBLIN-873

Check other PR-s for examples: Here is a good example of what we want in the Pull request body:
#2701

I'll merge this in after you've taken care of these!

Thanks @shirshanka .. I have done the changes , please let me know anything else is needed

@MeghaUpadhyay MeghaUpadhyay changed the title Add offset look-back option in Kafka consumer [GOBBLIN-873] Add offset look-back option in Kafka consumer Sep 8, 2019
@asfgit asfgit closed this in 2fc2f62 Sep 9, 2019
@shirshanka
Copy link
Contributor

Congrats on your first contribution! Looking forward to more 👍

Will-Lo pushed a commit to Will-Lo/incubator-gobblin that referenced this pull request Sep 18, 2019
jhsenjaliya pushed a commit to jhsenjaliya/incubator-gobblin that referenced this pull request Apr 26, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants