Skip to content

Commit

Permalink
create a worker-specific filesystem master client
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfree committed Sep 12, 2015
1 parent e8d0c83 commit 3b6337e
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 72 deletions.
50 changes: 0 additions & 50 deletions common/src/main/java/tachyon/client/FileSystemMasterClient.java
Expand Up @@ -19,10 +19,8 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;


import com.google.common.base.Preconditions;
import org.apache.thrift.TException; import org.apache.thrift.TException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -47,7 +45,6 @@
* Since thrift clients are not thread safe, this class is a wrapper to provide thread safety, and * Since thrift clients are not thread safe, this class is a wrapper to provide thread safety, and
* to provide retries. * to provide retries.
*/ */
// TODO: split out worker-specific calls to a fs master client for workers.
// TODO: figure out a retry utility to make all the retry logic in this file better. // TODO: figure out a retry utility to make all the retry logic in this file better.
public final class FileSystemMasterClient extends MasterClientBase { public final class FileSystemMasterClient extends MasterClientBase {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
Expand Down Expand Up @@ -212,26 +209,6 @@ public synchronized long getNewBlockIdForFile(long fileId) throws IOException {
throw new IOException("Failed after " + retry + " retries."); throw new IOException("Failed after " + retry + " retries.");
} }


/**
 * Fetches the ids of the pinned files from the file system master.
 * <p>
 * The call is attempted up to {@code RPC_MAX_NUM_RETRY + 1} times while the client is not closed.
 * An {@link InvalidPathException} is not retried; it is wrapped in an {@link IOException} and
 * thrown immediately. Any other {@link TException} is logged and the client is marked as
 * disconnected before the next attempt.
 *
 * @return the set of pinned file ids
 * @throws IOException if an I/O error occurs, or if every attempt failed (the last thrift failure,
 *         if any, is attached as the cause)
 */
public synchronized Set<Long> getPinList() throws IOException {
  int attempts = 0;
  // Remembered so the final IOException can carry the real reason for the failure.
  TException lastFailure = null;
  while (!mClosed && attempts <= RPC_MAX_NUM_RETRY) {
    attempts++;
    connect();
    try {
      return mClient.workerGetPinIdList();
    } catch (InvalidPathException e) {
      throw new IOException(e);
    } catch (TException e) {
      LOG.error(e.getMessage(), e);
      // presumably makes the next connect() re-establish the connection - confirm in base class
      mConnected = false;
      lastFailure = e;
    }
  }
  // The first attempt is not a retry, so it is excluded from the reported count.
  int retries = Math.max(0, attempts - 1);
  throw new IOException("Failed after " + retries + " retries.", lastFailure);
}

/** /**
* @return the under file system address * @return the under file system address
* @throws IOException if an I/O error occurs * @throws IOException if an I/O error occurs
Expand Down Expand Up @@ -458,33 +435,6 @@ public synchronized boolean free(long fileId, boolean recursive) throws IOExcept
throw new IOException("Failed after " + retry + " retries."); throw new IOException("Failed after " + retry + " retries.");
} }


/**
 * Adds a checkpoint.
 * <p>
 * The call is attempted up to {@code RPC_MAX_NUM_RETRY + 1} times while the client is not closed.
 * A {@link FileDoesNotExistException} is not retried; it is wrapped in an {@link IOException} and
 * thrown immediately. Any other {@link TException} is logged and the client is marked as
 * disconnected before the next attempt.
 *
 * @param workerId the worker id
 * @param fileId the file id
 * @param length the checkpoint length
 * @param checkpointPath the checkpoint path
 * @return whether operation succeeded or not
 * @throws IOException if an I/O error occurs, or if every attempt failed (the last thrift failure,
 *         if any, is attached as the cause)
 */
public synchronized boolean addCheckpoint(long workerId, long fileId, long length,
    String checkpointPath) throws IOException {
  int attempts = 0;
  // Remembered so the final IOException can carry the real reason for the failure.
  TException lastFailure = null;
  while (!mClosed && attempts <= RPC_MAX_NUM_RETRY) {
    attempts++;
    connect();
    try {
      return mClient.addCheckpoint(workerId, fileId, length, checkpointPath);
    } catch (FileDoesNotExistException e) {
      throw new IOException(e);
    } catch (TException e) {
      LOG.error(e.getMessage(), e);
      // presumably makes the next connect() re-establish the connection - confirm in base class
      mConnected = false;
      lastFailure = e;
    }
  }
  // The first attempt is not a retry, so it is excluded from the reported count.
  int retries = Math.max(0, attempts - 1);
  throw new IOException("Failed after " + retries + " retries.", lastFailure);
}

/** /**
* Reports a lost file. * Reports a lost file.
* *
Expand Down
113 changes: 113 additions & 0 deletions common/src/main/java/tachyon/client/WorkerFileSystemMasterClient.java
@@ -0,0 +1,113 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tachyon.Constants;
import tachyon.MasterClientBase;
import tachyon.conf.TachyonConf;
import tachyon.thrift.FileDoesNotExistException;
import tachyon.thrift.FileSystemMasterService;
import tachyon.thrift.InvalidPathException;

/**
 * A wrapper for the thrift client to interact with the file system master, used by tachyon worker.
 * <p>
 * Since thrift clients are not thread safe, this class is a wrapper to provide thread safety, and
 * to provide retries. Every RPC method is {@code synchronized} and is attempted up to
 * {@code RPC_MAX_NUM_RETRY + 1} times while the client is not closed.
 */
public final class WorkerFileSystemMasterClient extends MasterClientBase {
  private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

