Perform resource existence checks only once in Mongo connector [HZ-2673] #24953

Merged

Commits (40)
5bb1ca4
Implemented ProcessorMetaSupplier, that will check if given db and co…
TomaszGaweda Jul 4, 2023
61623cf
Usage in SQL
TomaszGaweda Jul 4, 2023
fee95c5
Checkstyle
TomaszGaweda Jul 4, 2023
ccd84bc
Remove old checks
TomaszGaweda Jul 4, 2023
c4e208c
Checkstyle
TomaszGaweda Jul 4, 2023
a43701a
New tests
TomaszGaweda Jul 4, 2023
4aec1e0
Allow initial-only checks
TomaszGaweda Jul 4, 2023
6baaa13
Replace constructor with builder
TomaszGaweda Jul 5, 2023
2093c0b
Checkstyle
TomaszGaweda Jul 5, 2023
0b7e4d2
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Jul 5, 2023
66a1225
Changes after CR
TomaszGaweda Jul 5, 2023
277860e
Checkstyle, cleanup
TomaszGaweda Jul 5, 2023
cf74437
Refactor after CR
TomaszGaweda Jul 5, 2023
c19ee2a
CR
TomaszGaweda Jul 6, 2023
fbcb55c
Simplify Mongo ProcessorSuppliers
TomaszGaweda Jul 13, 2023
726d39d
Add missing try-with-resources
TomaszGaweda Jul 17, 2023
ae15a6d
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Jul 17, 2023
70c8df2
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Aug 17, 2023
3aea368
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Sep 5, 2023
cfcd0b5
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Sep 5, 2023
3f5d835
Resolved conflicts
TomaszGaweda Sep 5, 2023
68be517
Merge fix
TomaszGaweda Sep 5, 2023
4f968d1
Checkstyle
TomaszGaweda Sep 5, 2023
33060b2
Added forceReadParallelismOne to Java API as well
TomaszGaweda Sep 5, 2023
1ba9258
Better name
TomaszGaweda Sep 5, 2023
a28c42b
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Sep 5, 2023
195b4ae
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Sep 6, 2023
c514558
CR comment
TomaszGaweda Sep 6, 2023
d365f2f
CR comments
TomaszGaweda Sep 8, 2023
71a9b68
Merge branch 'master' into fixes/5.4/database-checks-in-mongo
TomaszGaweda Sep 8, 2023
4c6c69f
Remove leftover code
TomaszGaweda Sep 8, 2023
b3e8c12
Fix serialization
TomaszGaweda Sep 11, 2023
874e21f
Fix name and fix parameter assignment
TomaszGaweda Sep 11, 2023
7831956
CR comments
TomaszGaweda Sep 15, 2023
84e279f
CR comment - coherent methods
TomaszGaweda Sep 15, 2023
37b41ed
Checkstyle
TomaszGaweda Sep 15, 2023
d22dbe7
CR comments
TomaszGaweda Sep 18, 2023
297eb57
Cleanup
TomaszGaweda Sep 18, 2023
0fd21be
CR comments
TomaszGaweda Sep 18, 2023
0dcfce2
Checkstyle
TomaszGaweda Sep 19, 2023
@@ -19,9 +19,9 @@
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.mongodb.impl.DbCheckingPMetaSupplierBuilder;
import com.hazelcast.jet.mongodb.impl.WriteMongoP;
import com.hazelcast.jet.mongodb.impl.WriteMongoParams;
import com.hazelcast.jet.pipeline.DataConnectionRef;
@@ -253,13 +253,22 @@ public MongoSinkBuilder<T> throwOnNonExisting(boolean throwOnNonExisting) {
* supplied to this builder.
*/
@Nonnull
public Sink<T> build() {
params.checkValid();
final WriteMongoParams<T> localParams = this.params;

ConnectorPermission permission = params.buildPermission();
return Sinks.fromProcessor(name, ProcessorMetaSupplier.of(preferredLocalParallelism, permission,
ProcessorSupplier.of(() -> new WriteMongoP<>(localParams))));
return Sinks.fromProcessor(name, new DbCheckingPMetaSupplierBuilder()
.setRequiredPermission(permission)
.setShouldCheck(localParams.isThrowOnNonExisting())
.setForceTotalParallelismOne(false)
.setDatabaseName(localParams.getDatabaseName())
.setCollectionName(localParams.getCollectionName())
.setClientSupplier(localParams.getClientSupplier())
.setDataConnectionRef(localParams.getDataConnectionRef())
.setProcessorSupplier(ProcessorSupplier.of(() -> new WriteMongoP<>(localParams)))
.create()
.withPreferredLocalParallelism(preferredLocalParallelism));
}

}
@@ -19,8 +19,8 @@
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.mongodb.impl.DbCheckingPMetaSupplierBuilder;
import com.hazelcast.jet.mongodb.impl.ReadMongoP;
import com.hazelcast.jet.mongodb.impl.ReadMongoParams;
import com.hazelcast.jet.pipeline.BatchSource;
@@ -414,8 +414,16 @@ public BatchSource<T> build() {
final ReadMongoParams<T> localParams = params;
Contributor:
What is the point of having the localParams variable? ReadMongoParams is mutable, and the ReadMongoParams.setXXX methods do not return a new instance.
I can see that it is used in the lambda in withProcessorSupplier, but all sources created from the same builder will share the same instance of params. If the builder is modified after build(), you can have problems (the local variable fixes the compilation problem, but not the sharing problem).

Contributor:

Treating the builder as a prototype might not be the best idea, but someone might try to do that.

Contributor:
The usage of localParams throughout these methods makes me think that maybe you wanted to clone the params.

Contributor:
We have a similar situation in MongoSinkBuilder.

Contributor (Author):
Referencing fields of the builder inside the lambda would capture a reference to the builder, meaning the whole builder would have to become serializable. Referencing a local variable instead solves that issue.

Contributor:
Yes, but it does not solve the problem of using the builder as a prototype. We do not control the lifecycle of the builder, nor the time at which the PMS will be serialized.
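The capture issue discussed in this thread can be reproduced in isolation. The sketch below is hypothetical (class names `Builder` and `Params` are invented stand-ins for the Mongo builders and ReadMongoParams): a lambda that reads an instance field captures `this`, so serializing it drags the whole non-serializable builder along, while a lambda that reads a local variable captures only that value. Note that, as the reviewer points out, the local-variable trick fixes serialization but not the sharing of the mutable params instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class CaptureDemo {
    // A Supplier that can go through Java serialization.
    interface SerSupplier<T> extends Supplier<T>, Serializable {}

    // Stand-in for ReadMongoParams: mutable and Serializable.
    static class Params implements Serializable {
        String databaseName = "testDb";
    }

    // Stand-in for the source/sink builder: NOT Serializable.
    static class Builder {
        Params params = new Params();

        // Reading the field inside the lambda captures `this`,
        // so serializing the lambda requires serializing the builder.
        SerSupplier<String> capturingField() {
            return () -> params.databaseName;
        }

        // Copying the reference into a local first captures only
        // the (Serializable) Params instance.
        SerSupplier<String> capturingLocal() {
            final Params localParams = params;
            return () -> localParams.databaseName;
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException: the captured builder
        }
    }

    public static void main(String[] args) {
        Builder b = new Builder();
        System.out.println(serializes(b.capturingField())); // false
        System.out.println(serializes(b.capturingLocal())); // true
    }
}
```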


ConnectorPermission permission = params.buildPermissions();
return Sources.batchFromProcessor(name, ProcessorMetaSupplier.of(permission,
ProcessorSupplier.of(() -> new ReadMongoP<>(localParams))));
return Sources.batchFromProcessor(name, new DbCheckingPMetaSupplierBuilder()
.setRequiredPermission(permission)
.setShouldCheck(localParams.isThrowOnNonExisting())
.setForceTotalParallelismOne(false)
.setDatabaseName(localParams.getDatabaseName())
.setCollectionName(localParams.getCollectionName())
.setClientSupplier(localParams.getClientSupplier())
.setDataConnectionRef(localParams.getDataConnectionRef())
.setProcessorSupplier(ProcessorSupplier.of(() -> new ReadMongoP<>(localParams)))
.create());
}
}

@@ -611,8 +619,18 @@ public StreamSource<T> build() {

ConnectorPermission permission = params.buildPermissions();
return Sources.streamFromProcessorWithWatermarks(name, true,
eventTimePolicy -> ProcessorMetaSupplier.of(permission,
ProcessorSupplier.of(() -> new ReadMongoP<>(localParams.setEventTimePolicy(eventTimePolicy)))));
eventTimePolicy -> new DbCheckingPMetaSupplierBuilder()
.setRequiredPermission(permission)
.setShouldCheck(localParams.isThrowOnNonExisting())
.setForceTotalParallelismOne(false)
.setDatabaseName(localParams.getDatabaseName())
.setCollectionName(localParams.getCollectionName())
.setClientSupplier(localParams.getClientSupplier())
.setDataConnectionRef(localParams.getDataConnectionRef())
.setProcessorSupplier(ProcessorSupplier.of(
() -> new ReadMongoP<>(localParams.setEventTimePolicy(eventTimePolicy))))
.create()
);
}
}

@@ -0,0 +1,195 @@
/*
* Copyright 2023 Hazelcast Inc.
*
* Licensed under the Hazelcast Community License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://hazelcast.com/hazelcast-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.jet.mongodb.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.dataconnection.DataConnection;
import com.hazelcast.dataconnection.impl.InternalDataConnectionService;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.processor.ExpectNothingP;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.mongodb.dataconnection.MongoDataConnection;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.security.Permission;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Function;

import static com.hazelcast.internal.util.UuidUtil.newUnsecureUuidString;
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static com.hazelcast.jet.impl.util.Util.arrayIndexOf;
import static com.hazelcast.jet.mongodb.impl.MongoUtilities.checkCollectionExists;
import static com.hazelcast.jet.mongodb.impl.MongoUtilities.checkDatabaseExists;
import static com.hazelcast.partition.strategy.StringPartitioningStrategy.getPartitionKey;
import static java.util.Collections.singletonList;

/**
* A {@link ProcessorMetaSupplier} that checks whether the requested database and collection exist before
* creating the processors.
*/
public class DbCheckingPMetaSupplier implements ProcessorMetaSupplier {

private final Permission requiredPermission;
private final boolean shouldCheck;
private boolean forceTotalParallelismOne;
private final String databaseName;
private final String collectionName;
private final ProcessorSupplier processorSupplier;
private final SupplierEx<? extends MongoClient> clientSupplier;
private final DataConnectionRef dataConnectionRef;
private int preferredLocalParallelism = Vertex.LOCAL_PARALLELISM_USE_DEFAULT;

private transient Address ownerAddress;

/**
* Creates a new instance of this meta supplier.
*/
public DbCheckingPMetaSupplier(@Nullable Permission requiredPermission,
boolean shouldCheck,
boolean forceTotalParallelismOne,
@Nullable String databaseName,
@Nullable String collectionName,
@Nullable SupplierEx<? extends MongoClient> clientSupplier,
@Nullable DataConnectionRef dataConnectionRef,
@Nonnull ProcessorSupplier processorSupplier) {
this.requiredPermission = requiredPermission;
this.shouldCheck = shouldCheck;
this.forceTotalParallelismOne = forceTotalParallelismOne;
this.databaseName = databaseName;
this.collectionName = collectionName;
this.processorSupplier = processorSupplier;
this.clientSupplier = clientSupplier;
this.dataConnectionRef = dataConnectionRef;
}

/**
* Sets preferred local parallelism. If {@link #forceTotalParallelismOne} is selected, this
* method will have no effect.
*/
public DbCheckingPMetaSupplier withPreferredLocalParallelism(int preferredLocalParallelism) {
this.preferredLocalParallelism = forceTotalParallelismOne ? 1 : preferredLocalParallelism;
return this;
}

@Override
public int preferredLocalParallelism() {
return forceTotalParallelismOne ? 1 : preferredLocalParallelism;
}

@Nullable
@Override
public Permission getRequiredPermission() {
return requiredPermission;
}

/**
* If true, only one instance of the given supplier will be created.
*/
public DbCheckingPMetaSupplier forceTotalParallelismOne(boolean forceTotalParallelismOne) {
this.forceTotalParallelismOne = forceTotalParallelismOne;
return this;
}

@Override
public boolean initIsCooperative() {
return true;
}

@Override
public void init(@Nonnull Context context) throws Exception {
if (forceTotalParallelismOne) {
preferredLocalParallelism = 1;
if (context.localParallelism() != 1) {
throw new IllegalArgumentException(
"Local parallelism of " + context.localParallelism() + " was requested for a vertex that "
+ "supports only total parallelism of 1. Local parallelism must be 1.");
}
String key = getPartitionKey(newUnsecureUuidString());
int partitionId = context.hazelcastInstance().getPartitionService().getPartition(key).getPartitionId();
ownerAddress = context.partitionAssignment().entrySet().stream()
.filter(en -> arrayIndexOf(partitionId, en.getValue()) >= 0)
.findAny()
.map(Entry::getKey)
.orElseThrow(() -> new RuntimeException("Owner partition not assigned to any " +
"participating member"));
}

if (shouldCheck) {
Tuple2<MongoClient, DataConnection> clientAndRef = connect(context);
MongoClient client = clientAndRef.requiredF0();
Contributor:
Where is this client instance closed?

Contributor (Author):
Good catch, 726d39d

try {
if (databaseName != null) {
checkDatabaseExists(client, databaseName);
MongoDatabase database = client.getDatabase(databaseName);
if (collectionName != null) {
checkCollectionExists(database, collectionName);
}
}
} finally {
DataConnection connection = clientAndRef.f1();
if (connection != null) {
connection.release();
}
}
}
}

private Tuple2<MongoClient, DataConnection> connect(Context context) {
try {
if (clientSupplier != null) {
return tuple2(clientSupplier.get(), null);
} else if (dataConnectionRef != null) {
NodeEngineImpl nodeEngine = Util.getNodeEngine(context.hazelcastInstance());
InternalDataConnectionService dataConnectionService = nodeEngine.getDataConnectionService();
var dataConnection = dataConnectionService.getAndRetainDataConnection(dataConnectionRef.getName(),
MongoDataConnection.class);
return tuple2(dataConnection.getClient(), dataConnection);
} else {
throw new IllegalArgumentException("Either clientSupplier or dataConnectionRef must be provided " +
"if database and collection existence checks are requested");
}
} catch (Exception e) {
throw new JetException("Cannot connect to MongoDB", e);
}
}

@Nonnull
@Override
public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
if (forceTotalParallelismOne) {
return addr -> addr.equals(ownerAddress) ? processorSupplier : count -> singletonList(new ExpectNothingP());
} else {
return addr -> processorSupplier;
}
}

@Override
public boolean closeIsCooperative() {
return true;
}

}
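The address-to-supplier mapping in `get(List<Address>)` above can be sketched with plain strings standing in for `Address` and `ProcessorSupplier` (the member addresses below are invented): when `forceTotalParallelismOne` is set, only the owner address receives the real processor supplier and every other member receives a do-nothing processor, mirroring the `ExpectNothingP` branch.

```java
import java.util.List;
import java.util.function.Function;

public class OwnerSelectionDemo {
    // Stand-ins for the real ProcessorSupplier instances.
    static final String REAL_SUPPLIER = "ReadMongoP";
    static final String NOOP_SUPPLIER = "ExpectNothingP";

    // Mirrors the branch in DbCheckingPMetaSupplier.get(): with forced
    // total parallelism of 1, only the owner gets the real supplier.
    static Function<String, String> selector(String ownerAddress, boolean forceTotalParallelismOne) {
        if (forceTotalParallelismOne) {
            return addr -> addr.equals(ownerAddress) ? REAL_SUPPLIER : NOOP_SUPPLIER;
        }
        return addr -> REAL_SUPPLIER;
    }

    public static void main(String[] args) {
        List<String> members = List.of("10.0.0.1", "10.0.0.2", "10.0.0.3");
        Function<String, String> f = selector("10.0.0.2", true);
        for (String m : members) {
            // Only the owner, 10.0.0.2, maps to ReadMongoP.
            System.out.println(m + " -> " + f.apply(m));
        }
    }
}
```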
@@ -0,0 +1,79 @@
/*
* Copyright 2023 Hazelcast Inc.
*
* Licensed under the Hazelcast Community License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://hazelcast.com/hazelcast-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.jet.mongodb.impl;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.mongodb.client.MongoClient;

import java.security.Permission;

public class DbCheckingPMetaSupplierBuilder {
private Permission requiredPermission;
private boolean shouldCheck;
private boolean forceTotalParallelismOne;
private String databaseName;
private String collectionName;
private SupplierEx<? extends MongoClient> clientSupplier;
private DataConnectionRef dataConnectionRef;
private ProcessorSupplier processorSupplier;

public DbCheckingPMetaSupplierBuilder setRequiredPermission(Permission requiredPermission) {
this.requiredPermission = requiredPermission;
return this;
}

public DbCheckingPMetaSupplierBuilder setShouldCheck(boolean shouldCheck) {
this.shouldCheck = shouldCheck;
return this;
}

public DbCheckingPMetaSupplierBuilder setForceTotalParallelismOne(boolean forceTotalParallelismOne) {
this.forceTotalParallelismOne = forceTotalParallelismOne;
return this;
}

public DbCheckingPMetaSupplierBuilder setDatabaseName(String databaseName) {
this.databaseName = databaseName;
return this;
}

public DbCheckingPMetaSupplierBuilder setCollectionName(String collectionName) {
this.collectionName = collectionName;
return this;
}

public DbCheckingPMetaSupplierBuilder setClientSupplier(SupplierEx<? extends MongoClient> clientSupplier) {
this.clientSupplier = clientSupplier;
return this;
}

public DbCheckingPMetaSupplierBuilder setDataConnectionRef(DataConnectionRef dataConnectionRef) {
this.dataConnectionRef = dataConnectionRef;
return this;
}

public DbCheckingPMetaSupplierBuilder setProcessorSupplier(ProcessorSupplier processorSupplier) {
this.processorSupplier = processorSupplier;
return this;
}

public DbCheckingPMetaSupplier create() {
return new DbCheckingPMetaSupplier(requiredPermission, shouldCheck, forceTotalParallelismOne, databaseName,
collectionName, clientSupplier, dataConnectionRef, processorSupplier);
}
}
@@ -167,4 +167,14 @@ static void checkDatabaseExists(MongoClient client, String databaseName) {
ClusterDescription clusterDescription = client.getClusterDescription();
throw new JetException("Database " + databaseName + " does not exist in cluster " + clusterDescription);
}


public static void checkDatabaseAndCollectionExists(MongoClient client, String databaseName, String collectionName) {
checkDatabaseExists(client, databaseName);
MongoDatabase database = client.getDatabase(databaseName);
if (collectionName != null) {
checkCollectionExists(database, collectionName);
}
}

}