Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,9 +37,11 @@
*/
public class JobHistoryDataSourceProvider extends org.apache.gobblin.util.jdbc.DataSourceProvider {
private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDataSourceProvider.class);
private static final AtomicInteger POOL_NUM = new AtomicInteger(0);

@Inject
public JobHistoryDataSourceProvider(@Named("dataSourceProperties") Properties properties) {
this.dataSource.setPoolName("HikariPool-" + POOL_NUM.incrementAndGet() + "-" + getClass().getSimpleName());
this.dataSource.setDriverClassName(properties.getProperty(ConfigurationKeys.JOB_HISTORY_STORE_JDBC_DRIVER_KEY,
ConfigurationKeys.DEFAULT_JOB_HISTORY_STORE_JDBC_DRIVER));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -36,6 +38,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.sql.DataSource;
Expand Down Expand Up @@ -79,6 +82,7 @@
**/
public class MysqlStateStore<T extends State> implements StateStore<T> {
private static final Logger LOG = LoggerFactory.getLogger(MysqlStateStore.class);
private static final AtomicInteger POOL_NUM = new AtomicInteger(0);

/** Specifies which 'Job State' query columns receive search evaluation (with SQL `LIKE` operator). */
protected enum JobStateSearchColumns {
Expand Down Expand Up @@ -201,6 +205,16 @@ public static DataSource newDataSource(Config config) {
HikariDataSource dataSource = new HikariDataSource();
PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));

String jdbcUrl = config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY);
String poolName = "HikariPool-" + POOL_NUM.incrementAndGet() + "-" + MysqlStateStore.class.getSimpleName();
try {
String dbPath = new URI(new URI(jdbcUrl).getSchemeSpecificPart()).getPath().replaceAll("\\W", "-");
// when possible, attempt discernment to the DB level
poolName += dbPath; // as the path will begin w/ "/", following `replaceAll`, no need to prepend additional "-"
} catch (URISyntaxException e) {
LOG.warn("unable to parse JDBC URL '{}' - {}", jdbcUrl, e.getMessage());
}
dataSource.setPoolName(poolName);
dataSource.setDriverClassName(ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_JDBC_DRIVER_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER));
// MySQL server can timeout a connection so need to validate connections before use
Expand All @@ -213,7 +227,7 @@ public static DataSource newDataSource(Config config) {
dataSource.setConnectionTestQuery(validationQuery);
dataSource.setAutoCommit(false);
dataSource.setIdleTimeout(Duration.ofSeconds(60).toMillis());
dataSource.setJdbcUrl(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
dataSource.setJdbcUrl(jdbcUrl);
// TODO: revisit following verification of successful connection pool migration:
// whereas `o.a.commons.dbcp.BasicDataSource` defaults min idle conns to 0, hikari defaults to 10.
// perhaps non-zero would have desirable runtime perf, but anything >0 currently fails unit tests (even 1!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.source.jdbc;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.gobblin.tunnel.Tunnel;

Expand All @@ -30,6 +31,8 @@
* @author nveeramr
*/
public class JdbcProvider extends HikariDataSource {
private static final AtomicInteger POOL_NUM = new AtomicInteger(0);

private Tunnel tunnel;

// If extract type is not provided then consider it as a default type
Expand Down Expand Up @@ -73,6 +76,7 @@ public void connect(String driver, String connectionUrl, String user, String pas
}
}

this.setPoolName("HikariPool-" + POOL_NUM.incrementAndGet() + "-" + getClass().getSimpleName());
this.setDriverClassName(driver);
this.setUsername(user);
this.setPassword(password);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +42,7 @@

public class ServiceDatabaseProviderImpl implements ServiceDatabaseProvider {
private static final Logger LOG = LoggerFactory.getLogger(ServiceDatabaseProviderImpl.class);
private static final AtomicInteger POOL_NUM = new AtomicInteger(0);

private final Configuration configuration;
private HikariDataSource dataSource;
Expand All @@ -62,6 +64,7 @@ private synchronized void ensureDataSource() {

dataSource = new HikariDataSource();

dataSource.setPoolName("HikariPool-" + POOL_NUM.incrementAndGet() + "-" + getClass().getSimpleName());
dataSource.setJdbcUrl(configuration.getUrl());
dataSource.setUsername(configuration.getUserName());
dataSource.setPassword(configuration.getPassword());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import javax.sql.DataSource;

Expand All @@ -39,6 +40,7 @@
*/
public class DataSourceProvider implements Provider<DataSource> {
private static final Logger LOG = LoggerFactory.getLogger(DataSourceProvider.class);
private static final AtomicInteger POOL_NUM = new AtomicInteger(0);

public static final String GOBBLIN_UTIL_JDBC_PREFIX = "gobblin.util.jdbc.";
public static final String CONN_DRIVER = GOBBLIN_UTIL_JDBC_PREFIX + "conn.driver";
Expand All @@ -55,6 +57,7 @@ public class DataSourceProvider implements Provider<DataSource> {
@Inject
public DataSourceProvider(@Named("dataSourceProperties") Properties properties) {
this.dataSource = new HikariDataSource();
this.dataSource.setPoolName("HikariPool-" + POOL_NUM.incrementAndGet() + "-" + getClass().getSimpleName());
this.dataSource.setDriverClassName(properties.getProperty(CONN_DRIVER, DEFAULT_CONN_DRIVER));
// the validation query should work beyond mysql; still, to bypass for any reason, heed directive
if (!Boolean.parseBoolean(properties.getProperty(SKIP_VALIDATION_QUERY, "false"))) {
Expand Down