Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-1522] Add a new pipeline for Flink writer #2430

Merged
merged 2 commits into from
Jan 28, 2021
Merged

Conversation

danny0405
Copy link
Contributor

@danny0405 danny0405 commented Jan 11, 2021

What is the purpose of the pull request

This is the #step 1 of RFC-24:
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal

in order to add a new pipeline to avoid the single parallelism operators.

Brief change log

  • The new pipeline removes the InstantGenerateOperator and CommitSink, add
    StreamWriteOperatorCoordinator to achieve the same function
  • Add HoodieOptions for configuration of all the Flink operators, this
    is also useful for the SQL connectors which would be introduced in
    following contributions
  • Add UT and IT cases for the new operators

Verify this pull request

  • StreamWriteFunctionTest to test the StreamWriteFunction and the related components
  • StreamWriteITCase is a IT to test all the operators

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

+ "the `partitionPath` component of `HoodieKey`. Actual value obtained by invoking .toString(). By default `partitionpath`.")
public String partitionPathField = "partitionpath";

@Parameter(names = {"--partition-path-field"}, description = "Key generator class, that implements will extract the key out of incoming record.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

little mistake --partition-path-field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, thanks for the reminder.

@codecov-io
Copy link

codecov-io commented Jan 11, 2021

Codecov Report

Merging #2430 (2c4fa32) into master (c4afd17) will increase coverage by 0.09%.
The diff coverage is 46.03%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #2430      +/-   ##
============================================
+ Coverage     50.18%   50.27%   +0.09%     
- Complexity     3050     3119      +69     
============================================
  Files           419      430      +11     
  Lines         18931    19561     +630     
  Branches       1948     2003      +55     
============================================
+ Hits           9500     9835     +335     
- Misses         8656     8923     +267     
- Partials        775      803      +28     
Flag Coverage Δ Complexity Δ
hudicli 37.21% <ø> (ø) 0.00 <ø> (ø)
hudiclient 100.00% <ø> (ø) 0.00 <ø> (ø)
hudicommon 51.51% <44.44%> (+0.02%) 0.00 <6.00> (ø)
hudiflink 32.96% <46.05%> (+32.96%) 0.00 <64.00> (ø)
hudihadoopmr 33.16% <ø> (ø) 0.00 <ø> (ø)
hudisparkdatasource 65.85% <ø> (ø) 0.00 <ø> (ø)
hudisync 48.61% <ø> (ø) 0.00 <ø> (ø)
huditimelineservice 66.49% <ø> (ø) 0.00 <ø> (ø)
hudiutilities 69.43% <ø> (ø) 0.00 <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ Complexity Δ
.../apache/hudi/operator/InstantGenerateOperator.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...pache/hudi/operator/KeyedWriteProcessFunction.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
.../org/apache/hudi/operator/StreamWriteOperator.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...ache/hudi/operator/StreamWriteOperatorFactory.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...in/java/org/apache/hudi/schema/SchemaProvider.java 50.00% <ø> (+50.00%) 1.00 <0.00> (+1.00)
...src/main/java/org/apache/hudi/sink/CommitSink.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...di/source/JsonStringToHoodieRecordMapFunction.java 0.00% <ø> (ø) 0.00 <0.00> (ø)
.../org/apache/hudi/streamer/FlinkStreamerConfig.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
.../org/apache/hudi/streamer/HoodieFlinkStreamer.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...rg/apache/hudi/streamer/HoodieFlinkStreamerV2.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
... and 25 more

@yanghua yanghua self-assigned this Jan 11, 2021
@yanghua yanghua self-requested a review January 11, 2021 13:27
@danny0405 danny0405 changed the title [HUDI-1522] Remove the single parallelism operator from the Flink writer [HUDI-1522] Add a new pipeline for Flink writer Jan 13, 2021
Copy link
Contributor

@yanghua yanghua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danny0405 thanks for your contribution. Just have a quick look, left some comments.

env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}

