Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-1944] Support Hudi to read from committed offset #3175

Merged
merged 3 commits into from
Jun 30, 2021

Conversation

veenaypatil
Copy link
Contributor

@veenaypatil veenaypatil commented Jun 28, 2021

What is the purpose of the pull request

This is a followup to - #3092, this change adds support to read from committed offset

Brief change log

  • Update KafkaOffsetGen to read from consumer group committed offset
  • Added NONE to KafkaResetOffsetStrategies

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit test testGetNextOffsetRangesFromGroup in TestKafkaOffsetGen

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@hudi-bot
Copy link

hudi-bot commented Jun 28, 2021

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run travis re-run the last Travis build
  • @hudi-bot run azure re-run the last Azure build

@codecov-commenter
Copy link

codecov-commenter commented Jun 28, 2021

Codecov Report

Merging #3175 (c5471c3) into master (039aeb6) will decrease coverage by 1.88%.
The diff coverage is 93.75%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #3175      +/-   ##
============================================
- Coverage     46.16%   44.27%   -1.89%     
+ Complexity     5370     4603     -767     
============================================
  Files           921      825      -96     
  Lines         39953    36593    -3360     
  Branches       4288     3945     -343     
============================================
- Hits          18444    16202    -2242     
+ Misses        19630    18645     -985     
+ Partials       1879     1746     -133     
Flag Coverage Δ
hudicli 39.95% <ø> (ø)
hudiclient 16.44% <ø> (-14.02%) ⬇️
hudicommon 47.58% <ø> (ø)
hudiflink 60.04% <ø> (+0.18%) ⬆️
hudihadoopmr 51.29% <ø> (ø)
hudisparkdatasource 67.06% <ø> (ø)
hudisync 54.05% <ø> (ø)
huditimelineservice 64.36% <ø> (ø)
hudiutilities 58.62% <93.75%> (+0.18%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...hudi/utilities/sources/helpers/KafkaOffsetGen.java 88.32% <93.75%> (+1.22%) ⬆️
...java/org/apache/hudi/sink/StreamWriteFunction.java 81.48% <0.00%> (-2.87%) ⬇️
...e/hudi/client/heartbeat/HoodieHeartbeatClient.java 68.22% <0.00%> (-0.94%) ⬇️
...ain/java/org/apache/hudi/util/FlinkClientUtil.java 10.71% <0.00%> (-0.40%) ⬇️
...ache/hudi/sink/StreamWriteOperatorCoordinator.java 71.60% <0.00%> (-0.34%) ⬇️
...va/org/apache/hudi/configuration/FlinkOptions.java 96.37% <0.00%> (ø)
.../org/apache/hudi/streamer/FlinkStreamerConfig.java 0.00% <0.00%> (ø)
...org/apache/hudi/client/HoodieFlinkWriteClient.java 0.00% <0.00%> (ø)
...udi/table/action/commit/SparkBulkInsertHelper.java
.../compact/HoodieSparkMergeOnReadTableCompactor.java
... and 98 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 039aeb6...c5471c3. Read the comment docs.

} else {
LOG.warn("There are no commits associated with this consumer group, starting to consume form latest offset");
fromOffsets = consumer.endOffsets(topicPartitions);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems the same as LATEST ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangxianghu Yes, incase the offsets are not committed for consumer group we are setting it to latest, do you suggest to throw an exception instead ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@veenaypatil I mean in this feature, is there any difference between latest and none

copied from org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_CONFIG

    /**
     * <code>auto.offset.reset</code>
     */
    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
    public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";

Copy link
Contributor Author

@veenaypatil veenaypatil Jun 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangxianghu

Yes, if you update the value to earliest/latest in this test case the consumer will start reading either from 0th offset (earliest) or 500th offset (latest), instead it should start from 250th offset (as this is the last committed offset)

KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("none"));


I actually don't like the NONE option here and wanted to use GROUP but the consumer will throw an exception in that case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangxianghu

