
[MINOR] support log index #10143

Closed
watermelon12138 wants to merge 1 commit into apache:master from watermelon12138:SupportLogIndex

Conversation

@watermelon12138
Contributor

@watermelon12138 watermelon12138 commented Nov 20, 2023

Change Logs

A log index is added to speed up log file reading in Flink.

Impact

None. This feature is disabled by default.

Risk level (write none, low medium or high below)

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405
Contributor

Can you write up a general design of the changes, so that it is easier for us to reach consensus on the general direction?

@watermelon12138
Contributor Author

Can you write up a general design of the changes, so that it is easier for us to reach consensus on the general direction?

@danny0405 OK, I will summarize the overall changes and design ideas after I finish adding UTs.

@watermelon12138
Contributor Author

@danny0405
Hello, Danny.
I would like to ask why data with the same primary key is written to different log files (with the same FileId but different timestamps) in upsert mode. As a result, I cannot write a UT to test the LogIndex capability. My test code is as follows:

```java
public void testHoodiePipelineBuilderSource() throws Exception {
  // create a StreamExecutionEnvironment instance.
  StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  execEnv.getConfig().disableObjectReuse();
  execEnv.setParallelism(1);
  // set up checkpoint interval
  execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
  execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

  Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
  conf.setString(FlinkOptions.TABLE_NAME, "t1");
  conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
  conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
  conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
  conf.setBoolean(FlinkOptions.LOG_INDEX_ENABLE, true);
  conf.setString(FlinkOptions.PRECOMBINE_FIELD, "ts");
  conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid");
  conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
  conf.setString(FlinkOptions.OPERATION, "upsert");

  // write 3 batches of data set
  TestData.writeData(TestData.dataSetInsert(1), conf);
  TestData.writeData(TestData.dataSetInsert(1), conf);
  TestData.writeData(TestData.dataSetInsert(1), conf);
}
```

@watermelon12138
Contributor Author


@ad1happy2go
Hi! Can you help me resolve this? Thank you very much.

@danny0405
Contributor

I would like to ask why data with the same primary key is written to different log files (with the same FileId but different timestamps) in upsert mode?

The primary key's lifecycle is maintained within one FileGroup; different log files may hold multiple changes to one key, scattered across multiple commits.
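This point can be sketched with a toy model: a file group is identified by a fileId, and each commit that touches keys in that group appends a new log file with the same fileId but a new instant time, so one key's updates end up scattered across several log files of the same file group. The file-name format and class below are illustrative assumptions, not Hudi's actual naming or file-system code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FileGroupSketch {
  // Group toy log-file names of the form "<fileId>_<instantTime>.log"
  // by their fileId. Illustrative only, not Hudi's real naming scheme.
  static Map<String, List<String>> groupByFileId(List<String> logFiles) {
    Map<String, List<String>> byFileGroup = new TreeMap<>();
    for (String f : logFiles) {
      String fileId = f.substring(0, f.indexOf('_'));
      byFileGroup.computeIfAbsent(fileId, k -> new ArrayList<>()).add(f);
    }
    return byFileGroup;
  }

  public static void main(String[] args) {
    List<String> logFiles = Arrays.asList(
        "fg-001_20231120101010.log",   // first upsert of key 'uuid-1'
        "fg-001_20231120101155.log",   // later upsert of the same key
        "fg-002_20231120101010.log");  // a different file group
    // One key's changes land in several log files of the SAME file
    // group, one per commit instant.
    System.out.println(groupByFileId(logFiles));
  }
}
```

Reading a key back therefore means consulting every log file of its file group across commits, which is the scan a log index would help shortcut.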

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Feb 26, 2024
