[FLINK-16497][FLINK-16496][FLINK-16495][connectors] Improve default flush strategy for JDBC/Elasticsearch/HBase sink for better out-of-box #12536
Conversation
cc @dawidwys for the Elasticsearch part.
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.
Automated Checks: last check on commit 64dd9b7 (Tue Jun 09 03:03:05 UTC 2020) ✅ no warnings. Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Force-pushed from 64dd9b7 to 52e932c
I reviewed the ES part.
config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
I wanted to have any formatting/transformation in one place, in the ElasticsearchOptions. Can we return MB from config.getBulkFlushMaxByteSize?
I thought about this; however, I think we should validate that the value is in MB granularity. If we drop the remainder, it will be harder to check.
Then return the MemorySize instead, without copying the shifting logic that is implemented inside MemorySize?
But we have to move the -1 logic out of it then.
right, let's leave it as it is then.
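The conversion under discussion can be sketched as follows. This is a hypothetical standalone helper, not the actual Flink code: it converts a byte count to whole megabytes with a right shift, and validates MB granularity by checking the remainder, which is the check that would be lost if the config returned MB directly.

```java
public class FlushSizeCheck {
    // Hypothetical helper: convert a byte count to whole MB, rejecting
    // values that are not an exact multiple of 1 MB (1 << 20 bytes).
    static int toMb(long bytes) {
        if ((bytes & ((1L << 20) - 1)) != 0) {
            throw new IllegalArgumentException(
                "flush max size must be in MB granularity, but was: " + bytes + " bytes");
        }
        return (int) (bytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(toMb(2L << 20)); // 2 MB -> 2
    }
}
```

Returning a MemorySize from the config would avoid duplicating the shift, but as the thread notes, the remainder check is easier to express on the raw byte count.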
public long getBulkFlushInterval() {
    long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
    // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
    return interval == 0 ? -1 : interval;
}
Why not return empty in case it is set to 0? I prefer the explicitness of Optional over magic values. Plus we would not need to change the Elasticsearch connector code for that.
Because the Elasticsearch client has default values for these (1000 rows, 5 MB), we have to use -1 to disable them; otherwise the client will fall back to its defaults.
got it.
The bridge logic seems weird; why not just let the user set -1, which matches the ES way? 0 is not a good candidate for a default value.
Unfortunately, Duration and MemorySize ConfigOptions don't allow negative values such as -1. That's why I picked zero. I think zero is used as a disabled value in many places, e.g. Kafka batch.size and the watermark interval.
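The bridging discussed in this thread can be sketched as below. This is a hypothetical helper, not the actual connector code: the table option uses 0 to mean "disabled" because Duration-typed options cannot be negative, while the Elasticsearch bulk processor expects -1 for "disabled".

```java
public class FlushIntervalBridge {
    // Hypothetical sketch: translate the table option's 0-means-disabled
    // convention into the Elasticsearch client's -1-means-disabled convention.
    static long toClientInterval(long intervalMillis) {
        return intervalMillis == 0 ? -1 : intervalMillis;
    }

    public static void main(String[] args) {
        System.out.println(toClientInterval(0));    // disabled -> -1
        System.out.println(toClientInterval(1000)); // 1s -> 1000 ms, passed through
    }
}
```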
Force-pushed from c2a2337 to d0602c5
checkState(config.getOptional(PASSWORD).isPresent(),
    "Database username must be provided when database password is provided");
}
checkAllOrNone(config, new ConfigOption[]{
Validation for URL and TABLE_NAME is not necessary because the find service already does that.
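For context, an all-or-none check like the one in the diff can be sketched as below. This is a hypothetical standalone version; the real checkAllOrNone in the connector operates on Flink's ReadableConfig and ConfigOption array rather than plain Optionals.

```java
import java.util.Arrays;
import java.util.Optional;

public class AllOrNone {
    // Hypothetical sketch: require that either all of the given optional
    // values are present or none of them are, mirroring checkAllOrNone.
    static void checkAllOrNone(Optional<?>... options) {
        long present = Arrays.stream(options).filter(Optional::isPresent).count();
        if (present != 0 && present != options.length) {
            throw new IllegalArgumentException(
                "Either all or none of the options must be provided, but "
                    + present + " of " + options.length + " were set.");
        }
    }

    public static void main(String[] args) {
        checkAllOrNone(Optional.of("user"), Optional.of("pass")); // ok: all set
        checkAllOrNone(Optional.empty(), Optional.empty());       // ok: none set
    }
}
```

This is the pattern used for paired options such as username/password, where setting only one of the two is a configuration error.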
private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
private long bufferFlushMaxRows = -1;
private long bufferFlushIntervalMillis = -1;
private long bufferFlushMaxRows = 0;
I would prefer -1 as the default value, which means the value is not set. 0 seems to mean "never flush".
I have tried, but the Duration and MemorySize ConfigOptions don't allow negative values.
Thanks for the review @danny0405, I have addressed the comments.
+1, overall looks good, except for the limitation that ConfigOption does not allow -1 as a fallback value. That means we cannot set up a disabled flag for all the SQL connectors, which I think is not that user-friendly; maybe we should file a JIRA issue and have some discussion.
Agree with you @danny0405, I created FLINK-18245 to track this. We can allow -1 in the future if it is supported.
…keep the original constraint name
…C sink for better out-of-box The default flush strategy for old JDBC sink is no flush interval and 5000 buffered rows. The new default flush strategy for new JDBC sink is '1s' flush interval and '100' buffered rows.
…ase sink for better out-of-box The default flush strategy for old HBase sink is no flush interval and 2MB buffered size. The new default flush strategy for new HBase sink is '1s' flush interval and '1000' buffered rows and '2mb' buffered size.
…r new Elasticsearch sink for better out-of-box The default flush strategy for old Elasticsearch sink is no flush interval and 5MB buffered size and 1000 rows. The new default flush strategy for new Elasticsearch sink is '1s' flush interval and '1000' buffered rows and '2mb' buffered size.
Force-pushed from 4ea8c9c to 8555e43
The build passed in my repo: https://dev.azure.com/imjark/Flink/_build/results?buildId=162&view=results
…ase sink for better out-of-box The default flush strategy for old HBase sink is no flush interval and 2MB buffered size. The new default flush strategy for new HBase sink is '1s' flush interval and '1000' buffered rows and '2mb' buffered size. This closes #12536
…r new Elasticsearch sink for better out-of-box The default flush strategy for old Elasticsearch sink is no flush interval and 5MB buffered size and 1000 rows. The new default flush strategy for new Elasticsearch sink is '1s' flush interval and '1000' buffered rows and '2mb' buffered size. This closes #12536
…C sink for better out-of-box The default flush strategy for old JDBC sink is no flush interval and 5000 buffered rows. The new default flush strategy for new JDBC sink is '1s' flush interval and '100' buffered rows. This closes #12536
What is the purpose of the change
As discussed in the JIRA issues (mainly FLINK-16497), the current flush behavior depends on a large buffer size with no flush interval, which can lead to no output for a long time.
Brief change log
JDBC: default flush strategy changed from no flush interval and 5000 buffered rows to a '1s' flush interval and '100' buffered rows.
HBase: default flush strategy changed from no flush interval and 2MB buffered size to a '1s' flush interval, '1000' buffered rows, and '2mb' buffered size.
Elasticsearch: default flush strategy changed from no flush interval, 5MB buffered size, and 1000 rows to a '1s' flush interval, '1000' buffered rows, and '2mb' buffered size.
All the options can be disabled by setting them to 0.

Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation
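The flush strategy this PR makes the default (flush when a row count or buffered size threshold is hit, or when a 1s interval timer fires, with 0 disabling any individual trigger) can be sketched as below. This is a hypothetical simplification, not the actual connector code; the interval trigger is handled by a separate timer and is omitted here.

```java
public class FlushDecision {
    // Hypothetical sketch of the buffered-sink flush decision: a threshold
    // of 0 means the corresponding trigger is disabled.
    static boolean shouldFlush(long rows, long maxRows, long bytes, long maxBytes) {
        boolean rowTrigger = maxRows > 0 && rows >= maxRows;
        boolean sizeTrigger = maxBytes > 0 && bytes >= maxBytes;
        return rowTrigger || sizeTrigger;
    }

    public static void main(String[] args) {
        // New defaults for the HBase/Elasticsearch sinks: 1000 rows, 2 MB.
        System.out.println(shouldFlush(1000, 1000, 0, 2 << 20));     // row threshold hit
        System.out.println(shouldFlush(10, 1000, 3 << 20, 2 << 20)); // size threshold hit
        System.out.println(shouldFlush(10, 0, 10, 0));               // both disabled
    }
}
```

With the old defaults (no interval and large thresholds), a low-traffic stream could buffer records indefinitely; the 1s interval timer guarantees output even when neither threshold above is reached.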