Skip to content

Commit

Permalink
HIVE-27440: Improve data connector cache (apache#4418) (Butao Zhang, …
Browse files Browse the repository at this point in the history
…Reviewed by Akshat Mathur, Naveen Gangam, Sai Hemanth Gantasala)
  • Loading branch information
zhangbutao committed Jul 13, 2023
1 parent 9da7488 commit f61efe6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
Expand Up @@ -1941,6 +1941,7 @@ public void alter_dataconnector(final String dcName, final DataConnector newDC)

ms.openTransaction();
ms.alterDataConnector(dcName, newDC);
DataConnectorProviderFactory.invalidateDataConnectorFromCache(dcName);

/*
if (!transactionalListeners.isEmpty()) {
Expand Down Expand Up @@ -1999,6 +2000,7 @@ public void drop_dataconnector(final String dcName, boolean ifNotExists, boolean
RawStore ms = getMS();
try {
connector = getMS().getDataConnector(dcName);
DataConnectorProviderFactory.invalidateDataConnectorFromCache(dcName);
} catch (NoSuchObjectException e) {
if (!ifNotExists) {
throw new NoSuchObjectException("DataConnector " + dcName + " doesn't exist");
Expand Down
Expand Up @@ -18,29 +18,53 @@

package org.apache.hadoop.hive.metastore.dataconnector;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.api.DataConnector;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.DatabaseType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.*;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.DERBY_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.MSSQL_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.MYSQL_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.ORACLE_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.POSTGRES_TYPE;

public class DataConnectorProviderFactory {
Logger LOG = LoggerFactory.getLogger(DataConnectorProviderFactory.class);
static final Logger LOG = LoggerFactory.getLogger(DataConnectorProviderFactory.class);

private static Map<String, IDataConnectorProvider> cache = null;
private static Cache<String, IDataConnectorProvider> dataConnectorCache = null;
private static DataConnectorProviderFactory singleton = null;
private static IHMSHandler hmsHandler = null;

private static class CacheRemoveListener implements RemovalListener<String, IDataConnectorProvider> {
@Override
public void onRemoval(@Nullable String dcName, @Nullable IDataConnectorProvider dataConnectorProvider,
@NonNull RemovalCause cause) {
try {
LOG.info("Closing dataConnectorProvider :{}", dcName);
dataConnectorProvider.close();
} catch (Exception e) {
LOG.warn("Exception when closing dataConnectorProvider: {} due to: {}" + dcName, e.getMessage());
}
}
}

private DataConnectorProviderFactory(IHMSHandler hmsHandler) {
cache = new HashMap<String, IDataConnectorProvider>();
dataConnectorCache = Caffeine.newBuilder()
.removalListener(new CacheRemoveListener())
.maximumSize(100)
.expireAfterAccess(1, TimeUnit.HOURS).build();
this.hmsHandler = hmsHandler;
}

Expand All @@ -59,11 +83,9 @@ public static synchronized IDataConnectorProvider getDataConnectorProvider(Datab
}

String scopedDb = (db.getRemote_dbname() != null) ? db.getRemote_dbname() : db.getName();
if (cache.containsKey(db.getConnector_name().toLowerCase())) {
provider = cache.get(db.getConnector_name().toLowerCase());
if (provider != null) {
provider.setScope(scopedDb);
}
provider = dataConnectorCache.getIfPresent(db.getConnector_name().toLowerCase());
if (provider != null) {
provider.setScope(scopedDb);
return provider;
}

Expand All @@ -89,19 +111,23 @@ public static synchronized IDataConnectorProvider getDataConnectorProvider(Datab
default:
throw new MetaException("Data connector of type " + connector.getType() + " not implemented yet");
}
cache.put(connector.getName().toLowerCase(), provider);
dataConnectorCache.put(connector.getName().toLowerCase(), provider);
return provider;
}

public void shutdown() {
for (IDataConnectorProvider provider: cache.values()) {
try {
provider.close();
} catch(Exception e) {
LOG.warn("Exception invoking close on dataconnectorprovider:" + provider, e);
} finally {
cache.clear();
/**
* After executing Drop or Alter DDL on a dataConnector, we should update cache to clean the dataConnector
* to avoid using the invalid dataConnector next time.
* @param dcName dataConnector to be cleaned
*/
public static synchronized void invalidateDataConnectorFromCache(String dcName) {
try {
IDataConnectorProvider dataConnectorProvider = dataConnectorCache.getIfPresent(dcName);
if (dataConnectorProvider != null) {
dataConnectorCache.invalidate(dcName);
}
} catch (Exception e) {
LOG.warn("Exception when removing dataConnectorProvider: {} from cache due to: {}" + dcName, e.getMessage());
}
}
}

0 comments on commit f61efe6

Please sign in to comment.