Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import static org.apache.gravitino.metrics.MetricNames.FILESYSTEM_CACHE;

import com.codahale.metrics.caffeine.MetricsStatsCounter;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -49,14 +46,12 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
Expand All @@ -72,6 +67,8 @@
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.catalog.FilesetFileOps;
import org.apache.gravitino.catalog.ManagedSchemaOperations;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemCache;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemCacheKey;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.connector.CatalogInfo;
Expand Down Expand Up @@ -103,7 +100,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,8 +133,7 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations

private FilesetCatalogMetricsSource catalogMetricsSource;

@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
@VisibleForTesting FileSystemCache fileSystemCache;

private final ThreadPoolExecutor fileSystemExecutor =
new ThreadPoolExecutor(
Expand All @@ -161,51 +156,6 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
this.store = store;
}

/**
 * Cache key used to look up and share {@code FileSystem} instances. Two keys are equal only when
 * the URI scheme, URI authority, configuration map, and the current Hadoop user all match, so each
 * distinct (filesystem, user, config) combination gets its own cached {@code FileSystem}.
 */
static class FileSystemCacheKey {
  // When the path is a path without scheme such as 'file','hdfs', etc., then the scheme and
  // authority are both null
  @Nullable private final String scheme;
  @Nullable private final String authority;
  // Catalog/filesystem configuration; participates in equality so differently-configured
  // filesystems are not shared.
  private final Map<String, String> conf;
  // Short name of the Hadoop user resolved at construction time, so cached FileSystem
  // instances are partitioned per user.
  private final String currentUser;

  /**
   * Creates a key for the given scheme/authority/configuration triple.
   *
   * @param scheme URI scheme of the filesystem, may be null for scheme-less paths
   * @param authority URI authority of the filesystem, may be null for scheme-less paths
   * @param conf configuration map associated with the filesystem
   * @throws RuntimeException if the current Hadoop user cannot be resolved
   */
  FileSystemCacheKey(String scheme, String authority, Map<String, String> conf) {
    this.scheme = scheme;
    this.authority = authority;
    this.conf = conf;

    try {
      this.currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException e) {
      throw new RuntimeException("Failed to get current user", e);
    }
  }

  /** Null-safe field-wise equality over conf, scheme, authority, and currentUser. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileSystemCacheKey)) {
      return false;
    }
    FileSystemCacheKey that = (FileSystemCacheKey) o;
    return conf.equals(that.conf)
        && (scheme == null ? that.scheme == null : scheme.equals(that.scheme))
        && (authority == null ? that.authority == null : authority.equals(that.authority))
        && currentUser.equals(that.currentUser);
  }

  /** Standard 31-multiplier hash over the same fields used by {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    int result = conf.hashCode();
    result = 31 * result + (scheme == null ? 0 : scheme.hashCode());
    result = 31 * result + (authority == null ? 0 : authority.hashCode());
    result = 31 * result + currentUser.hashCode();
    return result;
  }
}

/** Creates operations backed by the entity store of the global {@code GravitinoEnv} instance. */
public FilesetCatalogOperations() {
  this(GravitinoEnv.getInstance().entityStore());
}
Expand Down Expand Up @@ -264,26 +214,10 @@ public void initialize(
FileSystemUtils.getFileSystemProviderByName(
fileSystemProvidersMap, defaultFileSystemProviderName);

scheduler =
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("file-system-cache-for-fileset" + "-%d")
.build());

Caffeine<Object, Object> cacheBuilder =
Caffeine.newBuilder()
FileSystemCache.Builder cacheBuilder =
FileSystemCache.newBuilder()
.expireAfterAccess(1, TimeUnit.HOURS)
.removalListener(
(ignored, value, cause) -> {
try {
((FileSystem) value).close();
} catch (IOException e) {
LOG.warn("Failed to close FileSystem instance in cache", e);
}
})
.scheduler(Scheduler.forScheduledExecutorService(scheduler));
.withCleanerScheduler("file-system-cache-for-fileset-%d");

// Metrics System could be null in UT.
if (metricsSystem != null) {
Expand Down Expand Up @@ -1009,27 +943,12 @@ public void testConnection(

@Override
public void close() throws IOException {
// do nothing
if (scheduler != null) {
scheduler.shutdownNow();
}

if (!fileSystemExecutor.isShutdown()) {
fileSystemExecutor.shutdownNow();
}

if (fileSystemCache != null) {
fileSystemCache
.asMap()
.forEach(
(k, v) -> {
try {
v.close();
} catch (IOException e) {
LOG.warn("Failed to close FileSystem instance in cache", e);
}
});
fileSystemCache.cleanUp();
fileSystemCache.close();
}

// Metrics System could be null in UT.
Expand Down
2 changes: 2 additions & 0 deletions catalogs/hadoop-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ plugins {
// try to avoid adding extra dependencies because it is used by catalogs and connectors.
dependencies {
implementation(project(":api"))
implementation(libs.caffeine)
implementation(libs.commons.lang3)
implementation(libs.hadoop3.client.api)
implementation(libs.hadoop3.client.runtime)
implementation(libs.guava)
implementation(libs.slf4j.api)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
Expand Down
Loading
Loading