  /** Thrift client for the file system master service; replaced in {@link #afterConnect()}. */
  private FileSystemMasterService.Client mClient = null;

  /**
   * Creates a new file system master client for the worker.
   *
   * @param masterAddress the master address
   * @param executorService the executor service
   * @param tachyonConf the Tachyon configuration
   */
  public WorkerFileSystemMasterClient(InetSocketAddress masterAddress,
      ExecutorService executorService, TachyonConf tachyonConf) {
    super(masterAddress, executorService, tachyonConf);
  }

  /**
   * @return the name of the file system master service
   */
  @Override
  protected String getServiceName() {
    return Constants.FILE_SYSTEM_MASTER_SERVICE_NAME;
  }

  /**
   * Builds a fresh thrift client on top of the protocol held by the base class.
   */
  @Override
  protected void afterConnect() {
    mClient = new FileSystemMasterService.Client(mProtocol);
  }

  /**
   * Adds a checkpoint.
   * <p>
   * A {@link FileDoesNotExistException} is not retried; it is wrapped in an {@link IOException}
   * and thrown immediately. Any other {@link TException} is logged and the client is marked as
   * disconnected before the next attempt.
   *
   * @param workerId the worker id
   * @param fileId the file id
   * @param length the checkpoint length
   * @param checkpointPath the checkpoint path
   * @return whether operation succeeded or not
   * @throws IOException if an I/O error occurs, or if every attempt failed (the last thrift
   *         failure, if any, is attached as the cause)
   */
  public synchronized boolean addCheckpoint(long workerId, long fileId, long length,
      String checkpointPath) throws IOException {
    int attempts = 0;
    // Remembered so the final IOException can carry the real reason for the failure.
    TException lastFailure = null;
    while (!mClosed && attempts <= RPC_MAX_NUM_RETRY) {
      attempts++;
      connect();
      try {
        return mClient.addCheckpoint(workerId, fileId, length, checkpointPath);
      } catch (FileDoesNotExistException e) {
        throw new IOException(e);
      } catch (TException e) {
        LOG.error(e.getMessage(), e);
        // presumably makes the next connect() re-establish the connection - confirm in base class
        mConnected = false;
        lastFailure = e;
      }
    }
    throw retriesExhausted(attempts, lastFailure);
  }

