Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV-4980: Implement reload and reloadAll methods for profiles #76

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ public void setRequestContext(RequestContext context) {
public void afterPropertiesSet() {
}

@Override
public void reloadAll() {
}

@Override
public void reload(String server) {
}

/**
* When DEBUG mode is enabled, logs the total number of rows read, the
* amount of time it took to read the file, and the average read speed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public interface Plugin {
* Invoked after the {@code RequestContext} has been bound
*/
void afterPropertiesSet();

void reloadAll();

void reload(String server);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.arenadata.security.encryption.client.service.DecryptClient;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.greenplum.pxf.api.error.PxfRuntimeException;
import org.greenplum.pxf.api.model.BasePlugin;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.api.security.SecureLogin;
Expand All @@ -40,12 +41,8 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import java.util.stream.Collectors;
import java.util.Objects;

import static org.greenplum.pxf.api.security.SecureLogin.CONFIG_KEY_SERVICE_USER_IMPERSONATION;

Expand Down Expand Up @@ -100,6 +97,7 @@ public class JdbcBasePlugin extends BasePlugin {
private static final String MYSQL_DRIVER_PREFIX = "com.mysql.";
private static final String JDBC_DATE_WIDE_RANGE = "jdbc.date.wideRange";
private static final String JDBC_DATE_WIDE_RANGE_LEGACY = "jdbc.date.wide-range";

private enum TransactionIsolation {
READ_UNCOMMITTED(1),
READ_COMMITTED(2),
Expand Down Expand Up @@ -493,6 +491,26 @@ public static void closeStatementAndConnection(Statement statement) throws SQLEx
}
}

@Override
public void reloadAll() {
if (Objects.nonNull(connectionManager)) {
connectionManager.reloadCache(poolDescriptor -> true);
} else {
throw new PxfRuntimeException("Failed to reload profile. Connection manager is null.");
}
}

@Override
public void reload(String server) {
if (Objects.isNull(connectionManager)) {
throw new PxfRuntimeException("Failed to reload profile. Connection manager is null.");
} else if (StringUtils.isBlank(server)) {
throw new PxfRuntimeException("Failed to reload profile. Parameter server is blank.");
} else {
connectionManager.reloadCache(poolDescriptor -> poolDescriptor.getServer().equals(server));
iamlapa marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* For a Kerberized Hive JDBC connection, it creates a connection as the loginUser.
* Otherwise, it returns a new connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.greenplum.pxf.plugins.jdbc.PxfJdbcProperties;
import org.slf4j.Logger;
Expand All @@ -24,14 +25,18 @@
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Responsible for obtaining and maintaining JDBC connections to databases. If configured for a given server,
* uses Hikari Connection Pool to pool database connections.
*/
@Slf4j
@Component
public class ConnectionManager {

Expand Down Expand Up @@ -78,6 +83,17 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper
.build(CacheLoader.from(factory::createDataSource));
}

public void reloadCache(Predicate<PoolDescriptor> poolDescriptorFilter) {
Set<PoolDescriptor> poolDescriptorSet = dataSources.asMap().keySet().stream()
.filter(poolDescriptorFilter)
.collect(Collectors.toSet());
poolDescriptorSet.forEach(pd -> {
SidorovGreg marked this conversation as resolved.
Show resolved Hide resolved
log.info("Invalidate cache for pool descriptor {}", pd);
dataSources.invalidate(pd);
});
cleanCache();
}

/**
* Explicitly runs cache maintenance operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public int hashCode() {
@Override
public String toString() {
return "PoolDescriptor{" +
"jdbcUrl=" + jdbcUrl +
"server=" + server +
", jdbcUrl=" + jdbcUrl +
", user=" + user +
", password=" + ConnectionManager.maskPassword(password) +
", connectionConfig=" + connectionConfig +
Expand Down