Yes, if you update the value to earliest/latest in this test case the consumer will start reading either from 0th offset (earliest) or 500th offset (latest), instead it should start from 250th offset (as this is the last committed offset)

KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("none"));

I actually don't like the NONE option here and wanted to use GROUP but the consumer will throw an exception in that case

IIUC, if you have committed the offset to kafka, when you start the consumer with the same group.id as before and set auto.offset.set to latest, it will continue to consume offset committed last time(which means 250th as you mentioned), right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangxianghu yes, but that's not happening, I think it is because of the way we are explicitly setting the fromOffsets here - https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L221 which moves the consumer to end, that is it starts reading from 500th offset for partition0 and partition1 in test case

Copying doc for KafkaConsumer#endOffsets

Get the end offsets for the given partitions. In the default read_uncommitted isolation level, the end offset is the high watermark (that is, the offset of the last successfully replicated message plus one)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangxianghu yes, but that's not happening, I think it is because of the way we are explicitly setting the fromOffsets here - https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L221 which moves the consumer to end, that is it starts reading from 500th offset for partition0 and partition1 in test case

Copying doc for KafkaConsumer#endOffsets

Get the end offsets for the given partitions. In the default read_uncommitted isolation level, the end offset is the high watermark (that is, the offset of the last successfully replicated message plus one)

you are right.

@wangxianghu wangxianghu self-assigned this Jun 29, 2021
@wangxianghu wangxianghu added the hudistreamer issues related to Hudi streamer (Formely deltastreamer) label Jun 29, 2021
@wangxianghu
Copy link
Contributor

currently, our org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.KafkaResetOffsetStrategies is not the same as kafka, is it possible make them the same ?

@veenaypatil
Copy link
Contributor Author

currently, our org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.KafkaResetOffsetStrategies is not the same as kafka, is it possible make them the same ?

did not get you ? we have LATEST, EARLIEST and NONE which is same as Kafka auto reset config

@wangxianghu
Copy link
Contributor

wangxianghu commented Jun 29, 2021

currently, our org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.KafkaResetOffsetStrategies is not the same as kafka, is it possible make them the same ?

did not get you ? we have LATEST, EARLIEST and NONE which is same as Kafka auto reset config

hudi‘s LATEST means the end offsets for given partitions, no matter what the current offset is, while kafka's LATEST` will consumer from the last commit first. if there is no init offset it is the same as hudi's, right?

@veenaypatil
Copy link
Contributor Author

@wangxianghu IMO, we should not update the existing functionality, as this can impact the existing Hudi users, till now Hudi did not have a provision to read from committed offset, this just adds an option for the users to set auto.offset.reset to none.

We can think if we should take the option as auto.offset.reset=group from user and then set to none in code so that the consumer does not throw exception.
This change aligns with the way Flink also provides various options to the user - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration

@wangxianghu
Copy link
Contributor

@wangxianghu IMO, we should not update the existing functionality, as this can impact the existing Hudi users, till now Hudi did not have a provision to read from committed offset, this just adds an option for the users to set auto.offset.reset to none.

We can think if we should take the option as auto.offset.reset=group from user and then set to none in code so that the consumer does not throw exception.
This change aligns with the way Flink also provides various options to the user - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration

I agree with you.
I think you can give a try

@veenaypatil
Copy link
Contributor Author

@wangxianghu as per our discussion, changes are done, pls review

@veenaypatil veenaypatil reopened this Jun 30, 2021
Copy link
Contributor

@wangxianghu wangxianghu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@veenaypatil thanks for your patience, LGTM now
will merge when the ci is green

@wangxianghu wangxianghu merged commit 94f0f40 into apache:master Jun 30, 2021
Samrat002 pushed a commit to Samrat002/hudi that referenced this pull request Jul 15, 2021
change the insret overwrte return type

[HUDI-1860] Test wrapper for insert_overwrite and insert_overwrite_table