Properties kafkaProps = StreamerUtil.getKafkaProps(cfg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And can we extract the same logic both in v1 and v2?

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;

/** Converter that converts a string into enum WriteOperationType. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep the consistency to make the comment of class level align to the existing classes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a OperationConverter in org.apache.hudi.utilities.deltastreamer let us use single one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.hudi.utilities.deltastreamer OperationConverter is in a different module, i would suggest to not add dependency to that. The hoodie-flink module should not depend on the hudi-utilities.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that can we find a way to move this class into a common place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, if we fond more usages of this class, in current code base, only 2 places use it, and the class itself is pretty simple, it does not deserve to move the class to common utils.


/** Utilities for Flink stream read and write. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

public static org.apache.hadoop.conf.Configuration getHadoopConf() {
// create HiveConf from hadoop configuration with hadoop conf directory configured.
org.apache.hadoop.conf.Configuration hadoopConf = null;
for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to find a possible config file that we may not know its config information? Or only a new Object just enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to find the correct hadoop configuration to create correct hadoop filesystem, this is also useful for HoodieFlinkEngineContext.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that can we specify a discovery-order, e.g. first find the specified path, and then find a batch of default paths.

In some scenarios where storage is separated from computing, or data synchronization scenarios, users may not necessarily want to load the local "default" configuration.

Copy link
Contributor Author

@danny0405 danny0405 Jan 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method firstly find the specified path fs.hdfs.hadoopconf, then directory HADOOP_CONF_DIR HADOOP_HOME/conf HADOOP_HOME/etc/hadoop from the system environment.

Even if storage is separated from computing, the FileSystem we created is still correct, if we split the hadoop conf files correctly.

In any case, we should not pass an empty hadoop configuration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method firstly find the specified path fs.hdfs.hadoopconf, then directory HADOOP_CONF_DIR HADOOP_HOME/conf HADOOP_HOME/etc/hadoop from the system environment.

I have watched the source code before raising this concern.

Even if storage is separated from computing, the FileSystem we created is still correct, if we split the hadoop conf files correctly.

In any case, we should not pass an empty hadoop configuration.

I mean, do we allow the user's explicit parameter assignment as the highest priority? Greater than the default convention that some users may not know?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is reasonable that the explicit specified path has the highest priority.

* @return A Hadoop configuration instance.
*/
private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
if (new File(hadoopConfDir).exists()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Files.exists() to test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean java.nio.file.Files ? I guess new File(hadoopConfDir).exists() is more simpler.

// Modify to be compatible with old version Avro.
if (genericRecord.getSchema().getField(isDeleteKey) == null) {
return false;
}
Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
Copy link
Contributor

@yanghua yanghua Jan 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do not change the literal to the variable that you defined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fix the logic for Avro version that higher than 1.10, because the higher version throws directly if the field name does not exist in the schema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean you have defined the isDeleteKey, why do not change the literal in this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okey, got your idea, thanks ~

hudi-flink/pom.xml Show resolved Hide resolved
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<groupId>org.apache.flink</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make the changes in this file cleaner?

*
* <p>It has the options for Hoodie table read and write. It also defines some utilities.
*/
public class HoodieOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does HoodieFlinkConfigOptions sounds better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally i prefer cleaner and shorter names, because options are phrase of Flink connectors, and it is under the package org.apache.hudi.operator, we can make it more specific if we introduce other config options class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine, it's not a key issue, different thought is OK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for adding Flink into the class name. We might add more engine support in the future. Use HoodieOptions make more sense to me if we add this connector under Flink's codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine to add Flink to the class name.

}
});
checkPropNames.forEach(prop ->
Preconditions.checkState(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can not merge the next two lines into this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally i prefer shorter line code. Refactoring it if Hoodie thinks long line code is ok.

Copy link
Member

@garyli1019 garyli1019 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danny0405 Thanks for your contribution, looks promising! Left some comments

*
* <p>It has the options for Hoodie table read and write. It also defines some utilities.
*/
public class HoodieOptions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for adding Flink into the class name. We might add more engine support in the future. Use HoodieOptions make more sense to me if we add this connector under Flink's codebase.

// Write Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key("write.table.name")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the config under HoodieWriteConfiguration? That would be great if we can use the same set of config for both Spark and Flink.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, thanks.

}

// Keep for mini-batch write.
private static class BufferSizeEstimator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This data also available in the WriteStatus. We have TotalWriteBytes and TotalRecords e.t.c. How are we gonna use this later on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To test the data buffer we received, the statistics in WriteStatus are all about written data which is different.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some docs here. Still a little bit confused about how this will be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we use this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

* @throws IOException if error occurs
*/
@SuppressWarnings("rawtypes")
private HoodieRecord toHoodie(I record) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: toHoodieRecord?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, thanks ~

} catch (Throwable throwable) {
// when a checkpoint fails, throws directly.
result.completeExceptionally(
new CompletionException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a hoodie internal exception here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is needed by the Flink coordinator.

implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
private static final long serialVersionUID = 1L;

private static final long DEFAULT_MAX_BYTES_PER_BATCH = (1 << 21) * 128; // 256MB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the later use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, removed.

*/
public class OperationConverter implements IStringConverter<WriteOperationType> {
@Override
public WriteOperationType convert(String value) throws ParameterException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is available in the WriteOperationType. Can we reuse from there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is used by the HoodieFlinkStreamer.

* Create a key generator class via reflection, passing in any configs needed.
* <p>
* If the class name of key generator is configured through the properties file, i.e., {@code props}, use the corresponding key generator class; otherwise, use the default key generator class
* specified in {@code DataSourceWriteOptions}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this linked to Spark code

return this.function.getWriteClient();
}

public void checkpointFunction(long checkpointId) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this method being called? How do we generate the checkpointId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkpointId is specified by the test cases, the test case needs to invoke the checkpointFunction by itself.

/**
* Configurations for Hoodie Flink streamer.
*/
public class Config extends Configuration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this name is too generic. It would be better to make it more readable. What about renaming to FlinkStreamerConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine with the suggest name.

@Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true)
public String kafkaGroupId;

@Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many minor issues for the description items of these config options, e.g., describe it with a normal sentence(take care of the grammar?), Upper the first letter? In short, keep consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okey

@danny0405
Copy link
Contributor Author

Thanks @garyli1019 @yanghua for the review, i have updated based on your comments.

Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
final String isDeleteKey = "_hoodie_is_deleted";
// Modify to be compatible with old version Avro.
if (genericRecord.getSchema().getField(isDeleteKey) == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the old version Avro mean here? This seems like being handled by the check on return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, i mean the new version Avro, let me fix the comment.

.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add another config to specify index type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can if we support more index type, currently the new pipeline only supports one. (Although not in this PR, but it is my plan though ~ I don't want to add it then remove ~)

}

// Keep for mini-batch write.
private static class BufferSizeEstimator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some docs here. Still a little bit confused about how this will be used.

}

@VisibleForTesting
public String getInFlightInstant() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this only visible for testing? IIUC, the actual writer need to know the current instant and add _hoodie_commit_time field to the record. How did the writer know the current instant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did the writer know the current instant

Fetch through the HoodieFlinkWriteClient, actually the HoodieTableMetaClient.

LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
}
this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a retry or ack mechanism here? What's the consequence if the coordinator does not receive the event?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failover when next checkpoint starts. There is a check in the StreamWriteOperatorCoordinator.checkpointCoordinator.

