[FLINK-25990][table] Expose uid generator for DataStream/Transformation providers #18667

Closed
wants to merge 8 commits
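Summary of the change as visible in the diffs below: the planner now passes a ProviderContext into DataStreamSinkProvider and DataStreamScanProvider, and connectors use its generateUid(String) method (returning Optional<String>) to attach stable uids to the transformations they create. A minimal sketch of the sink-side pattern, with a placeholder operator name and a DiscardingSink standing in for a real sink; the ProviderContext shape is inferred from its call sites in this PR, not copied from it:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

class ExampleSinkProvider {

    // The provider lambda now receives a ProviderContext alongside the DataStream.
    DataStreamSinkProvider createRuntimeProvider() {
        return (providerContext, dataStream) -> consume(providerContext, dataStream);
    }

    private DataStreamSink<?> consume(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        DataStreamSink<RowData> sink =
                dataStream.addSink(new DiscardingSink<>()).name("example-sink");
        // generateUid returns Optional<String>; apply the uid only when the planner
        // supplies one, so the operator gets a stable id for savepoint restore.
        providerContext.generateUid("example-sink").ifPresent(sink::uid);
        return sink;
    }
}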
FileSystemTableSink.java
@@ -52,6 +52,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -134,10 +135,12 @@ public class FileSystemTableSink extends AbstractFileSystemTable

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
return (DataStreamSinkProvider) dataStream -> consume(dataStream, sinkContext);
return (DataStreamSinkProvider)
(providerContext, dataStream) -> consume(providerContext, dataStream, sinkContext);
}

private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
private DataStreamSink<?> consume(
ProviderContext providerContext, DataStream<RowData> dataStream, Context sinkContext) {
final int inputParallelism = dataStream.getParallelism();
final int parallelism = Optional.ofNullable(configuredParallelism).orElse(inputParallelism);

@@ -148,7 +151,7 @@ private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkCo
throw new IllegalStateException("Streaming mode not support overwrite.");
}

return createStreamingSink(dataStream, sinkContext, parallelism);
return createStreamingSink(providerContext, dataStream, sinkContext, parallelism);
}
}

@@ -182,7 +185,10 @@ private DataStreamSink<RowData> createBatchSink(
}

private DataStreamSink<?> createStreamingSink(
DataStream<RowData> dataStream, Context sinkContext, final int parallelism) {
ProviderContext providerContext,
DataStream<RowData> dataStream,
Context sinkContext,
final int parallelism) {
FileSystemFactory fsFactory = FileSystem::get;
RowDataPartitionComputer computer = partitionComputer();

@@ -246,6 +252,7 @@ private DataStreamSink<?> createStreamingSink(

writerStream =
StreamingSink.compactionWriter(
providerContext,
dataStream,
bucketCheckInterval,
bucketsBuilder,
@@ -257,6 +264,7 @@
} else {
writerStream =
StreamingSink.writer(
providerContext,
dataStream,
bucketCheckInterval,
bucketsBuilder,
@@ -266,6 +274,7 @@
}

return StreamingSink.sink(
providerContext,
writerStream,
path,
tableIdentifier,
StreamingSink.java
@@ -41,6 +41,7 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
@@ -59,6 +60,7 @@ private StreamingSink() {}
* addition, it can emit {@link PartitionCommitInfo} to down stream.
*/
public static <T> DataStream<PartitionCommitInfo> writer(
ProviderContext providerContext,
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
@@ -74,6 +76,7 @@ public static <T> DataStream<PartitionCommitInfo> writer(
StreamingFileWriter.class.getSimpleName(),
TypeInformation.of(PartitionCommitInfo.class),
fileWriter)
.uid(providerContext.generateUid("streaming-writer").get())
.setParallelism(parallelism);
}

@@ -82,6 +85,7 @@ public static <T> DataStream<PartitionCommitInfo> writer(
* {@link PartitionCommitInfo} to down stream.
*/
public static <T> DataStream<PartitionCommitInfo> compactionWriter(
ProviderContext providerContext,
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
@@ -106,11 +110,13 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
"streaming-writer",
TypeInformation.of(CoordinatorInput.class),
writer)
.uid(providerContext.generateUid("streaming-writer").get())
.setParallelism(parallelism)
.transform(
"compact-coordinator",
TypeInformation.of(CoordinatorOutput.class),
coordinator)
.uid(providerContext.generateUid("compact-coordinator").get())
.setParallelism(1)
.setMaxParallelism(1);

@@ -128,6 +134,7 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
"compact-operator",
TypeInformation.of(PartitionCommitInfo.class),
compacter)
.uid(providerContext.generateUid("compact-operator").get())
.setParallelism(parallelism);
}

@@ -136,6 +143,7 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
* to options.
*/
public static DataStreamSink<?> sink(
ProviderContext providerContext,
DataStream<PartitionCommitInfo> writer,
Path locationPath,
ObjectIdentifier identifier,
@@ -151,10 +159,14 @@ public static DataStreamSink<?> sink(
stream =
writer.transform(
PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
.uid(providerContext.generateUid("partition-committer").get())
.setParallelism(1)
.setMaxParallelism(1);
}

return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
return stream.addSink(new DiscardingSink<>())
.uid(providerContext.generateUid("discarding-sink").get())
.name("end")
.setParallelism(1);
}
}
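Note on the StreamingSink changes above: the uid is applied by chaining .uid(providerContext.generateUid("...").get()) directly onto each transform(...) call, which relies on the planner always supplying a uid for these operators. Purely as an illustration and not part of this PR, the same wiring written in the ifPresent-guarded style used by the Kafka and Hive sources later in the diff could be factored into a hypothetical helper:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.connector.ProviderContext;

final class UidUtil {

    private UidUtil() {}

    // Hypothetical helper: set the generated uid on an operator only if the
    // ProviderContext actually produces one, otherwise leave the operator untouched.
    static <T> SingleOutputStreamOperator<T> applyUid(
            ProviderContext providerContext,
            String name,
            SingleOutputStreamOperator<T> operator) {
        providerContext.generateUid(name).ifPresent(operator::uid);
        return operator;
    }
}

With such a helper, the writer wiring would read applyUid(providerContext, "streaming-writer", inputStream.transform(...)).setParallelism(parallelism).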
HiveTableSink.java
@@ -57,6 +57,7 @@
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
@@ -144,11 +145,15 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter =
context.createDataStructureConverter(tableSchema.toRowDataType());
return (DataStreamSinkProvider)
dataStream -> consume(dataStream, context.isBounded(), converter);
(providerContext, dataStream) ->
consume(providerContext, dataStream, context.isBounded(), converter);
}

private DataStreamSink<?> consume(
DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
ProviderContext providerContext,
DataStream<RowData> dataStream,
boolean isBounded,
DataStructureConverter converter) {
checkAcidTable(catalogTable.getOptions(), identifier.toObjectPath());

try (HiveMetastoreClientWrapper client =
@@ -194,7 +199,13 @@ private DataStreamSink<?> consume(

Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
return createStreamSink(
dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
providerContext,
dataStream,
sd,
tableProps,
writerFactory,
fileNamingBuilder,
parallelism);
}
} catch (TException e) {
throw new CatalogException("Failed to query Hive metaStore", e);
@@ -240,6 +251,7 @@ private DataStreamSink<Row> createBatchSink(
}

private DataStreamSink<?> createStreamSink(
ProviderContext providerContext,
DataStream<RowData> dataStream,
StorageDescriptor sd,
Properties tableProps,
@@ -322,6 +334,7 @@ private DataStreamSink<?> createStreamSink(

writerStream =
StreamingSink.compactionWriter(
providerContext,
dataStream,
bucketCheckInterval,
builder,
@@ -333,6 +346,7 @@
} else {
writerStream =
StreamingSink.writer(
providerContext,
dataStream,
bucketCheckInterval,
builder,
@@ -342,7 +356,14 @@
}

return StreamingSink.sink(
writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
providerContext,
writerStream,
path,
identifier,
getPartitionKeys(),
msFactory(),
fsFactory(),
conf);
}

private CompactReader.Factory<RowData> createCompactReaderFactory(
HiveTableSource.java
@@ -40,6 +40,7 @@
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -76,6 +77,8 @@ public class HiveTableSource
SupportsProjectionPushDown,
SupportsLimitPushDown {

private static final String HIVE_TRANSFORMATION = "hive";

protected final JobConf jobConf;
protected final ReadableConfig flinkConf;
protected final ObjectPath tablePath;
@@ -109,8 +112,9 @@ public HiveTableSource(
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
return getDataStream(execEnv);
public DataStream<RowData> produceDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
return getDataStream(providerContext, execEnv);
}

@Override
@@ -121,14 +125,18 @@ public boolean isBounded() {
}

@VisibleForTesting
protected DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
protected DataStream<RowData> getDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
HiveSourceBuilder sourceBuilder =
new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
.setProjectedFields(projectedFields)
.setLimit(limit);

if (isStreamingSource()) {
return toDataStreamSource(execEnv, sourceBuilder.buildWithDefaultBulkFormat());
DataStreamSource<RowData> sourceStream =
toDataStreamSource(execEnv, sourceBuilder.buildWithDefaultBulkFormat());
providerContext.generateUid(HIVE_TRANSFORMATION).ifPresent(sourceStream::uid);
return sourceStream;
} else {
List<HiveTablePartition> hivePartitionsToRead =
getAllPartitions(
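The scan-side counterpart mirrors the sink-side pattern: produceDataStream now takes a ProviderContext, and the connector sets the uid on the source transformation when one is generated. A minimal sketch, assuming the two-argument produceDataStream signature used by the anonymous classes above and a placeholder Source instance:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

class ExampleScanProvider implements DataStreamScanProvider {

    // Placeholder for a real source (e.g. a KafkaSource or HiveSource instance).
    private final Source<RowData, ?, ?> source;

    ExampleScanProvider(Source<RowData, ?, ?> source) {
        this.source = source;
    }

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        DataStreamSource<RowData> stream =
                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "example-source");
        // Attach a stable uid to the source transformation if the planner provides one.
        providerContext.generateUid("example-source").ifPresent(stream::uid);
        return stream;
    }

    @Override
    public boolean isBounded() {
        return false;
    }
}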
HiveTableSourceITCase.java
@@ -40,6 +40,7 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.TableSourceFactory;
@@ -959,9 +960,10 @@ private static class TestConfigSource extends HiveTableSource {
}

@Override
public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
public DataStream<RowData> getDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
DataStreamSource<RowData> dataStream =
(DataStreamSource<RowData>) super.getDataStream(execEnv);
(DataStreamSource<RowData>) super.getDataStream(providerContext, execEnv);
int parallelism = dataStream.getTransformation().getParallelism();
assertEquals(inferParallelism ? 1 : 2, parallelism);
return dataStream;
KafkaDynamicSink.java
@@ -65,6 +65,8 @@
@Internal
public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {

private static final String UPSERT_KAFKA_TRANSFORMATION = "upsert-kafka";

// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
@@ -214,7 +216,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.build();
if (flushMode.isEnabled() && upsertMode) {
return (DataStreamSinkProvider)
dataStream -> {
(providerContext, dataStream) -> {
final boolean objectReuse =
dataStream
.getExecutionEnvironment()
@@ -233,6 +235,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
::copy
: rowData -> rowData);
final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
providerContext
.generateUid(UPSERT_KAFKA_TRANSFORMATION)
.ifPresent(end::uid);
if (parallelism != null) {
end.setParallelism(parallelism);
}
KafkaDynamicSource.java
@@ -28,6 +28,7 @@
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
@@ -36,6 +37,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -78,6 +80,8 @@
public class KafkaDynamicSource
implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {

private static final String KAFKA_TRANSFORMATION = "kafka";

// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
@@ -221,12 +225,16 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {

return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
public DataStream<RowData> produceDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
if (watermarkStrategy == null) {
watermarkStrategy = WatermarkStrategy.noWatermarks();
}
return execEnv.fromSource(
kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
DataStreamSource<RowData> sourceStream =
execEnv.fromSource(
kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid);
return sourceStream;
}

@Override
KafkaDynamicTableFactoryTest.java
@@ -87,6 +87,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
@@ -1107,7 +1108,9 @@ private KafkaSource<?> assertKafkaSource(ScanTableSource.ScanRuntimeProvider pro
final DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) provider;
final Transformation<RowData> transformation =
dataStreamScanProvider
.produceDataStream(StreamExecutionEnvironment.createLocalEnvironment())
.produceDataStream(
n -> Optional.empty(),
StreamExecutionEnvironment.createLocalEnvironment())
.getTransformation();
assertThat(transformation).isInstanceOf(SourceTransformation.class);
SourceTransformation<RowData, KafkaPartitionSplit, KafkaSourceEnumState>
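In the test above, the ProviderContext is supplied as the lambda n -> Optional.empty(), i.e. a context that never generates uids; this works because ProviderContext appears to be a functional interface whose single method takes a String and returns Optional<String>, as its call sites in this PR suggest. A test that wants to assert on the assigned uids could instead pass a deterministic context; a hypothetical sketch:

import java.util.Optional;

import org.apache.flink.table.connector.ProviderContext;

final class TestProviderContext implements ProviderContext {

    // Deterministic uid generation so tests can assert on Transformation#getUid.
    @Override
    public Optional<String> generateUid(String name) {
        return Optional.of("test-" + name);
    }
}

For example: dataStreamScanProvider.produceDataStream(new TestProviderContext(), StreamExecutionEnvironment.createLocalEnvironment()).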