[fixup][connector-files] Use the new ProviderContext based method
Signed-off-by: slinkydeveloper <francescoguard@gmail.com>
slinkydeveloper committed Feb 8, 2022
1 parent 56db1e5 commit db2efa4
Showing 2 changed files with 31 additions and 5 deletions.
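For orientation, a rough sketch of the pattern this fixup adopts (not code from the commit): instead of the lambda form of DataStreamSinkProvider, the sink overrides the ProviderContext-based consumeDataStream and uses generateUid to give each operator it creates a stable uid. The class name and the "example-sink" operator name below are illustrative.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

// Illustrative provider: consumes the planner-provided DataStream and tags the
// resulting sink operator with a uid generated by the ProviderContext, so the
// operator keeps a stable identity across job graph changes.
final class ExampleSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        return dataStream
                .addSink(new DiscardingSink<>())
                // generateUid returns Optional<String>; .get() mirrors the usage in this commit.
                .uid(providerContext.generateUid("example-sink").get())
                .name("example-sink");
    }
}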
FileSystemTableSink.java
@@ -52,6 +52,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -134,10 +135,17 @@ public class FileSystemTableSink extends AbstractFileSystemTable

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
-return (DataStreamSinkProvider) dataStream -> consume(dataStream, sinkContext);
+return new DataStreamSinkProvider() {
+    @Override
+    public DataStreamSink<?> consumeDataStream(
+            ProviderContext providerContext, DataStream<RowData> dataStream) {
+        return consume(providerContext, dataStream, sinkContext);
+    }
+};
}

-private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
+private DataStreamSink<?> consume(
+        ProviderContext providerContext, DataStream<RowData> dataStream, Context sinkContext) {
final int inputParallelism = dataStream.getParallelism();
final int parallelism = Optional.ofNullable(configuredParallelism).orElse(inputParallelism);

@@ -148,7 +156,7 @@ private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkCo
throw new IllegalStateException("Streaming mode not support overwrite.");
}

-return createStreamingSink(dataStream, sinkContext, parallelism);
+return createStreamingSink(providerContext, dataStream, sinkContext, parallelism);
}
}

@@ -182,7 +190,10 @@ private DataStreamSink<RowData> createBatchSink(
}

private DataStreamSink<?> createStreamingSink(
-DataStream<RowData> dataStream, Context sinkContext, final int parallelism) {
+ProviderContext providerContext,
+DataStream<RowData> dataStream,
+Context sinkContext,
+final int parallelism) {
FileSystemFactory fsFactory = FileSystem::get;
RowDataPartitionComputer computer = partitionComputer();

@@ -246,6 +257,7 @@ private DataStreamSink<?> createStreamingSink(

writerStream =
StreamingSink.compactionWriter(
+providerContext,
dataStream,
bucketCheckInterval,
bucketsBuilder,
@@ -257,6 +269,7 @@
} else {
writerStream =
StreamingSink.writer(
+providerContext,
dataStream,
bucketCheckInterval,
bucketsBuilder,
@@ -266,6 +279,7 @@
}

return StreamingSink.sink(
+providerContext,
writerStream,
path,
tableIdentifier,
StreamingSink.java
@@ -41,6 +41,7 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
@@ -59,6 +60,7 @@ private StreamingSink() {}
* addition, it can emit {@link PartitionCommitInfo} to down stream.
*/
public static <T> DataStream<PartitionCommitInfo> writer(
+ProviderContext providerContext,
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
@@ -74,6 +76,7 @@ public static <T> DataStream<PartitionCommitInfo> writer(
StreamingFileWriter.class.getSimpleName(),
TypeInformation.of(PartitionCommitInfo.class),
fileWriter)
+.uid(providerContext.generateUid("streaming-writer").get())
.setParallelism(parallelism);
}

@@ -82,6 +85,7 @@ public static <T> DataStream<PartitionCommitInfo> writer(
* {@link PartitionCommitInfo} to down stream.
*/
public static <T> DataStream<PartitionCommitInfo> compactionWriter(
+ProviderContext providerContext,
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
@@ -106,11 +110,13 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
"streaming-writer",
TypeInformation.of(CoordinatorInput.class),
writer)
+.uid(providerContext.generateUid("streaming-writer").get())
.setParallelism(parallelism)
.transform(
"compact-coordinator",
TypeInformation.of(CoordinatorOutput.class),
coordinator)
+.uid(providerContext.generateUid("compact-coordinator").get())
.setParallelism(1)
.setMaxParallelism(1);

@@ -128,6 +134,7 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
"compact-operator",
TypeInformation.of(PartitionCommitInfo.class),
compacter)
+.uid(providerContext.generateUid("compact-operator").get())
.setParallelism(parallelism);
}

@@ -136,6 +143,7 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
* to options.
*/
public static DataStreamSink<?> sink(
+ProviderContext providerContext,
DataStream<PartitionCommitInfo> writer,
Path locationPath,
ObjectIdentifier identifier,
@@ -151,10 +159,14 @@ public static DataStreamSink<?> sink(
stream =
writer.transform(
PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
+.uid(providerContext.generateUid("partition-committer").get())
.setParallelism(1)
.setMaxParallelism(1);
}

-return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+return stream.addSink(new DiscardingSink<>())
+        .uid(providerContext.generateUid("discarding-sink").get())
+        .name("end")
+        .setParallelism(1);
}
}
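The uid calls in this commit assume the planner supplies a ProviderContext whose generateUid returns a present Optional (otherwise .get() would throw). As a hypothetical illustration only, not part of this repository, and assuming generateUid(String) is the interface's single method, a minimal context could derive deterministic uids from a fixed prefix:

import java.util.Optional;

import org.apache.flink.table.connector.ProviderContext;

// Hypothetical helper: produces deterministic uids of the form "<prefix>-<name>",
// so call sites such as generateUid("streaming-writer").get() always succeed.
final class PrefixedProviderContext implements ProviderContext {

    private final String prefix;

    PrefixedProviderContext(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Optional<String> generateUid(String name) {
        return Optional.of(prefix + "-" + name);
    }
}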