Copy link
Contributor

@yanghua yanghua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danny0405 Left some comments.

* @param instantTime The instant time under which to write the data
* @param writeStatuses The write statues list
*/
public BatchWriteSuccessEvent(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary comment for the constructor? it's easy to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

@Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.")
public String flinkCheckPointPath;

@Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

block seems hard to understand. Any better word?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from the old code, how about instant-retry-times ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

This is the #step 1 of RFC-24:
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal

* The new pipeline removes the InstantGenerateOperator and CommitSink,
  add StreamWriteOperatorCoordinator to achieve the same function
* Add FlinkOptions for configuration of all the Flink operators, this
  is also useful for the SQL connectors which would be introduced in
  following contributions
* Add UT and IT cases for the new operators
Copy link
Contributor

@yanghua yanghua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danny0405 Left some comments.

.key("write.table.type")
.stringType()
.defaultValue("COPY_ON_WRITE")
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove . after write

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with , instead.

public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("write.table.type")
.stringType()
.defaultValue("COPY_ON_WRITE")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let us reduce the string literal via HoodieTableType.COPY_ON_WRITE.name()?

+ "By default 2000 and it will be doubled by every retry");

public static final ConfigOption<Boolean> IGNORE_FAILED_BATCH = ConfigOptions
.key("write.ignore.failed.batch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write.batch.ignore.failed sounds better? This config option describes a boolean flag, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove 'batch' because Flink may evolve to streaming write in the near future.

|| WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
HoodieRecordPayload payload = shouldCombine
? StreamerUtil.createPayload(this.config.getString(FlinkOptions.PAYLOAD_CLASS), gr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method createPayload is too long and hard to read. Can you make it easy to read?

}
}

private void waitFor(long intervalMillis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitFor what? Let us give it a more readable name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to sleepFor.

doCommit();
return;
} catch (Throwable throwable) {
String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check that how many exception categories? If there is not necessary to still retry, may we accept fast failed strategy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how many exception categories there ~, so we'd better keep the re-try because Flink is a streaming system, less failover is better, we can add the check for the exception categories in the future if it is needs, (for example, if there is a exception that does not deserve to retry, just fail fast.)

The user can forbidden the retry by setting the retry times to a negative though ~

@garyli1019
Copy link
Member

@danny0405 sorry for the delay on review, I was super busy this week. The bloom index was merged to master, can we add the bloom index option to this PR as well?

@danny0405
Copy link
Contributor Author

@danny0405 sorry for the delay on review, I was super busy this week. The bloom index was merged to master, can we add the bloom index option to this PR as well?

I'm not planning to using the BloomFilter index in the new pipeline, instead there is a BloomFilter index backed state index in the following PR, which is more suitable for streaming write.

Copy link
Contributor

@yanghua yanghua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danny0405 Kindred thanks for your contribution and patient. I approve this PR now. If there are some minor issues, let's fix them later. @wangxianghu @garyli1019 Do you still have any concern, I will timeout and merge it tomorrow morning!

@yanghua
Copy link
Contributor

yanghua commented Jan 27, 2021

@danny0405 oh, CI failed... Please fix it before merging, 3ks.

@garyli1019
Copy link
Member

@danny0405 sorry for the delay on review, I was super busy this week. The bloom index was merged to master, can we add the bloom index option to this PR as well?

I'm not planning to using the BloomFilter index in the new pipeline, instead there is a BloomFilter index backed state index in the following PR, which is more suitable for streaming write.

@danny0405 yes, using bloom index in a streaming fashion is what we are planning to do next as well. No worry for now, we can try this PR out and continue the work on top of this. Really appreciate your work! This PR LGTM.

@yanghua yanghua merged commit bc0325f into apache:master Jan 28, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants