Skip to content

Commit

Permalink
Perform resource existence checks only once in Mongo connector [HZ-2673
Browse files Browse the repository at this point in the history
…] (#24953)
  • Loading branch information
TomaszGaweda committed Sep 19, 2023
1 parent a113218 commit 4fb0658
Show file tree
Hide file tree
Showing 26 changed files with 751 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.mongodb.impl.DbCheckingPMetaSupplierBuilder;
import com.hazelcast.jet.mongodb.impl.WriteMongoP;
import com.hazelcast.jet.mongodb.impl.WriteMongoParams;
import com.hazelcast.jet.pipeline.DataConnectionRef;
Expand Down Expand Up @@ -84,6 +84,7 @@ public final class MongoSinkBuilder<T> {
private final WriteMongoParams<T> params = new WriteMongoParams<>();

private int preferredLocalParallelism = 2;
private ResourceChecks existenceChecks = ResourceChecks.ONCE_PER_JOB;

/**
* See {@link MongoSinks#builder}
Expand Down Expand Up @@ -236,15 +237,17 @@ public MongoSinkBuilder<T> writeMode(@Nonnull WriteMode writeMode) {
}

/**
* If {@code true}, the lack of database or collection will cause an error.
* If {@code false}, database and collection will be automatically created.
* Default value is {@code true}.
* If {@link ResourceChecks#NEVER}, the database and collection will be automatically created on the first usage.
* Otherwise, querying for a database or collection that don't exist will cause an error.
* Default value is {@link ResourceChecks#ONCE_PER_JOB}.
*
* @param throwOnNonExisting if exception should be thrown when database or collection does not exist.
* @since 5.4
* @param checkResourceExistence mode of resource existence checks; whether exception should be thrown when
* database or collection does not exist and when the check will be performed.
*/
@Nonnull
public MongoSinkBuilder<T> throwOnNonExisting(boolean throwOnNonExisting) {
params.setThrowOnNonExisting(throwOnNonExisting);
public MongoSinkBuilder<T> checkResourceExistence(ResourceChecks checkResourceExistence) {
existenceChecks = checkResourceExistence;
return this;
}

Expand All @@ -253,13 +256,23 @@ public MongoSinkBuilder<T> throwOnNonExisting(boolean throwOnNonExisting) {
* supplied to this builder.
*/
@Nonnull
public Sink<T> build() {
public Sink<T> build() {
params.checkValid();
final WriteMongoParams<T> localParams = this.params;
localParams.setCheckExistenceOnEachConnect(existenceChecks == ResourceChecks.ON_EACH_CONNECT);

ConnectorPermission permission = params.buildPermission();
return Sinks.fromProcessor(name, ProcessorMetaSupplier.of(preferredLocalParallelism, permission,
ProcessorSupplier.of(() -> new WriteMongoP<>(localParams))));
return Sinks.fromProcessor(name, new DbCheckingPMetaSupplierBuilder()
.withRequiredPermission(permission)
.withCheckResourceExistence(localParams.isCheckExistenceOnEachConnect())
.withForceTotalParallelismOne(false)
.withDatabaseName(localParams.getDatabaseName())
.withCollectionName(localParams.getCollectionName())
.withClientSupplier(localParams.getClientSupplier())
.withDataConnectionRef(localParams.getDataConnectionRef())
.withProcessorSupplier(ProcessorSupplier.of(() -> new WriteMongoP<>(localParams)))
.withPreferredLocalParallelism(preferredLocalParallelism)
.build());
}

}

0 comments on commit 4fb0658

Please sign in to comment.