-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: infer source parallelism for FLIP-27 source in batch execution mode #10832
Flink: infer source parallelism for FLIP-27 source in batch execution mode #10832
Conversation
e6c3a59
to
de63e1d
Compare
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
"testBasicRead", | ||
TypeInformation.of(RowData.class)) | ||
sourceBuilder | ||
.buildStream(env) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switched this class to test the new buildStream
API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have tests remaining to check the fromSource
API for bounded sources?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will only switch this class
...ink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
Outdated
Show resolved
Hide resolved
Field privateField = | ||
MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension"); | ||
privateField.setAccessible(true); | ||
InternalMiniClusterExtension internalExtension = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use reflect to retrieve InternalMiniClusterExtension
to get MiniCluster
in order to get execution graph to verify source parallelism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you call
env.getTransformations().get(0).getParallelism()
before env.executeAsync()
then you could get the parallelism. Would this help?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just tried it with debugger. the value is the default parallelism of 4 while the expected inferred source parallelism is 1 after the executeAsync()
DataStream<Row> dataStream =
IcebergSource.forRowData()
.tableLoader(CATALOG_EXTENSION.tableLoader())
.table(table)
.flinkConfig(config)
// force one file per split
.splitSize(1L)
.buildStream(env)
.map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
int sourceParallelism = env.getTransformations().get(0).getParallelism();
* | ||
* @return data stream from the Iceberg source | ||
*/ | ||
public DataStream<T> buildStream(StreamExecutionEnvironment env) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new public API. I also thought about the method name as createStream
. but decided this name for now. open to other suggestion.
also think it is better to require StreamExecutionEnvironment
here instead of having it a builder method so that it is clear it is not required for the build()
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sentence about env
is also true for outputTypeInfo
, and watermarkStrategy
. Maybe adding them as a parameter would be reasonable too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently, outputTypeInfo
can be inferred from the ReaderFunction
if using provided RowData or Avro reader.
if (outputTypeInfo == null) {
this.outputTypeInfo = inferOutputTypeInfo(table, context, readerFunction);
}
watermarkStrategy
is defaulted WatermarkStrategy.noWatermarks
. so it is not mandatory either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outputTypeInfo
is not needed anymore with the Converter
interface. Removed watermark strategy for now. We can always add it back in the future if it is needed.
the new
|
72ea6fb
to
3ead397
Compare
if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) { | ||
// Simulate slow subtask 0 with attempt 0 | ||
TaskInfo taskInfo = getRuntimeContext().getTaskInfo(); | ||
if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after this change of inferring source parallelism, this test would hang (even with the inferring parallelism flag turned off). It seems that speculative execution won't kick in somehow. This line of change seems to fix the problem however (tried local run 50 times without a failure). Not sure exactly why. Regardless of the reason, this seems like a good change anyway.
@pvary @venkata91 @becketqin let me know if you have any idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. Thanks for tagging me. Do you mean adding taskInfo.getIndexOfThisSubtask() == 0
solves the hang issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@venkata91 that is correct.
new Configuration() | ||
// disable classloader check as Avro may cache class/object in the serializers. | ||
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false) | ||
// disable inferring source parallelism |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inferred parallelism might mess up the watermark and record ordering comparison. disable it to avoid the flakiness
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this so?
Do you have any idea? Would it be an issue in prod?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good question. let me dig more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TestIcebergSourceSql
assume the parallelism is 1 for testWatermarkOptionsAscending
and testWatermarkOptionsDescending
. Table has 2 files. This test just check split assignment is ordered with single reader.
tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
I think TestIcebergSourceWithWatermarkExtractor
similarly assumes parallelism of 4. inferring parallelism would change source parallelism to the number of splits and potentially inferring with the assertion on ordering of the read records..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe explicitly setting the parallelism in those tests would be better.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, we don't need the disabling for this particular test as it doesn't go through the buildStream(env)
path where infer parallelism happens. will revert the change
1035dad
to
aaf3765
Compare
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
Outdated
Show resolved
Hide resolved
* Optional. Default is no watermark strategy. Only relevant if using the {@link | ||
* Builder#buildStream(StreamExecutionEnvironment)}. | ||
*/ | ||
public Builder<T> watermarkStrategy(WatermarkStrategy<T> newStrategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to provide a useful WatermarkStrategy
?
I think it is possible to provide a useful TimestampAssigner
, but I don't see how can someone provide a useful WatermarkGenerator
. The only possible way to generate watermarks are with the watermarkColumn
is provided, but even then the WatermarkGenerator
should not be used.
Do I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is only for the buildStream
method. this just moved the watermark strategy from env.fromSource
to the builder.
for regular build
method, users would also need to set the watermark strategy, which most likely would be none.
DataStream<RowData> stream =
env.fromSource(
sourceBuilder().build(),
WatermarkStrategy.noWatermarks(),
"IcebergSource",
TypeInformation.of(RowData.class));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove WatermarkStrategy
from here. we can always add another buildStream
with a new arg in the future if there is really a need.
@@ -503,28 +569,10 @@ public IcebergSource<T> build() { | |||
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor)); | |||
} | |||
|
|||
ScanContext context = contextBuilder.build(); | |||
this.context = contextBuilder.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is ugly as hell.
Side-effect of the build method....
Maybe creating an init()
with all the side-effects?
And then build()
which uses the initialized attributes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree side-effect is undesirable. let me think of a way to refactor the code. maybe extract the ScanContext
building into a separate method from build()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the Converter
interface will obsolete this problem.
...9/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
Outdated
Show resolved
Hide resolved
...9/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java
Outdated
Show resolved
Hide resolved
be4ce67
to
09cb0eb
Compare
32310d4
to
3aabbad
Compare
3aabbad
to
f86449a
Compare
thanks @pvary for the review |
* main: (208 commits) Docs: Fix Flink 1.20 support versions (apache#11065) Flink: Fix compile warning (apache#11072) Docs: Initial committer guidelines and requirements for merging (apache#10780) Core: Refactor ZOrderByteUtils (apache#10624) API: implement types timestamp_ns and timestamptz_ns (apache#9008) Build: Bump com.google.errorprone:error_prone_annotations (apache#11055) Build: Bump mkdocs-material from 9.5.33 to 9.5.34 (apache#11062) Flink: Backport PR apache#10526 to v1.18 and v1.20 (apache#11018) Kafka Connect: Disable publish tasks in runtime project (apache#11032) Flink: add unit tests for range distribution on bucket partition column (apache#11033) Spark 3.5: Use FileGenerationUtil in PlanningBenchmark (apache#11027) Core: Add benchmark for appending files (apache#11029) Build: Ignore benchmark output folders across all modules (apache#11030) Spec: Add RemovePartitionSpecsUpdate REST update type (apache#10846) Docs: bump latest version to 1.6.1 (apache#11036) OpenAPI, Build: Apply spotless to testFixtures source code (apache#11024) Core: Generate realistic bounds in benchmarks (apache#11022) Add REST Compatibility Kit (apache#10908) Flink: backport PR apache#10832 of inferring parallelism in FLIP-27 source (apache#11009) Docs: Add Druid docs url to sidebar (apache#10997) ...
flinkConf, | ||
scanContext.limit(), | ||
() -> { | ||
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure whether it is intentional to modify the split list instance field in planSplitsForBatch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is intentional. See the comment for the method. We cache it as it will be reused later.
No description provided.