FlinkSinkBuilder.java
@@ -23,10 +23,9 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -38,18 +37,16 @@
 /** Sink builder to build a flink sink from input. */
 public class FlinkSinkBuilder {
 
-    private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
     private final Configuration conf;
 
     private DataStream<RowData> input;
-    @Nullable private CatalogLock.Factory lockFactory;
+    private Lock.Factory lockFactory = Lock.emptyFactory();
     @Nullable private Map<String, String> overwritePartition;
     @Nullable private LogSinkFunction logSinkFunction;
     @Nullable private Integer parallelism;
 
-    public FlinkSinkBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
-        this.tableIdentifier = tableIdentifier;
+    public FlinkSinkBuilder(FileStoreTable table) {
         this.table = table;
         this.conf = Configuration.fromMap(table.schema().options());
     }
@@ -59,7 +56,7 @@ public FlinkSinkBuilder withInput(DataStream<RowData> input) {
         return this;
     }
 
-    public FlinkSinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
+    public FlinkSinkBuilder withLockFactory(Lock.Factory lockFactory) {
         this.lockFactory = lockFactory;
         return this;
     }
@@ -100,11 +97,10 @@ public DataStreamSink<?> build() {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         StoreSink sink =
                 new StoreSink(
-                        tableIdentifier,
                         table,
+                        lockFactory,
                         conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
                         getCompactPartSpec(),
-                        lockFactory,
                         overwritePartition,
                         logSinkFunction);
         return sink.sinkFrom(new DataStream<>(env, partitioned));
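With the identifier gone, wiring a sink needs only the FileStoreTable itself, and the lock defaults to a no-op factory unless one is supplied. A minimal sketch of a call site under the new API (the surrounding variables, class name, and the package of FlinkSinkBuilder are assumptions, not shown in this diff):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.operation.Lock;
    import org.apache.flink.table.store.table.FileStoreTable;
    // FlinkSinkBuilder import omitted; its package is not shown in this diff.

    class SinkWiringSketch {
        // Builds a sink with the refactored builder: no ObjectIdentifier, and
        // withLockFactory is optional (the field defaults to Lock.emptyFactory()).
        static DataStreamSink<?> write(FileStoreTable table, DataStream<RowData> input) {
            return new FlinkSinkBuilder(table)
                    .withInput(input)
                    .withLockFactory(Lock.emptyFactory()) // explicit only for illustration
                    .build();
        }
    }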
StoreSink.java
@@ -30,11 +30,9 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -56,29 +54,24 @@ public class StoreSink implements Serializable {
 
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
-    private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
 
+    private final Lock.Factory lockFactory;
     private final boolean compactionTask;
     @Nullable private final Map<String, String> compactPartitionSpec;
-    @Nullable private final CatalogLock.Factory lockFactory;
     @Nullable private final Map<String, String> overwritePartition;
     @Nullable private final LogSinkFunction logSinkFunction;
 
     public StoreSink(
-            ObjectIdentifier tableIdentifier,
             FileStoreTable table,
+            Lock.Factory lockFactory,
             boolean compactionTask,
             @Nullable Map<String, String> compactPartitionSpec,
-            @Nullable CatalogLock.Factory lockFactory,
             @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkFunction logSinkFunction) {
-        this.tableIdentifier = tableIdentifier;
         this.table = table;
-
+        this.lockFactory = lockFactory;
         this.compactionTask = compactionTask;
         this.compactPartitionSpec = compactPartitionSpec;
-        this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
         this.logSinkFunction = logSinkFunction;
     }
@@ -103,12 +96,11 @@ private OneInputStreamOperator<RowData, Committable> createWriteOperator(
     }
 
     private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
-        Lock lock = Lock.fromCatalog(lockFactory, tableIdentifier.toObjectPath());
         return new StoreCommitter(
                 table.newCommit(user)
                         .withOverwritePartition(overwritePartition)
                         .withCreateEmptyCommit(createEmptyCommit)
-                        .withLock(lock));
+                        .withLock(lockFactory.create()));
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
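StoreSink is Serializable and ships with the job graph, which is why it now holds a Lock.Factory rather than a live Lock: the actual lock is only created on the committer side via lockFactory.create(). A hypothetical factory, not part of this PR, sketching the contract an implementation has to satisfy:

    import java.util.concurrent.Callable;
    import org.apache.flink.table.store.file.operation.Lock;

    // Hypothetical example: a JVM-local lock factory. It would be useless across
    // tasks on different machines; it only illustrates the serializable contract.
    class JvmLockFactory implements Lock.Factory {
        private static final long serialVersionUID = 1L;

        @Override
        public Lock create() {
            return new Lock() {
                private final Object mutex = new Object();

                @Override
                public <T> T runWithLock(Callable<T> callable) throws Exception {
                    synchronized (mutex) {
                        return callable.call();
                    }
                }

                @Override
                public void close() {}
            };
        }
    }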
(dynamic table sink class; file name not shown in the capture)
@@ -31,6 +31,7 @@
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -122,12 +123,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         : (logSinkProvider == null ? null : logSinkProvider.createSink());
         return new TableStoreDataStreamSinkProvider(
                 (dataStream) ->
-                        new FlinkSinkBuilder(tableIdentifier, table)
+                        new FlinkSinkBuilder(table)
                                 .withInput(
                                         new DataStream<>(
                                                 dataStream.getExecutionEnvironment(),
                                                 dataStream.getTransformation()))
-                                .withLockFactory(lockFactory)
+                                .withLockFactory(
+                                        Lock.factory(lockFactory, tableIdentifier.toObjectPath()))
                                 .withLogSinkFunction(logSinkFunction)
                                 .withOverwritePartition(overwrite ? staticPartitions : null)
                                 .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
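At this layer the catalog's CatalogLock.Factory may legitimately be null, and Lock.factory(...) is what absorbs that: a null input yields the empty factory rather than a null Lock. A small sketch of that behavior (class name and table path are made-up examples):

    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.store.file.operation.Lock;

    class BridgeSketch {
        static void demo() throws Exception {
            ObjectPath tablePath = new ObjectPath("default", "t"); // example path
            // A missing catalog lock degrades to a no-op lock, not a null reference.
            Lock.Factory factory = Lock.factory(null, tablePath); // -> EmptyFactory
            try (Lock lock = factory.create()) {                  // -> EmptyLock
                Integer v = lock.runWithLock(() -> 40 + 2);       // runs unlocked, v == 42
            }
        }
    }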
(sink integration test class; file name not shown in the capture)
@@ -132,7 +132,7 @@ public void testPartitioned() throws Exception {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -152,7 +152,7 @@ public void testNonPartitioned() throws Exception {
         FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -171,7 +171,7 @@ public void testOverwrite() throws Exception {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // overwrite p2
@@ -182,7 +182,7 @@
                         InternalTypeInfo.of(TABLE_TYPE));
         Map<String, String> overwrite = new HashMap<>();
         overwrite.put("p", "p2");
-        new FlinkSinkBuilder(IDENTIFIER, table)
+        new FlinkSinkBuilder(table)
                 .withInput(partialData)
                 .withOverwritePartition(overwrite)
                 .build();
@@ -201,7 +201,7 @@ public void testOverwrite() throws Exception {
                 Collections.singletonList(
                         wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
                 InternalTypeInfo.of(TABLE_TYPE));
-        new FlinkSinkBuilder(IDENTIFIER, table)
+        new FlinkSinkBuilder(table)
                 .withInput(partialData)
                 .withOverwritePartition(new HashMap<>())
                 .build();
@@ -218,7 +218,7 @@ public void testPartitionedNonKey() throws Exception {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]);
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -247,7 +247,7 @@ public void testNonKeyedProjection() throws Exception {
 
     private void testProjection(FileStoreTable table) throws Exception {
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -334,7 +334,7 @@ private void sinkAndValidate(
         }
         DataStreamSource<RowData> source =
                 env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(source).build();
+        new FlinkSinkBuilder(table).withInput(source).build();
         env.execute();
         assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
     }
Lock.java
@@ -23,6 +23,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.concurrent.Callable;
 
 /** An interface that allows the file store to use a global lock for transaction-related operations. */
@@ -31,19 +32,65 @@ public interface Lock extends AutoCloseable {
     /** Run with lock. */
     <T> T runWithLock(Callable<T> callable) throws Exception;
 
-    @Nullable
-    static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
-        if (lockFactory == null) {
-            return null;
-        }
-
-        return fromCatalog(lockFactory.create(), tablePath);
-    }
+    /** A factory to create {@link Lock}. */
+    interface Factory extends Serializable {
+        Lock create();
+    }
+
+    static Factory factory(@Nullable CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+        return lockFactory == null
+                ? new EmptyFactory()
+                : new CatalogLockFactory(lockFactory, tablePath);
+    }
+
+    static Factory emptyFactory() {
+        return new EmptyFactory();
+    }
+
+    /** A {@link Factory} creating lock from catalog. */
+    class CatalogLockFactory implements Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        private final CatalogLock.Factory lockFactory;
+        private final ObjectPath tablePath;
+
+        public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+            this.lockFactory = lockFactory;
+            this.tablePath = tablePath;
+        }
+
+        @Override
+        public Lock create() {
+            return fromCatalog(lockFactory.create(), tablePath);
+        }
+    }
+
+    /** A {@link Factory} creating empty lock. */
+    class EmptyFactory implements Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Lock create() {
+            return new EmptyLock();
+        }
+    }
+
+    /** An empty lock. */
+    class EmptyLock implements Lock {
+        @Override
+        public <T> T runWithLock(Callable<T> callable) throws Exception {
+            return callable.call();
+        }
+
+        @Override
+        public void close() {}
+    }
 
-    @Nullable
     static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
         if (lock == null) {
-            return null;
+            return new EmptyLock();
         }
         return new CatalogLockImpl(lock, tablePath);
     }
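The net effect of the Lock changes is a null-object design: call sites always receive a usable Lock, with EmptyLock degrading to a plain call. A hypothetical helper, not from the PR, showing the call-site shape this enables:

    import java.util.concurrent.Callable;
    import org.apache.flink.table.store.file.operation.Lock;

    final class LockUsageSketch {
        // factory.create() never returns null after this change, so the guard
        // is unconditional -- no branching on whether locking is enabled.
        static <T> T runGuarded(Lock.Factory factory, Callable<T> action) throws Exception {
            try (Lock lock = factory.create()) {
                return lock.runWithLock(action);
            }
        }
    }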
(Hive catalog class; file name not shown in the capture)
@@ -376,7 +376,7 @@ private SchemaManager schemaManager(ObjectPath tablePath) {
 
     private Lock lock(ObjectPath tablePath) {
         if (!lockEnabled()) {
-            return null;
+            return new Lock.EmptyLock();
        }
 
         HiveCatalogLock lock =
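The Hive catalog change applies the same pattern at the metastore boundary: when locking is disabled, lock(tablePath) now hands back Lock.EmptyLock instead of null, so callers can take the lock unconditionally. An illustrative caller (the class, method, and mutation parameter are made-up names, not from the PR):

    import org.apache.flink.table.store.file.operation.Lock;

    final class HiveLockCallerSketch {
        // `lock` is whatever the catalog's lock(tablePath) returned; it is never
        // null after this change, so no null check is needed around the mutation.
        static void withTableLock(Lock lock, Runnable mutation) throws Exception {
            lock.runWithLock(
                    () -> {
                        mutation.run();
                        return null;
                    });
        }
    }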