[HUDI-2084] Resend the uncommitted write metadata when start up (apache#3168)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>

[HUDI-2081] Move schema util tests out from TestHiveSyncTool (apache#3166)

[HUDI-2094] Supports hive style partitioning for flink writer (apache#3178)

[HUDI-2097] Fix Flink unable to read commit metadata error (apache#3180)

[HUDI-2085] Support specify compaction paralleism and compaction target io for flink batch compaction (apache#3169)

[HUDI-2092] Fix NPE caused by FlinkStreamerConfig#writePartitionUrlEncode null value (apache#3176)

[HUDI-2006] Adding more yaml templates to test suite (apache#3073)

[HUDI-2103] Add rebalance before index bootstrap (apache#3185)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>

[HUDI-1944] Support Hudi to read from committed offset (apache#3175)

* [HUDI-1944] Support Hudi to read from committed offset

* [HUDI-1944] Adding group option to KafkaResetOffsetStrategies

* [HUDI-1944] Update Exception msg

[HUDI-2052] Support load logFile in BootstrapFunction (apache#3134)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>

[HUDI-89] Add configOption & refactor all configs based on that (apache#2833)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

[MINOR] Update .asf.yaml to codify notification settings, turn on jira comments, gh discussions (apache#3164)

- Turn on comment for jira, so we can track PR activity better
- Create a notification settings that match https://gitbox.apache.org/schemes.cgi?hudi
- Try and turn on "discussions" on Github, to experiment

[MINOR] Fix broken build due to FlinkOptions (apache#3198)

[HUDI-2088] Missing Partition Fields And PreCombineField In Hoodie Properties For Table Written By Flink (apache#3171)

[MINOR] Add Documentation to KEYGENERATOR_TYPE_PROP (apache#3196)

[HUDI-2105] Compaction Failed For MergeInto MOR Table (apache#3190)

[HUDI-2051] Enable Hive Sync When Spark Enable Hive Meta  For Spark Sql (apache#3126)

[HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (apache#3202)

[HUDI-2114] Spark Query MOR Table Written By Flink Return Incorrect Timestamp Value (apache#3208)

[HUDI-2121] Add operator uid for flink stateful operators (apache#3212)

[HUDI-2123]  Exception When Merge With Null-Value Field (apache#3214)

[HUDI-2124] A Grafana dashboard for HUDI. (apache#3216)

[HUDI-2057]  CTAS Generate An External Table When Create Managed Table (apache#3146)

[HUDI-1930] Bootstrap support configure KeyGenerator by type (apache#3170)

* [HUDI-1930] Bootstrap support configure KeyGenerator by type

[HUDI-2116] Support batch synchronization of partition datas to  hive metastore to avoid oom problem (apache#3209)

[HUDI-2126] The coordinator send events to write function when there are no data for the checkpoint (apache#3219)

[HUDI-2127] Initialize the maxMemorySizeInBytes in log scanner (apache#3220)

[HUDI-2058]support incremental query for insert_overwrite_table/insert_overwrite operation on cow table (apache#3139)

[HUDI-2129] StreamerUtil.medianInstantTime should return a valid date time string (apache#3221)

[HUDI-2131] Exception Throw Out When MergeInto With Decimal Type Field (apache#3224)

[HUDI-2122] Improvement in packaging insert into smallfiles (apache#3213)

[HUDI-2132] Make coordinator events as POJO for efficient serialization (apache#3223)

[HUDI-2106] Fix flink batch compaction bug while user don't set compaction tasks (apache#3192)

[HUDI-2133] Support hive1 metadata sync for flink writer (apache#3225)

[HUDI-2089]fix the bug that metatable cannot support non_partition table (apache#3182)

[HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap (apache#3194)

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>

[HUDI-2135] Add compaction schedule option for flink (apache#3226)

[HUDI-2055] Added deltastreamer metric for time of lastSync (apache#3129)

[HUDI-2046] Loaded too many classes like sun/reflect/GeneratedSerializationConstructorAccessor in JVM metaspace (apache#3121)

Loaded too many classes when use kryo of spark to hudi

Co-authored-by: weiwei.duan <weiwei.duan@linkflowtech.com>

[HUDI-1996] Adding functionality to allow the providing of basic auth creds for confluent cloud schema registry (apache#3097)

* adding support for basic auth with confluent cloud schema registry

[HUDI-2093] Fix empty avro schema path caused by duplicate parameters (apache#3177)

* [HUDI-2093] Fix empty avro schema path caused by duplicate parameters

* rename shcmea option key

* fix doc

* rename var name

[HUDI-2113] Fix integration testing failure caused by sql results out of order (apache#3204)

[HUDI-2016] Fixed bootstrap of Metadata Table when some actions are in progress. (apache#3083)

Metadata Table cannot be bootstrapped when any action is in progress. This is detected by the presence of inflight or requested instants. The bootstrapping is initiated in preWrite and postWrite of each commit. So bootstrapping will be retried again until it succeeds.
Also added metrics for when the bootstrapping fails or a table is re-bootstrapped. This will help detect tables which are not getting bootstrapped.

[HUDI-2140] Fixed the unit test TestHoodieBackedMetadata.testOnlyValidPartitionsAdded. (apache#3234)

[HUDI-2115] FileSlices in the filegroup is not descending by timestamp (apache#3206)

[HUDI-1104] Adding support for UserDefinedPartitioners and SortModes to BulkInsert with Rows (apache#3149)

[HUDI-2069] Refactored String constants (apache#3172)

[HUDI-1105] Adding dedup support for Bulk Insert w/ Rows (apache#2206)

[HUDI-2134]Add generics to avoif forced conversion in BaseSparkCommitActionExecutor#partition (apache#3232)

[HUDI-2009] Fixing extra commit metadata in row writer path (apache#3075)

[HUDI-2099]hive lock which state is WATING should be released, otherwise this hive lock will be locked forever (apache#3186)

[MINOR] Fix build broken from apache#3186 (apache#3245)

[HUDI-2136] Fix conflict when flink-sql-connector-hive and hudi-flink-bundle are both in flink lib (apache#3227)

[HUDI-2087] Support Append only in Flink stream (apache#3174)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>

UnitTest for deltaSync

Removing cosmetic changes and reuse function for insert_overwrite_table

unit test

intial unit test for the insert_overwrite and insert_over_write_table

Adding failed test code for insert_overwrite

Revert "[HUDI-2087] Support Append only in Flink stream (apache#3174)" (apache#3251)

This reverts commit 3715267.

[HUDI-2147] Remove unused class AvroConvertor in hudi-flink (apache#3243)

[MINOR] Fix some wrong assert reasons (apache#3248)

[HUDI-2087] Support Append only in Flink stream (apache#3252)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>

[HUDI-2143] Tweak the default compaction target IO to 500GB when flink async compaction is off (apache#3238)

[HUDI-2142] Support setting bucket assign parallelism for flink write task (apache#3239)

[HUDI-1483] Support async clustering for deltastreamer and Spark streaming (apache#3142)

- Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink
- Added methods in HoodieAsyncService to reuse code

[HUDI-2107] Support Read Log Only MOR Table For Spark (apache#3193)

[HUDI-2144]Bug-Fix:Offline clustering(HoodieClusteringJob) will cause insert action losing data (apache#3240)

* fixed

* add testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan ut

* fix CheckStyle

Co-authored-by: yuezhang <yuezhang@freewheel.tv>

[MINOR] Fix EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION config (apache#3250)

[HUDI-2168] Fix for AccessControlException for anonymous user (apache#3264)

[HUDI-2045] Support Read Hoodie As DataSource Table For Flink And DeltaStreamer

test with insert-overwrite and insert-overwrite-table

removing hardcoded action to pass the unit test

[HUDI-1969] Support reading logs for MOR Hive rt table (apache#3033)

[HUDI-2171] Add parallelism conf for bootstrap operator

using delta-commit for insert_overwrite
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
hudistreamer issues related to Hudi streamer (Formely deltastreamer)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants