Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jobs:
strategy:
matrix:
mongodb: [ mongodb4, mongodb5, mongodb6, mongodb7 ]
flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
jdk: [ '8, 11, 17, 21' ]
flink: [ 2.1-SNAPSHOT ]
jdk: [ '17, 21' ]
Comment on lines +32 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your contribution, @neoneo008 .
This looks good overall, but I think we should preserve JDK 11 support for now.


uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
Expand Down Expand Up @@ -78,7 +79,7 @@ public static <IN> MongoSinkBuilder<IN> builder() {
}

@Override
public SinkWriter<IN> createWriter(InitContext context) {
public SinkWriter<IN> createWriter(WriterInitContext context) {
return new MongoWriter<>(
connectionOptions,
writeOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
Expand Down Expand Up @@ -89,7 +89,7 @@ public MongoWriter(
MongoConnectionOptions connectionOptions,
MongoWriteOptions writeOptions,
boolean flushOnCheckpoint,
Sink.InitContext initContext,
WriterInitContext initContext,
MongoSerializationSchema<IN> serializationSchema) {
this.connectionOptions = checkNotNull(connectionOptions);
this.writeOptions = checkNotNull(writeOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
package org.apache.flink.connector.mongodb.sink.writer.context;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;

/** Default {@link MongoSinkContext} implementation. */
@Internal
public class DefaultMongoSinkContext implements MongoSinkContext {

private final Sink.InitContext initContext;
private final WriterInitContext initContext;
private final MongoWriteOptions writeOptions;

public DefaultMongoSinkContext(Sink.InitContext initContext, MongoWriteOptions writeOptions) {
public DefaultMongoSinkContext(WriterInitContext initContext, MongoWriteOptions writeOptions) {
this.initContext = initContext;
this.writeOptions = writeOptions;
}

@Override
public Sink.InitContext getInitContext() {
public WriterInitContext getInitContext() {
return initContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.connector.mongodb.sink.writer.context;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;

Expand All @@ -27,7 +27,7 @@
public interface MongoSinkContext {

/** Returns the current sink's init context. */
Sink.InitContext getInitContext();
WriterInitContext getInitContext();

/** Returns the current process time in flink. */
long processTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
Expand Down Expand Up @@ -134,8 +132,6 @@ public Boundedness getBoundedness() {

@Override
public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext readerContext) {
FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementsQueue =
new FutureCompletingBlockingQueue<>();

MongoSourceReaderContext mongoReaderContext =
new MongoSourceReaderContext(readerContext, limit);
Expand All @@ -150,7 +146,6 @@ public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext read
mongoReaderContext);

return new MongoSourceReader<>(
elementsQueue,
splitReaderSupplier,
new MongoRecordEmitter<>(deserializationSchema),
mongoReaderContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplitState;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
Expand All @@ -49,13 +47,12 @@ public class MongoSourceReader<OUT>
private static final Logger LOG = LoggerFactory.getLogger(MongoSourceReader.class);

public MongoSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementQueue,
Supplier<SplitReader<BsonDocument, MongoSourceSplit>> splitReaderSupplier,
RecordEmitter<BsonDocument, OUT, MongoSourceSplitState> recordEmitter,
MongoSourceReaderContext readerContext) {
super(
elementQueue,
new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier),
new SingleThreadFetcherManager<>(
splitReaderSupplier, readerContext.getConfiguration()),
recordEmitter,
readerContext.getConfiguration(),
readerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.flink.connector.mongodb.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
Expand Down Expand Up @@ -101,9 +102,11 @@ void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
throws Exception {
final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());

env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
env.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ void testSinkContext() throws Exception {

MongoSerializationSchema<Document> testSerializationSchema =
(element, context) -> {
assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
assertThat(context.getInitContext().getTaskInfo().getIndexOfThisSubtask())
.isEqualTo(0);
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
assertThat(context.processTime())
.isEqualTo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ private List<ResolvedExpression> resolveSQLFilterToExpression(
RexNodeExpression rexExp =
(RexNodeExpression) tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);

RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, rexExp.getRexNode());
// converts the cnf condition to a list of AND conditions
List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ under the License.
<mongodb7.version>7.0.12</mongodb7.version>
<mongodb.version>${mongodb4.version}</mongodb.version>

<flink.version>1.20.0</flink.version>
<flink.version>2.1.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala-library.version>2.12.7</scala-library.version>
<junit5.version>5.8.1</junit5.version>
Expand Down
Loading