Skip to content

Commit

Permalink
Give the possibility to control pool size of Mongo Data Connection (#…
Browse files Browse the repository at this point in the history
…25027)

By default it's min=zero, which is suboptimal for point-like queries
(e.g. select by key). It takes more time to set up a connection than to
perform actual query.
  • Loading branch information
TomaszGaweda committed Aug 17, 2023
1 parent 84796da commit cbaf65d
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.hazelcast.dataconnection.DataConnectionResource;
import com.hazelcast.jet.impl.util.ConcurrentMemoizingSupplier;
import com.hazelcast.spi.annotation.Beta;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoClientSettings.Builder;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.connection.ConnectionPoolSettings;

import javax.annotation.Nonnull;
import java.util.ArrayList;
Expand All @@ -49,10 +51,12 @@
*/
@Beta
public class MongoDataConnection extends DataConnectionBase {

/**
* Name of a property which holds connection string to the mongodb instance.
*/
public static final String CONNECTION_STRING_PROPERTY = "connectionString";

/**
* Name of a property with a database name hint.
* This is used as a <strong>hint</strong> only; {@link #listResources} will return only collection from db with this
Expand All @@ -65,44 +69,73 @@ public class MongoDataConnection extends DataConnectionBase {
* Name of the property holding username.
*/
public static final String USERNAME_PROPERTY = "username";

/**
* Name of the property holding user password.
*/
public static final String PASSWORD_PROPERTY = "password";

/**
* Name of a property which holds host:port address of the mongodb instance.
*/
public static final String HOST_PROPERTY = "host";

/**
* Name of the property holding the name of the database in which user is created.
* Default value is {@code admin}.
*/
public static final String AUTH_DB_PROPERTY = "authDb";

/**
* Name of the property holding the minimum size of Mongo Client connection pool.
* Default is 10.
* @since 5.4
*/
public static final String CONNECTION_POOL_MIN = "connectionPoolMinSize";

/**
* Name of the property holding the maximum size of Mongo Client connection pool.
* Default is 10.
* @since 5.4
*/
public static final String CONNECTION_POOL_MAX = "connectionPoolMaxSize";

private volatile ConcurrentMemoizingSupplier<MongoClient> mongoClientSup;
private final String name;
private final String connectionString;
private final String databaseName;
private final String username;
private final String password;
private final String host;
private final String authDb;
private final int connectionPoolMinSize;
private final int connectionPoolMaxSize;

/**
* Creates a new data connection based on given config.
*/
@SuppressWarnings("DataFlowIssue")
public MongoDataConnection(DataConnectionConfig config) {
super(config);
this.name = config.getName();
this.connectionString = config.getProperty(CONNECTION_STRING_PROPERTY);
this.databaseName = config.getProperty(DATABASE_PROPERTY);
this.username = config.getProperty(USERNAME_PROPERTY);
this.password = config.getProperty(PASSWORD_PROPERTY);
this.host = config.getProperty(HOST_PROPERTY);
this.authDb = config.getProperty(AUTH_DB_PROPERTY, "admin");
this.connectionPoolMinSize = Integer.parseInt(config.getProperty(CONNECTION_POOL_MIN, "10"));
this.connectionPoolMaxSize = Integer.parseInt(config.getProperty(CONNECTION_POOL_MAX, "10"));

checkState(connectionPoolMinSize <= connectionPoolMaxSize, "connection pool max size" +
" cannot be lower than min size");

checkState(allSame((username == null), (password == null), (host == null)),
"You have to provide connectionString property or combination of username, password and host");

if (config.isShared()) {
this.mongoClientSup = new ConcurrentMemoizingSupplier<>(
() -> new CloseableMongoClient(createClient(config), this::release));
() -> new CloseableMongoClient(createClient(), this::release));
}
}

Expand All @@ -119,25 +152,34 @@ private static boolean allSame(boolean... booleans) {
return true;
}

