Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give the possibility to control pool size of Mongo Data Connection #25027

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.hazelcast.dataconnection.DataConnectionResource;
import com.hazelcast.jet.impl.util.ConcurrentMemoizingSupplier;
import com.hazelcast.spi.annotation.Beta;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoClientSettings.Builder;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.connection.ConnectionPoolSettings;

import javax.annotation.Nonnull;
import java.util.ArrayList;
Expand All @@ -49,10 +51,12 @@
*/
@Beta
public class MongoDataConnection extends DataConnectionBase {

/**
* Name of a property which holds connection string to the mongodb instance.
*/
public static final String CONNECTION_STRING_PROPERTY = "connectionString";

/**
* Name of a property with a database name hint.
* This is used as a <strong>hint</strong> only; {@link #listResources} will return only collection from db with this
Expand All @@ -65,44 +69,73 @@ public class MongoDataConnection extends DataConnectionBase {
* Name of the property holding username.
*/
public static final String USERNAME_PROPERTY = "username";

/**
* Name of the property holding user password.
*/
public static final String PASSWORD_PROPERTY = "password";

/**
* Name of a property which holds host:port address of the mongodb instance.
*/
public static final String HOST_PROPERTY = "host";

/**
* Name of the property holding the name of the database in which user is created.
* Default value is {@code admin}.
*/
public static final String AUTH_DB_PROPERTY = "authDb";

/**
* Name of the property holding the minimum size of Mongo Client connection pool.
* Default is 10.
* @since 5.4
*/
public static final String CONNECTION_POOL_MIN = "connectionPoolMinSize";

/**
* Name of the property holding the maximum size of Mongo Client connection pool.
* Default is 10.
* @since 5.4
*/
public static final String CONNECTION_POOL_MAX = "connectionPoolMaxSize";

private volatile ConcurrentMemoizingSupplier<MongoClient> mongoClientSup;
private final String name;
private final String connectionString;
private final String databaseName;
private final String username;
private final String password;
private final String host;
private final String authDb;
private final int connectionPoolMinSize;
private final int connectionPoolMaxSize;

/**
* Creates a new data connection based on given config.
*/
@SuppressWarnings("DataFlowIssue")
public MongoDataConnection(DataConnectionConfig config) {
super(config);
this.name = config.getName();
this.connectionString = config.getProperty(CONNECTION_STRING_PROPERTY);
this.databaseName = config.getProperty(DATABASE_PROPERTY);
this.username = config.getProperty(USERNAME_PROPERTY);
this.password = config.getProperty(PASSWORD_PROPERTY);
this.host = config.getProperty(HOST_PROPERTY);
this.authDb = config.getProperty(AUTH_DB_PROPERTY, "admin");
this.connectionPoolMinSize = Integer.parseInt(config.getProperty(CONNECTION_POOL_MIN, "10"));
this.connectionPoolMaxSize = Integer.parseInt(config.getProperty(CONNECTION_POOL_MAX, "10"));

checkState(connectionPoolMinSize <= connectionPoolMaxSize, "connection pool max size" +
" cannot be lower than min size");

checkState(allSame((username == null), (password == null), (host == null)),
"You have to provide connectionString property or combination of username, password and host");

if (config.isShared()) {
this.mongoClientSup = new ConcurrentMemoizingSupplier<>(
() -> new CloseableMongoClient(createClient(config), this::release));
() -> new CloseableMongoClient(createClient(), this::release));
}
}

Expand All @@ -119,25 +152,34 @@ private static boolean allSame(boolean... booleans) {
return true;
}

private MongoClient createClient(DataConnectionConfig config) {
private MongoClient createClient() {
try {
String connectionString = config.getProperty(CONNECTION_STRING_PROPERTY);
if (connectionString != null) {
return MongoClients.create(connectionString);
Builder builder = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString))
.codecRegistry(defaultCodecRegistry())
.applyToConnectionPoolSettings(this::connectionPoolSettings);
return MongoClients.create(builder.build());
}
ServerAddress serverAddress = new ServerAddress(host);
MongoCredential credential = MongoCredential.createCredential(username, authDb, password.toCharArray());
Builder builder = MongoClientSettings.builder()
.codecRegistry(defaultCodecRegistry())
.applyToConnectionPoolSettings(this::connectionPoolSettings)
.applyToClusterSettings(s -> s.hosts(singletonList(serverAddress)))
.credential(credential);
return MongoClients.create(builder.build());
} catch (Exception e) {
throw new HazelcastException("Unable to create Mongo client for data connection '" + config.getName() + "'",
e);
throw new HazelcastException("Unable to create Mongo client for data connection '" + name + "'"
+ e.getMessage(), e);
}
}

