Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give the possibility to control pool size of Mongo Data Connection #25027

Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.hazelcast.dataconnection.DataConnectionResource;
import com.hazelcast.jet.impl.util.ConcurrentMemoizingSupplier;
import com.hazelcast.spi.annotation.Beta;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoClientSettings.Builder;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.connection.ConnectionPoolSettings;

import javax.annotation.Nonnull;
import java.util.ArrayList;
Expand All @@ -49,10 +51,12 @@
*/
@Beta
public class MongoDataConnection extends DataConnectionBase {

/**
* Name of a property which holds connection string to the mongodb instance.
*/
public static final String CONNECTION_STRING_PROPERTY = "connectionString";

/**
* Name of a property with a database name hint.
* This is used as a <strong>hint</strong> only; {@link #listResources} will return only collection from db with this
Expand All @@ -65,44 +69,70 @@ public class MongoDataConnection extends DataConnectionBase {
* Name of the property holding username.
*/
public static final String USERNAME_PROPERTY = "username";

/**
* Name of the property holding user password.
*/
public static final String PASSWORD_PROPERTY = "password";

/**
* Name of a property which holds host:port address of the mongodb instance.
*/
public static final String HOST_PROPERTY = "host";

/**
* Name of the property holding the name of the database in which user is created.
* Default value is {@code admin}.
*/
public static final String AUTH_DB_PROPERTY = "authDb";

/**
* Name of the property holding the minimum size of Mongo Client connection pool.
* Default is 10.
* @since 5.4
*/
public static final String CONNECTION_POOL_MIN = "connectionPoolMinSize";

/**
* Name of the property holding the maximum size of Mongo Client connection pool.
* Default is 0, which means unlimited.
* @since 5.4
*/
public static final String CONNECTION_POOL_MAX = "connectionPoolMaxSize";

private volatile ConcurrentMemoizingSupplier<MongoClient> mongoClientSup;
private final String name;
private final String connectionString;
private final String databaseName;
private final String username;
private final String password;
private final String host;
private final String authDb;
private final int connectionPoolMinSize;
private final int connectionPoolMaxSize;

/**
* Creates a new data connection based on given config.
*/
@SuppressWarnings("DataFlowIssue")
public MongoDataConnection(DataConnectionConfig config) {
super(config);
this.name = config.getName();
this.connectionString = config.getProperty(CONNECTION_STRING_PROPERTY);
this.databaseName = config.getProperty(DATABASE_PROPERTY);
this.username = config.getProperty(USERNAME_PROPERTY);
this.password = config.getProperty(PASSWORD_PROPERTY);
this.host = config.getProperty(HOST_PROPERTY);
this.authDb = config.getProperty(AUTH_DB_PROPERTY, "admin");
this.connectionPoolMinSize = Integer.parseInt(config.getProperty(CONNECTION_POOL_MIN, "10"));
this.connectionPoolMaxSize = Integer.parseInt(config.getProperty(CONNECTION_POOL_MAX, "0"));

checkState(allSame((username == null), (password == null), (host == null)),
"You have to provide connectionString property or combination of username, password and host");

if (config.isShared()) {
this.mongoClientSup = new ConcurrentMemoizingSupplier<>(
() -> new CloseableMongoClient(createClient(config), this::release));
() -> new CloseableMongoClient(createClient(), this::release));
}
}

Expand All @@ -119,25 +149,35 @@ private static boolean allSame(boolean... booleans) {
return true;
}

private MongoClient createClient(DataConnectionConfig config) {
@SuppressWarnings("checkstyle:MagicNumber")
private MongoClient createClient() {
try {
String connectionString = config.getProperty(CONNECTION_STRING_PROPERTY);
if (connectionString != null) {
return MongoClients.create(connectionString);
Builder builder = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString))
.codecRegistry(defaultCodecRegistry())
.applyToConnectionPoolSettings(this::connectionPoolSettings);
return MongoClients.create(builder.build());
}
ServerAddress serverAddress = new ServerAddress(host);
MongoCredential credential = MongoCredential.createCredential(username, authDb, password.toCharArray());
Builder builder = MongoClientSettings.builder()
.codecRegistry(defaultCodecRegistry())
.applyToConnectionPoolSettings(this::connectionPoolSettings)
.applyToClusterSettings(s -> s.hosts(singletonList(serverAddress)))
.credential(credential);
return MongoClients.create(builder.build());
} catch (Exception e) {
throw new HazelcastException("Unable to create Mongo client for data connection '" + config.getName() + "'",
throw new HazelcastException("Unable to create Mongo client for data connection '" + name + "'",
e);
}
}

private void connectionPoolSettings(ConnectionPoolSettings.Builder builder) {
builder.minSize(connectionPoolMinSize)
.maxSize(connectionPoolMaxSize);
}

/**
* Returns an instance of {@link MongoClient}.
*
Expand All @@ -153,7 +193,7 @@ public MongoClient getClient() {
checkState(supplier != null, "Mongo client should not be closed at this point");
return supplier.get();
} else {
MongoClient client = createClient(getConfig());
MongoClient client = createClient();
return new CloseableMongoClient(client, client::close);
}
}
Expand Down