  /**
   * Fetches the ids of the pinned files from the file system master.
   * <p>
   * An {@link InvalidPathException} is not retried; it is wrapped in an {@link IOException} and
   * thrown immediately. Any other {@link TException} is logged and the client is marked as
   * disconnected before the next attempt.
   *
   * @return the set of pinned file ids
   * @throws IOException if an I/O error occurs, or if every attempt failed (the last thrift
   *         failure, if any, is attached as the cause)
   */
  public synchronized Set<Long> getPinList() throws IOException {
    int attempts = 0;
    // Remembered so the final IOException can carry the real reason for the failure.
    TException lastFailure = null;
    while (!mClosed && attempts <= RPC_MAX_NUM_RETRY) {
      attempts++;
      connect();
      try {
        return mClient.workerGetPinIdList();
      } catch (InvalidPathException e) {
        throw new IOException(e);
      } catch (TException e) {
        LOG.error(e.getMessage(), e);
        // presumably makes the next connect() re-establish the connection - confirm in base class
        mConnected = false;
        lastFailure = e;
      }
    }
    throw retriesExhausted(attempts, lastFailure);
  }

  /**
   * Builds the exception thrown when an RPC gave up without returning a result.
   *
   * @param attempts the number of times the RPC was actually attempted
   * @param lastFailure the last thrift failure seen, or null if no attempt was made
   * @return the exception to throw, with {@code lastFailure} attached as its cause
   */
  private static IOException retriesExhausted(int attempts, TException lastFailure) {
    // The first attempt is not a retry, so it is excluded from the reported count.
    int retries = Math.max(0, attempts - 1);
    return new IOException("Failed after " + retries + " retries.", lastFailure);
  }
}
Expand Up @@ -29,10 +29,10 @@


import tachyon.Constants; import tachyon.Constants;
import tachyon.TachyonURI; import tachyon.TachyonURI;
import tachyon.client.FileSystemMasterClient;
import tachyon.client.TachyonStorageType;
import tachyon.client.ClientOptions; import tachyon.client.ClientOptions;
import tachyon.client.TachyonStorageType;
import tachyon.client.UnderStorageType; import tachyon.client.UnderStorageType;
import tachyon.client.WorkerFileSystemMasterClient;
import tachyon.client.file.FileOutStream; import tachyon.client.file.FileOutStream;
import tachyon.client.file.TachyonFileSystem; import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.TachyonFile; import tachyon.client.file.TachyonFile;
Expand All @@ -45,14 +45,14 @@ public class PinIntegrationTest {


private LocalTachyonCluster mLocalTachyonCluster = null; private LocalTachyonCluster mLocalTachyonCluster = null;
private TachyonFileSystem mTfs = null; private TachyonFileSystem mTfs = null;
private FileSystemMasterClient mFSMasterClient; private WorkerFileSystemMasterClient mFSMasterClient;


@Before @Before
public final void before() throws Exception { public final void before() throws Exception {
mLocalTachyonCluster = new LocalTachyonCluster(1000, 1000, Constants.GB); mLocalTachyonCluster = new LocalTachyonCluster(1000, 1000, Constants.GB);
mLocalTachyonCluster.start(); mLocalTachyonCluster.start();
mTfs = mLocalTachyonCluster.getClient(); mTfs = mLocalTachyonCluster.getClient();
mFSMasterClient = new FileSystemMasterClient( mFSMasterClient = new WorkerFileSystemMasterClient(
new InetSocketAddress(mLocalTachyonCluster.getMasterHostname(), new InetSocketAddress(mLocalTachyonCluster.getMasterHostname(),
mLocalTachyonCluster.getMasterPort()), mLocalTachyonCluster.getMasterPort()),
mExecutorService, mLocalTachyonCluster.getWorkerTachyonConf()); mExecutorService, mLocalTachyonCluster.getWorkerTachyonConf());
Expand Down
12 changes: 6 additions & 6 deletions servers/src/main/java/tachyon/worker/block/BlockDataManager.java
Expand Up @@ -23,8 +23,8 @@


import tachyon.Constants; import tachyon.Constants;
import tachyon.Sessions; import tachyon.Sessions;
import tachyon.client.FileSystemMasterClient;
import tachyon.client.WorkerBlockMasterClient; import tachyon.client.WorkerBlockMasterClient;
import tachyon.client.WorkerFileSystemMasterClient;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.exception.AlreadyExistsException; import tachyon.exception.AlreadyExistsException;
import tachyon.exception.InvalidStateException; import tachyon.exception.InvalidStateException;
Expand Down Expand Up @@ -61,8 +61,8 @@ public final class BlockDataManager {


/** WorkerBlockMasterClient, only used to inform the master of a new block in commitBlock */ /** WorkerBlockMasterClient, only used to inform the master of a new block in commitBlock */
private WorkerBlockMasterClient mWorkerBlockMasterClient; private WorkerBlockMasterClient mWorkerBlockMasterClient;
/** FileSystemMasterClient, only used to inform master of a new file in addCheckpoint */ /** WorkerFileSystemMasterClient, only used to inform master of a new file in addCheckpoint */
private FileSystemMasterClient mFileSystemMasterClient; private WorkerFileSystemMasterClient mWorkerFileSystemMasterClient;
/** UnderFileSystem Client */ /** UnderFileSystem Client */
private UnderFileSystem mUfs; private UnderFileSystem mUfs;
/** Session metadata, used to keep track of session heartbeats */ /** Session metadata, used to keep track of session heartbeats */
Expand All @@ -79,7 +79,7 @@ public final class BlockDataManager {
*/ */
public BlockDataManager(WorkerSource workerSource, public BlockDataManager(WorkerSource workerSource,
WorkerBlockMasterClient workerBlockMasterClient, WorkerBlockMasterClient workerBlockMasterClient,
FileSystemMasterClient fileSystemMasterClient) throws IOException { WorkerFileSystemMasterClient workerFileSystemMasterClient) throws IOException {
// TODO: We may not need to assign the conf to a variable // TODO: We may not need to assign the conf to a variable
mTachyonConf = WorkerContext.getConf(); mTachyonConf = WorkerContext.getConf();
mHeartbeatReporter = new BlockHeartbeatReporter(); mHeartbeatReporter = new BlockHeartbeatReporter();
Expand All @@ -88,7 +88,7 @@ public BlockDataManager(WorkerSource workerSource,
mMetricsReporter = new BlockMetricsReporter(mWorkerSource); mMetricsReporter = new BlockMetricsReporter(mWorkerSource);


mWorkerBlockMasterClient = workerBlockMasterClient; mWorkerBlockMasterClient = workerBlockMasterClient;
mFileSystemMasterClient = fileSystemMasterClient; mWorkerFileSystemMasterClient = workerFileSystemMasterClient;


// Create Under FileSystem Client // Create Under FileSystem Client
String ufsAddress = String ufsAddress =
Expand Down Expand Up @@ -165,7 +165,7 @@ public void addCheckpoint(long sessionId, long fileId) throws TException, IOExce
} catch (IOException ioe) { } catch (IOException ioe) {
throw new FailedToCheckpointException("Failed to getFileSize " + dstPath); throw new FailedToCheckpointException("Failed to getFileSize " + dstPath);
} }
mFileSystemMasterClient.addCheckpoint(mWorkerId, fileId, fileSize, dstPath); mWorkerFileSystemMasterClient.addCheckpoint(mWorkerId, fileId, fileSize, dstPath);
} }


/** /**
Expand Down
14 changes: 7 additions & 7 deletions servers/src/main/java/tachyon/worker/block/BlockWorker.java
Expand Up @@ -31,8 +31,8 @@


import tachyon.Constants; import tachyon.Constants;
import tachyon.Sessions; import tachyon.Sessions;
import tachyon.client.FileSystemMasterClient;
import tachyon.client.WorkerBlockMasterClient; import tachyon.client.WorkerBlockMasterClient;
import tachyon.client.WorkerFileSystemMasterClient;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.metrics.MetricsSystem; import tachyon.metrics.MetricsSystem;
import tachyon.thrift.NetAddress; import tachyon.thrift.NetAddress;
Expand Down Expand Up @@ -75,7 +75,7 @@ public final class BlockWorker {
/** Client for all block master communication */ /** Client for all block master communication */
private final WorkerBlockMasterClient mWorkerBlockMasterClient; private final WorkerBlockMasterClient mWorkerBlockMasterClient;
/** Client for all file system master communication */ /** Client for all file system master communication */
private final FileSystemMasterClient mFileSystemMasterClient; private final WorkerFileSystemMasterClient mWorkerFileSystemMasterClient;
/** The executor service for the master client thread */ /** The executor service for the master client thread */
private final ExecutorService mMasterClientExecutorService; private final ExecutorService mMasterClientExecutorService;
/** Threadpool for the master sync */ /** Threadpool for the master sync */
Expand Down Expand Up @@ -164,14 +164,14 @@ public BlockWorker() throws IOException {
new WorkerBlockMasterClient(NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC, new WorkerBlockMasterClient(NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC,
mTachyonConf), mMasterClientExecutorService, mTachyonConf); mTachyonConf), mMasterClientExecutorService, mTachyonConf);


mFileSystemMasterClient = mWorkerFileSystemMasterClient = new WorkerFileSystemMasterClient(
new FileSystemMasterClient(NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC, NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC, mTachyonConf),
mTachyonConf), mMasterClientExecutorService, mTachyonConf); mMasterClientExecutorService, mTachyonConf);


// Set up BlockDataManager // Set up BlockDataManager
WorkerSource workerSource = new WorkerSource(); WorkerSource workerSource = new WorkerSource();
mBlockDataManager = mBlockDataManager =
new BlockDataManager(workerSource, mWorkerBlockMasterClient, mFileSystemMasterClient); new BlockDataManager(workerSource, mWorkerBlockMasterClient, mWorkerFileSystemMasterClient);


// Setup metrics collection // Setup metrics collection
mWorkerMetricsSystem = new MetricsSystem("worker", mTachyonConf); mWorkerMetricsSystem = new MetricsSystem("worker", mTachyonConf);
Expand Down Expand Up @@ -217,7 +217,7 @@ public BlockWorker() throws IOException {
mBlockMasterSync.setWorkerId(); mBlockMasterSync.setWorkerId();


// Setup PinListSyncer // Setup PinListSyncer
mPinListSync = new PinListSync(mBlockDataManager, mTachyonConf, mFileSystemMasterClient); mPinListSync = new PinListSync(mBlockDataManager, mTachyonConf, mWorkerFileSystemMasterClient);


// Setup session cleaner // Setup session cleaner
mSessionCleanerThread = new SessionCleaner(mBlockDataManager, mTachyonConf); mSessionCleanerThread = new SessionCleaner(mBlockDataManager, mTachyonConf);
Expand Down
8 changes: 3 additions & 5 deletions servers/src/main/java/tachyon/worker/block/PinListSync.java
Expand Up @@ -15,15 +15,13 @@


package tachyon.worker.block; package tachyon.worker.block;


import java.io.IOException;
import java.util.HashSet;
import java.util.Set; import java.util.Set;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import tachyon.Constants; import tachyon.Constants;
import tachyon.client.FileSystemMasterClient; import tachyon.client.WorkerFileSystemMasterClient;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.util.CommonUtils; import tachyon.util.CommonUtils;


Expand All @@ -47,7 +45,7 @@ public final class PinListSync implements Runnable {
private final int mSyncTimeoutMs; private final int mSyncTimeoutMs;


/** Client for all master communication */ /** Client for all master communication */
private FileSystemMasterClient mMasterClient; private WorkerFileSystemMasterClient mMasterClient;
/** Flag to indicate if the syncing should continue */ /** Flag to indicate if the syncing should continue */
private volatile boolean mRunning; private volatile boolean mRunning;


Expand All @@ -59,7 +57,7 @@ public final class PinListSync implements Runnable {
* @param masterClient the Tachyon master client * @param masterClient the Tachyon master client
*/ */
public PinListSync(BlockDataManager blockDataManager, TachyonConf tachyonConf, public PinListSync(BlockDataManager blockDataManager, TachyonConf tachyonConf,
FileSystemMasterClient masterClient) { WorkerFileSystemMasterClient masterClient) {
mBlockDataManager = blockDataManager; mBlockDataManager = blockDataManager;
mTachyonConf = tachyonConf; mTachyonConf = tachyonConf;
mMasterClient = masterClient; mMasterClient = masterClient;
Expand Down

0 comments on commit 3b6337e

Please sign in to comment.