Skip to content

Commit

Permalink
Merge pull request #76 from arenadata/feature/ADBDEV-4980
Browse files Browse the repository at this point in the history
ADBDEV-4980: Implement reload and reloadAll methods for profiles
  • Loading branch information
iamlapa committed Feb 28, 2024
2 parents 07062d4 + 424d2d8 commit 154bd15
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ public void setRequestContext(RequestContext context) {
public void afterPropertiesSet() {
}

@Override
public void reloadAll() {
}

@Override
public void reload(String server) {
}

/**
* When DEBUG mode is enabled, logs the total number of rows read, the
* amount of time it took to read the file, and the average read speed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public interface Plugin {
* Invoked after the {@code RequestContext} has been bound
*/
void afterPropertiesSet();

void reloadAll();

void reload(String server);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.arenadata.security.encryption.client.service.DecryptClient;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.greenplum.pxf.api.error.PxfRuntimeException;
import org.greenplum.pxf.api.model.BasePlugin;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.api.security.SecureLogin;
Expand All @@ -43,9 +44,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.Objects;

import static org.greenplum.pxf.api.security.SecureLogin.CONFIG_KEY_SERVICE_USER_IMPERSONATION;

Expand Down Expand Up @@ -100,6 +101,7 @@ public class JdbcBasePlugin extends BasePlugin {
private static final String MYSQL_DRIVER_PREFIX = "com.mysql.";
private static final String JDBC_DATE_WIDE_RANGE = "jdbc.date.wideRange";
private static final String JDBC_DATE_WIDE_RANGE_LEGACY = "jdbc.date.wide-range";

private enum TransactionIsolation {
READ_UNCOMMITTED(1),
READ_COMMITTED(2),
Expand Down Expand Up @@ -493,6 +495,26 @@ public static void closeStatementAndConnection(Statement statement) throws SQLEx
}
}

@Override
public void reloadAll() {
if (Objects.nonNull(connectionManager)) {
connectionManager.reloadCache();
} else {
throw new PxfRuntimeException("Failed to reload profile. Connection manager is null.");
}
}

@Override
public void reload(String server) {
if (Objects.isNull(connectionManager)) {
throw new PxfRuntimeException("Failed to reload profile. Connection manager is null.");
} else if (StringUtils.isBlank(server)) {
throw new PxfRuntimeException("Failed to reload profile. Parameter server is blank.");
} else {
connectionManager.reloadCacheIf(poolDescriptor -> poolDescriptor.getServer().equals(server));
}
}

/**
* For a Kerberized Hive JDBC connection, it creates a connection as the loginUser.
* Otherwise, it returns a new connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.greenplum.pxf.plugins.jdbc.PxfJdbcProperties;
import org.slf4j.Logger;
Expand All @@ -27,11 +28,14 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Responsible for obtaining and maintaining JDBC connections to databases. If configured for a given server,
* uses Hikari Connection Pool to pool database connections.
*/
@Slf4j
@Component
public class ConnectionManager {

Expand Down Expand Up @@ -78,6 +82,23 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper
.build(CacheLoader.from(factory::createDataSource));
}

public void reloadCache() {
log.info("Invalidate cache of all pool descriptors");
dataSources.invalidateAll();
cleanCache();
}

public void reloadCacheIf(Predicate<PoolDescriptor> poolDescriptorFilter) {
dataSources.asMap().keySet().stream()
.filter(poolDescriptorFilter)
.collect(Collectors.toSet())
.forEach(poolDescriptor -> {
log.info("Invalidate cache of the pool descriptor {}", poolDescriptor);
dataSources.invalidate(poolDescriptor);
});
cleanCache();
}

/**
* Explicitly runs cache maintenance operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public int hashCode() {
@Override
public String toString() {
return "PoolDescriptor{" +
"jdbcUrl=" + jdbcUrl +
"server=" + server +
", jdbcUrl=" + jdbcUrl +
", user=" + user +
", password=" + ConnectionManager.maskPassword(password) +
", connectionConfig=" + connectionConfig +
Expand Down

0 comments on commit 154bd15

Please sign in to comment.