Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hive.metastore.datasource;

import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

import javax.sql.DataSource;
Expand Down Expand Up @@ -112,9 +113,22 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc
objectPool.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
objectPool.setLifo(lifo);

// Enable TxnHandler#connPoolMutex to release the idle connection if possible,
// TxnHandler#connPoolMutex is mostly used for MutexAPI that is primarily designed to
// provide coarse-grained mutex support to maintenance tasks running inside the Metastore,
// this will make Metastore more scalable especially if there is a leader in the warehouse.
if ("mutex".equalsIgnoreCase(poolName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding a comment to explain why we are limiting database connections related to mutexes, similar to the comment you already put in HikariCPDataSourceProvider.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (timeBetweenEvictionRuns < 0) {
// When timeBetweenEvictionRunsMillis is non-positive, no idle object evictor thread runs
objectPool.setTimeBetweenEvictionRunsMillis(30 * 1000);
}
if (softMinEvictableIdleTimeMillis < 0) {
objectPool.setSoftMinEvictableIdleTimeMillis(600 * 1000);
}
}
String stmt = dbProduct.getPrepareTxnStmt();
if (stmt != null) {
poolableConnFactory.setValidationQuery(stmt);
poolableConnFactory.setConnectionInitSql(Collections.singletonList(stmt));
}
return new PoolingDataSource(objectPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc
config.setPoolName(poolName);
}

// It's kind of a waste to create a fixed-size connection pool the same size as TxnHandler#connPool,
// TxnHandler#connPoolMutex is mostly used for MutexAPI that is primarily designed to
// provide coarse-grained mutex support to maintenance tasks running inside the Metastore,
// add minimumIdle=2 and idleTimeout=10min(default, can be set by hikaricp.idleTimeout) to the pool,
// so that the connection pool can retire the idle connection aggressively,
// this will make Metastore more scalable especially if there is a leader in the warehouse.
if ("mutex".equals(poolName)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we please extract the string literal into constants?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This literal is used only when creating the data source, so from my point of view we can use it directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"mutex" is repeated in a few places, and correctness of this logic depends on the DataSourceProvider code being in sync with the pool name chosen in TxnHandler. I guess ideally, all of the data source name strings should be moved to shared constants somewhere, maybe as a separate refactoring patch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

int minimumIdle = Integer.valueOf(hdpConfig.get(HIKARI + ".minimumIdle", "2"));
config.setMinimumIdle(Math.min(maxPoolSize, minimumIdle));
}

//https://github.com/brettwooldridge/HikariCP
config.setConnectionTimeout(connectionTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6188,6 +6188,12 @@ public Connection getConnection(String username, String password) throws SQLExce
connectionProps.setProperty("user", username);
connectionProps.setProperty("password", password);
Connection conn = driver.connect(connString, connectionProps);
String prepareStmt = dbProduct != null ? dbProduct.getPrepareTxnStmt() : null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required? I can see the same in DbCPDataSourceProvider:

    String stmt = dbProduct.getPrepareTxnStmt();
    if (stmt != null) {
      poolableConnFactory.setValidationQuery(stmt);
    }

Copy link
Member Author

@dengzhhu653 dengzhhu653 Jan 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed so that MySQL treats " as an identifier quote character: SET @@session.sql_mode=ANSI_QUOTES.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (prepareStmt != null) {
try (Statement stmt = conn.createStatement()) {
stmt.execute(prepareStmt);
}
}
conn.setAutoCommit(false);
return conn;
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import com.zaxxer.hikari.HikariDataSource;

import com.zaxxer.hikari.HikariPoolMXBean;
import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.PersistenceManagerProvider;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
Expand All @@ -34,6 +37,8 @@
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

@Category(MetastoreUnitTest.class)
public class TestDataSourceProviderFactory {
Expand Down Expand Up @@ -143,6 +148,60 @@ public void testCreateDbCpDataSource() throws SQLException {
Assert.assertTrue(ds instanceof PoolingDataSource);
}

@Test
public void testEvictIdleConnection() throws Exception {
String[] dataSourceType = {HikariCPDataSourceProvider.HIKARI, DbCPDataSourceProvider.DBCP};
try (DataSourceProvider.DataSourceNameConfigurator configurator =
new DataSourceProvider.DataSourceNameConfigurator(conf, "mutex")) {
for (final String type: dataSourceType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a test fails, it will be hard to tell if it failed while testing Hikari vs. DbCP. I suggest using customized assert messages that include dataSourceType.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, type);
boolean isHikari = HikariCPDataSourceProvider.HIKARI.equals(type);
if (isHikari) {
conf.unset("hikaricp.connectionInitSql");
// The minimum of idleTimeout is 10s
conf.set("hikaricp.idleTimeout", "10000");
System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "1000");
} else {
conf.set("dbcp.timeBetweenEvictionRunsMillis", "1000");
conf.set("dbcp.softMinEvictableIdleTimeMillis", "3000");
conf.set("dbcp.maxIdle", "0");
}
DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
DataSource ds = dsp.create(conf, 5);
List<Connection> connections = new ArrayList<>();
for (int i = 0; i < 5; i++) {
connections.add(ds.getConnection());
}
HikariPoolMXBean poolMXBean = null;
GenericObjectPool objectPool = null;
if (isHikari) {
poolMXBean = ((HikariDataSource) ds).getHikariPoolMXBean();
Assert.assertEquals(type, 5, poolMXBean.getTotalConnections());
Assert.assertEquals(type, 5, poolMXBean.getActiveConnections());
} else {
objectPool = (GenericObjectPool) MethodUtils.invokeMethod(ds, true, "getPool");
Assert.assertEquals(type, 5, objectPool.getNumActive());
Assert.assertEquals(type, 5, objectPool.getMaxTotal());
}
connections.forEach(connection -> {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
Thread.sleep(isHikari ? 15000 : 7000);
if (isHikari) {
Assert.assertEquals(type, 2, poolMXBean.getTotalConnections());
Assert.assertEquals(type, 2, poolMXBean.getIdleConnections());
} else {
Assert.assertEquals(type, 0, objectPool.getNumActive());
Assert.assertEquals(type, 0, objectPool.getNumIdle());
}
}
}
}

@Test
public void testClosePersistenceManagerProvider() throws Exception {
String[] dataSourceType = {HikariCPDataSourceProvider.HIKARI, DbCPDataSourceProvider.DBCP};
Expand Down