private MongoClient createClient(DataConnectionConfig config) {
private MongoClient createClient() {
try {
String connectionString = config.getProperty(CONNECTION_STRING_PROPERTY);
if (connectionString != null) {
return MongoClients.create(connectionString);
Builder builder = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString))
.codecRegistry(defaultCodecRegistry())
.applyToConnectionPoolSettings(this::connectionPoolSettings);
return MongoClients.create(builder.build());
}
ServerAddress serverAddress = new ServerAddress(host);
MongoCredential credential = MongoCredential.createCredential(username, authDb, password.toCharArray());
Builder builder = MongoClientSettings.builder()
.codecRegistry(defaultCodecRegistry())
.applyToConnectionPoolSettings(this::connectionPoolSettings)
.applyToClusterSettings(s -> s.hosts(singletonList(serverAddress)))
.credential(credential);
return MongoClients.create(builder.build());
} catch (Exception e) {
throw new HazelcastException("Unable to create Mongo client for data connection '" + config.getName() + "'",
e);
throw new HazelcastException("Unable to create Mongo client for data connection '" + name + "'"
+ e.getMessage(), e);
}
}

private void connectionPoolSettings(ConnectionPoolSettings.Builder builder) {
builder.minSize(connectionPoolMinSize)
.maxSize(connectionPoolMaxSize);
}

/**
* Returns an instance of {@link MongoClient}.
*
Expand All @@ -153,7 +195,7 @@ public MongoClient getClient() {
checkState(supplier != null, "Mongo client should not be closed at this point");
return supplier.get();
} else {
MongoClient client = createClient(getConfig());
MongoClient client = createClient();
return new CloseableMongoClient(client, client::close);
}
}
Expand Down Expand Up @@ -223,7 +265,7 @@ public static DataConnectionConfig mongoDataConnectionConf(String name, String c
dataConnectionConfig.setName(name);
dataConnectionConfig.setShared(true);
dataConnectionConfig.setProperty(CONNECTION_STRING_PROPERTY, connectionString);
dataConnectionConfig.setType("MongoDB");
dataConnectionConfig.setType("Mongo");
return dataConnectionConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.test.annotation.QuickTest;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.internal.MongoClientImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -144,6 +145,21 @@ public void should_close_client_when_all_released_when_not_shared() {
assertClosed(client2);
}

@Test
public void should_configure_pool_size() {
dataConnection = new MongoDataConnection(mongoDataConnectionConf("mongo", connectionString)
.setShared(false)
.setProperty(MongoDataConnection.CONNECTION_POOL_MIN, "1337")
.setProperty(MongoDataConnection.CONNECTION_POOL_MAX, "2023"));

try (MongoClient client = dataConnection.getClient()) {
var asCloseable = (CloseableMongoClient) client;
var impl = (MongoClientImpl) asCloseable.delegate;
assertThat(impl.getSettings().getConnectionPoolSettings().getMinSize()).isEqualTo(1337);
assertThat(impl.getSettings().getConnectionPoolSettings().getMaxSize()).isEqualTo(2023);
}
}

private void assertNotClosed(MongoClient client) {
client.listDatabases().into(new ArrayList<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.test.starter.ReflectionUtils;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.junit.Test;
Expand Down Expand Up @@ -54,6 +56,32 @@ public void createsConnection() {
}
}

@Test
public void createsConnectionWithPoolSize() throws IllegalAccessException {
String dlName = randomName();
String dbName = randomName();
String colName = randomName();
try (MongoClient client = MongoClients.create(connectionString)) {
client.getDatabase(dbName).createCollection(colName);
}
String options = String.format("OPTIONS ('connectionString' = '%s', 'idColumn' = 'id', " +
"'connectionPoolMinSize' = '1337', 'connectionPoolMaxSize' = '2023') ",
connectionString);
instance().getSql().executeUpdate("CREATE DATA CONNECTION " + dlName + " TYPE Mongo SHARED " + options);

MongoDataConnection dataConnection = getNodeEngineImpl(
instance()).getDataConnectionService().getAndRetainDataConnection(dlName, MongoDataConnection.class);

assertThat(dataConnection).isNotNull();

try (MongoClient client = dataConnection.getClient()) {
var impl = ReflectionUtils.getFieldValueReflectively(client, "delegate");
var settings = (MongoClientSettings) ReflectionUtils.getFieldValueReflectively(impl, "settings");
assertThat(settings.getConnectionPoolSettings().getMinSize()).isEqualTo(1337);
assertThat(settings.getConnectionPoolSettings().getMaxSize()).isEqualTo(2023);
}
}

@Test
public void createsConnectionEvenWhenUnreachable_shared() {
testCreatesConnectionEvenWhenUnreachable(true);
Expand Down

0 comments on commit cbaf65d

Please sign in to comment.