private void connectionPoolSettings(ConnectionPoolSettings.Builder builder) {
builder.minSize(connectionPoolMinSize)
.maxSize(connectionPoolMaxSize);
}

/**
* Returns an instance of {@link MongoClient}.
*
Expand All @@ -153,7 +195,7 @@ public MongoClient getClient() {
checkState(supplier != null, "Mongo client should not be closed at this point");
return supplier.get();
} else {
MongoClient client = createClient(getConfig());
MongoClient client = createClient();
return new CloseableMongoClient(client, client::close);
}
}
Expand Down Expand Up @@ -223,7 +265,7 @@ public static DataConnectionConfig mongoDataConnectionConf(String name, String c
dataConnectionConfig.setName(name);
dataConnectionConfig.setShared(true);
dataConnectionConfig.setProperty(CONNECTION_STRING_PROPERTY, connectionString);
dataConnectionConfig.setType("MongoDB");
dataConnectionConfig.setType("Mongo");
return dataConnectionConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.test.annotation.QuickTest;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.internal.MongoClientImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -144,6 +145,21 @@ public void should_close_client_when_all_released_when_not_shared() {
assertClosed(client2);
}

@Test
public void should_configure_pool_size() {
dataConnection = new MongoDataConnection(mongoDataConnectionConf("mongo", connectionString)
.setShared(false)
.setProperty(MongoDataConnection.CONNECTION_POOL_MIN, "1337")
.setProperty(MongoDataConnection.CONNECTION_POOL_MAX, "2023"));

try (MongoClient client = dataConnection.getClient()) {
var asCloseable = (CloseableMongoClient) client;
var impl = (MongoClientImpl) asCloseable.delegate;
assertThat(impl.getSettings().getConnectionPoolSettings().getMinSize()).isEqualTo(1337);
assertThat(impl.getSettings().getConnectionPoolSettings().getMaxSize()).isEqualTo(2023);
}
}

private void assertNotClosed(MongoClient client) {
client.listDatabases().into(new ArrayList<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.test.starter.ReflectionUtils;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.junit.Test;
Expand Down Expand Up @@ -54,6 +56,32 @@ public void createsConnection() {
}
}

@Test
public void createsConnectionWithPoolSize() throws IllegalAccessException {
String dlName = randomName();
String dbName = randomName();
String colName = randomName();
try (MongoClient client = MongoClients.create(connectionString)) {
client.getDatabase(dbName).createCollection(colName);
}
String options = String.format("OPTIONS ('connectionString' = '%s', 'idColumn' = 'id', " +
"'connectionPoolMinSize' = '1337', 'connectionPoolMaxSize' = '2023') ",
connectionString);
instance().getSql().executeUpdate("CREATE DATA CONNECTION " + dlName + " TYPE Mongo SHARED " + options);

MongoDataConnection dataConnection = getNodeEngineImpl(
instance()).getDataConnectionService().getAndRetainDataConnection(dlName, MongoDataConnection.class);

assertThat(dataConnection).isNotNull();

try (MongoClient client = dataConnection.getClient()) {
var impl = ReflectionUtils.getFieldValueReflectively(client, "delegate");
var settings = (MongoClientSettings) ReflectionUtils.getFieldValueReflectively(impl, "settings");
assertThat(settings.getConnectionPoolSettings().getMinSize()).isEqualTo(1337);
assertThat(settings.getConnectionPoolSettings().getMaxSize()).isEqualTo(2023);
}
}

@Test
public void createsConnectionEvenWhenUnreachable_shared() {
testCreatesConnectionEvenWhenUnreachable(true);
Expand Down