Skip to content

Commit

Permalink
FileSystemWorkerThriftPool implememntation
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Oct 5, 2016
1 parent b22b089 commit d84b172
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 176 deletions.
18 changes: 0 additions & 18 deletions core/client/src/main/java/alluxio/client/ClientContext.java
Expand Up @@ -14,13 +14,10 @@
import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.metrics.MetricsSystem;
import alluxio.util.ThreadFactoryUtils;

import com.google.common.base.Preconditions;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.concurrent.NotThreadSafe;

Expand All @@ -30,7 +27,6 @@
*/
@NotThreadSafe
public final class ClientContext {
private static ExecutorService sFileClientExecutorService;
private static InetSocketAddress sMasterAddress;

static {
Expand All @@ -46,13 +42,6 @@ public final class ClientContext {
* This method requires that configuration has been initialized.
*/
public static void init() {
if (sFileClientExecutorService != null) {
sFileClientExecutorService.shutdownNow();
}
sFileClientExecutorService = Executors
.newFixedThreadPool(Configuration.getInt(PropertyKey.USER_FILE_WORKER_CLIENT_THREADS),
ThreadFactoryUtils.build("file-worker-heartbeat-%d", true));

String masterHostname =
Preconditions.checkNotNull(Configuration.get(PropertyKey.MASTER_HOSTNAME));
int masterPort = Configuration.getInt(PropertyKey.MASTER_RPC_PORT);
Expand All @@ -68,12 +57,5 @@ public static InetSocketAddress getMasterAddress() {
return sMasterAddress;
}

/**
* @return the executor service for file clients
*/
public static ExecutorService getFileClientExecutorService() {
return sFileClientExecutorService;
}

private ClientContext() {} // prevent instantiation
}
Expand Up @@ -90,8 +90,7 @@ public FileSystemWorkerClient createWorkerClient() throws IOException {
address = mWorkerAddresses.get(ThreadLocalRandom.current().nextInt(mWorkerAddresses.size()));
}
long sessionId = IdUtils.getRandomNonNegativeLong();
return new FileSystemWorkerClient(address, ClientContext.getFileClientExecutorService(),
sessionId);
return new FileSystemWorkerClient(address, sessionId);
}

/**
Expand Down

0 comments on commit d84b172

Please sign in to comment.