diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 927a3214be2bd8..ae2c6d11f24e28 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -52,6 +52,7 @@
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -134,10 +135,17 @@ public class FileSystemTableSink extends AbstractFileSystemTable
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
-        return (DataStreamSinkProvider) dataStream -> consume(dataStream, sinkContext);
+        return new DataStreamSinkProvider() {
+            @Override
+            public DataStreamSink<?> consumeDataStream(
+                    ProviderContext providerContext, DataStream<RowData> dataStream) {
+                return consume(providerContext, dataStream, sinkContext);
+            }
+        };
     }
 
-    private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
+    private DataStreamSink<?> consume(
+            ProviderContext providerContext, DataStream<RowData> dataStream, Context sinkContext) {
         final int inputParallelism = dataStream.getParallelism();
         final int parallelism = Optional.ofNullable(configuredParallelism).orElse(inputParallelism);
 
@@ -148,7 +156,7 @@ private DataStreamSink consume(DataStream dataStream, Context sinkCo
                 throw new IllegalStateException("Streaming mode not support overwrite.");
             }
 
-            return createStreamingSink(dataStream, sinkContext, parallelism);
+            return createStreamingSink(providerContext, dataStream, sinkContext, parallelism);
         }
     }
 
@@ -182,7 +190,10 @@ private DataStreamSink createBatchSink(
     }
 
     private DataStreamSink<?> createStreamingSink(
-            DataStream<RowData> dataStream, Context sinkContext, final int parallelism) {
+            ProviderContext providerContext,
+            DataStream<RowData> dataStream,
+            Context sinkContext,
+            final int parallelism) {
         FileSystemFactory fsFactory = FileSystem::get;
         RowDataPartitionComputer computer = partitionComputer();
 
@@ -246,6 +257,7 @@ private DataStreamSink createStreamingSink(
 
             writerStream =
                     StreamingSink.compactionWriter(
+                            providerContext,
                             dataStream,
                             bucketCheckInterval,
                             bucketsBuilder,
@@ -257,6 +269,7 @@ private DataStreamSink createStreamingSink(
         } else {
             writerStream =
                     StreamingSink.writer(
+                            providerContext,
                             dataStream,
                             bucketCheckInterval,
                             bucketsBuilder,
@@ -266,6 +279,7 @@ private DataStreamSink createStreamingSink(
         }
 
         return StreamingSink.sink(
+                providerContext,
                 writerStream,
                 path,
                 tableIdentifier,
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
index ee157536eab526..140f1596c8f687 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
@@ -41,6 +41,7 @@
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
@@ -59,6 +60,7 @@ private StreamingSink() {}
      * addition, it can emit {@link PartitionCommitInfo} to down stream.
      */
     public static <T> DataStream<PartitionCommitInfo> writer(
+            ProviderContext providerContext,
             DataStream<T> inputStream,
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<
@@ -74,6 +76,7 @@ public static DataStream writer(
                         StreamingFileWriter.class.getSimpleName(),
                         TypeInformation.of(PartitionCommitInfo.class),
                         fileWriter)
+                .uid(providerContext.generateUid("streaming-writer").get())
                 .setParallelism(parallelism);
     }
 
@@ -82,6 +85,7 @@ public static DataStream writer(
      * {@link PartitionCommitInfo} to down stream.
      */
     public static <T> DataStream<PartitionCommitInfo> compactionWriter(
+            ProviderContext providerContext,
             DataStream<T> inputStream,
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<
@@ -106,11 +110,13 @@ public static DataStream compactionWriter(
                                 "streaming-writer",
                                 TypeInformation.of(CoordinatorInput.class),
                                 writer)
+                        .uid(providerContext.generateUid("streaming-writer").get())
                         .setParallelism(parallelism)
                         .transform(
                                 "compact-coordinator",
                                 TypeInformation.of(CoordinatorOutput.class),
                                 coordinator)
+                        .uid(providerContext.generateUid("compact-coordinator").get())
                         .setParallelism(1)
                         .setMaxParallelism(1);
 
@@ -128,6 +134,7 @@ public static DataStream compactionWriter(
                         "compact-operator",
                         TypeInformation.of(PartitionCommitInfo.class),
                         compacter)
+                .uid(providerContext.generateUid("compact-operator").get())
                 .setParallelism(parallelism);
     }
 
@@ -136,6 +143,7 @@ public static DataStream compactionWriter(
      * to options.
      */
     public static DataStreamSink<?> sink(
+            ProviderContext providerContext,
             DataStream<PartitionCommitInfo> writer,
             Path locationPath,
             ObjectIdentifier identifier,
@@ -151,10 +159,14 @@ public static DataStreamSink sink(
             stream =
                     writer.transform(
                                     PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
+                            .uid(providerContext.generateUid("partition-committer").get())
                             .setParallelism(1)
                             .setMaxParallelism(1);
         }
 
-        return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+        return stream.addSink(new DiscardingSink<>())
+                .uid(providerContext.generateUid("discarding-sink").get())
+                .name("end")
+                .setParallelism(1);
     }
 }
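For context, the pattern this diff introduces can also be used by other connector authors: implement the `ProviderContext`-aware overload of `DataStreamSinkProvider#consumeDataStream` instead of the single-argument lambda, and assign every operator a planner-generated, stable uid. Below is a minimal sketch, assuming the Flink 1.15+ `DynamicTableSink` / `DataStreamSinkProvider` APIs shown above; the class name `DiscardingTableSink` and the `"blackhole-writer"` uid key are illustrative placeholders, not part of this change.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

/** Hypothetical sink that discards all rows; only the uid wiring matters here. */
public class DiscardingTableSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Same shape as the FileSystemTableSink change: override the
        // ProviderContext-aware consumeDataStream so the operator gets a stable uid.
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext, DataStream<RowData> dataStream) {
                return dataStream
                        .addSink(new DiscardingSink<>())
                        // generateUid returns Optional<String>; like the diff above,
                        // this assumes the planner always supplies a uid and calls get().
                        .uid(providerContext.generateUid("blackhole-writer").get())
                        .name("blackhole-writer")
                        .setParallelism(1);
            }
        };
    }

    @Override
    public DynamicTableSink copy() {
        return new DiscardingTableSink();
    }

    @Override
    public String asSummaryString() {
        return "DiscardingTableSink";
    }
}
```

The benefit of uid-ing every transformation, as the diff does for the writer, compactor, committer, and discarding sink, is that operator state can be matched across job restarts and topology changes when restoring from savepoints, instead of relying on auto-generated operator ids.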