Skip to content

Commit

Permalink
Update usages of DefaultBlockWorkerClient
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Jul 26, 2016
1 parent 7c7309b commit 0ce938d
Show file tree
Hide file tree
Showing 16 changed files with 40 additions and 40 deletions.
Expand Up @@ -232,7 +232,7 @@ public void promote(long blockId) throws IOException {
// Get the first worker address for now, as this will likely be the location being read from // Get the first worker address for now, as this will likely be the location being read from
// TODO(calvin): Get this location via a policy (possibly location is a parameter to promote) // TODO(calvin): Get this location via a policy (possibly location is a parameter to promote)
WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress(); WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress();
DefaultBlockWorkerClient blockWorkerClient = mContext.acquireWorkerClient(workerAddr); BlockWorkerClient blockWorkerClient = mContext.acquireWorkerClient(workerAddr);
try { try {
blockWorkerClient.promoteBlock(blockId); blockWorkerClient.promoteBlock(blockId);
} catch (AlluxioException e) { } catch (AlluxioException e) {
Expand Down
Expand Up @@ -128,11 +128,11 @@ public void close() {
* Obtains a client for a worker with the given address. * Obtains a client for a worker with the given address.
* *
* @param address the address of the worker to get a client to * @param address the address of the worker to get a client to
* @return a {@link DefaultBlockWorkerClient} connected to the worker with the given hostname * @return a {@link BlockWorkerClient} connected to the worker with the given hostname
* @throws IOException if no Alluxio worker is available for the given hostname * @throws IOException if no Alluxio worker is available for the given hostname
*/ */
public DefaultBlockWorkerClient acquireWorkerClient(WorkerNetAddress address) throws IOException { public BlockWorkerClient acquireWorkerClient(WorkerNetAddress address) throws IOException {
DefaultBlockWorkerClient client; BlockWorkerClient client;
if (address == null) { if (address == null) {
throw new RuntimeException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage()); throw new RuntimeException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage());
} }
Expand All @@ -151,9 +151,9 @@ public DefaultBlockWorkerClient acquireWorkerClient(WorkerNetAddress address) th
/** /**
* Obtains a worker client on the local worker in the system. For testing only. * Obtains a worker client on the local worker in the system. For testing only.
* *
* @return a {@link DefaultBlockWorkerClient} to a worker in the Alluxio system or null if failed * @return a {@link BlockWorkerClient} to a worker in the Alluxio system or null if failed
*/ */
public DefaultBlockWorkerClient acquireLocalWorkerClient() { public BlockWorkerClient acquireLocalWorkerClient() {
initializeLocalBlockWorkerClientPool(); initializeLocalBlockWorkerClientPool();
if (mLocalBlockWorkerClientPoolMap.isEmpty()) { if (mLocalBlockWorkerClientPoolMap.isEmpty()) {
return null; return null;
Expand All @@ -167,10 +167,10 @@ public DefaultBlockWorkerClient acquireLocalWorkerClient() {
* *
* @param address worker address * @param address worker address
* *
* @return a {@link DefaultBlockWorkerClient} to the given worker address or null if no such worker can * @return a {@link BlockWorkerClient} to the given worker address or null if no such worker can
* be found * be found
*/ */
public DefaultBlockWorkerClient acquireLocalWorkerClient(WorkerNetAddress address) { public BlockWorkerClient acquireLocalWorkerClient(WorkerNetAddress address) {
initializeLocalBlockWorkerClientPool(); initializeLocalBlockWorkerClientPool();
if (!mLocalBlockWorkerClientPoolMap.containsKey(address)) { if (!mLocalBlockWorkerClientPoolMap.containsKey(address)) {
return null; return null;
Expand All @@ -186,7 +186,7 @@ public DefaultBlockWorkerClient acquireLocalWorkerClient(WorkerNetAddress addres
* @param address the address of the worker * @param address the address of the worker
* @return a worker client with a connection to the specified hostname * @return a worker client with a connection to the specified hostname
*/ */
private DefaultBlockWorkerClient acquireRemoteWorkerClient(WorkerNetAddress address) { private BlockWorkerClient acquireRemoteWorkerClient(WorkerNetAddress address) {
// If we couldn't find a worker, crash. // If we couldn't find a worker, crash.
if (address == null) { if (address == null) {
// TODO(calvin): Better exception usage. // TODO(calvin): Better exception usage.
Expand All @@ -200,13 +200,13 @@ private DefaultBlockWorkerClient acquireRemoteWorkerClient(WorkerNetAddress addr
} }


/** /**
* Releases the {@link DefaultBlockWorkerClient} back to the client pool, or destroys it if it was a * Releases the {@link BlockWorkerClient} back to the client pool, or destroys it if it was a
* remote client. * remote client.
* *
* @param blockWorkerClient the worker client to release, the client should not be accessed after * @param blockWorkerClient the worker client to release, the client should not be accessed after
* this method is called * this method is called
*/ */
public void releaseWorkerClient(DefaultBlockWorkerClient blockWorkerClient) { public void releaseWorkerClient(BlockWorkerClient blockWorkerClient) {
// If the client is local and the pool exists, release the client to the pool, otherwise just // If the client is local and the pool exists, release the client to the pool, otherwise just
// close the client. // close the client.
if (blockWorkerClient.isLocal()) { if (blockWorkerClient.isLocal()) {
Expand Down
Expand Up @@ -23,9 +23,9 @@
*/ */
@ThreadSafe @ThreadSafe
final class BlockWorkerClientHeartbeatExecutor implements HeartbeatExecutor { final class BlockWorkerClientHeartbeatExecutor implements HeartbeatExecutor {
private final DefaultBlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;


public BlockWorkerClientHeartbeatExecutor(DefaultBlockWorkerClient blockWorkerClient) { public BlockWorkerClientHeartbeatExecutor(BlockWorkerClient blockWorkerClient) {
mBlockWorkerClient = Preconditions.checkNotNull(blockWorkerClient); mBlockWorkerClient = Preconditions.checkNotNull(blockWorkerClient);
} }


Expand Down
Expand Up @@ -29,7 +29,7 @@
* thread is done using the client. * thread is done using the client.
*/ */
@ThreadSafe @ThreadSafe
final class BlockWorkerClientPool extends ResourcePool<DefaultBlockWorkerClient> { final class BlockWorkerClientPool extends ResourcePool<BlockWorkerClient> {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
/** /**
* The capacity for this pool must be large, since each block written will hold a client until * The capacity for this pool must be large, since each block written will hold a client until
Expand All @@ -53,7 +53,7 @@ public void close() {
} }


@Override @Override
public void release(DefaultBlockWorkerClient blockWorkerClient) { public void release(BlockWorkerClient blockWorkerClient) {
try { try {
// Heartbeat to send the client metrics. // Heartbeat to send the client metrics.
blockWorkerClient.sessionHeartbeat(); blockWorkerClient.sessionHeartbeat();
Expand All @@ -65,7 +65,7 @@ public void release(DefaultBlockWorkerClient blockWorkerClient) {
} }


@Override @Override
protected DefaultBlockWorkerClient createNewResource() { protected BlockWorkerClient createNewResource() {
long clientId = IdUtils.getRandomNonNegativeLong(); long clientId = IdUtils.getRandomNonNegativeLong();
return new DefaultBlockWorkerClient(mWorkerNetAddress, ClientContext.getBlockClientExecutorService(), return new DefaultBlockWorkerClient(mWorkerNetAddress, ClientContext.getBlockClientExecutorService(),
clientId, true, ClientContext.getClientMetrics()); clientId, true, ClientContext.getClientMetrics());
Expand Down
Expand Up @@ -35,7 +35,7 @@ public final class LocalBlockInStream extends BufferedBlockInStream {
/** Helper to manage closables. */ /** Helper to manage closables. */
private final Closer mCloser; private final Closer mCloser;
/** Client to communicate with the local worker. */ /** Client to communicate with the local worker. */
private final DefaultBlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;
/** The block store context which provides block worker clients. */ /** The block store context which provides block worker clients. */
private final BlockStoreContext mContext; private final BlockStoreContext mContext;
/** The file reader to read a local block. */ /** The file reader to read a local block. */
Expand Down
Expand Up @@ -37,7 +37,7 @@
public final class LocalBlockOutStream extends BufferedBlockOutStream { public final class LocalBlockOutStream extends BufferedBlockOutStream {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private final Closer mCloser; private final Closer mCloser;
private final DefaultBlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;
private final LocalFileBlockWriter mWriter; private final LocalFileBlockWriter mWriter;
private long mReservedBytes; private long mReservedBytes;


Expand Down
Expand Up @@ -38,7 +38,7 @@ public final class RemoteBlockInStream extends BufferedBlockInStream {
private final Long mLockId; private final Long mLockId;


/** Client to communicate with the remote worker. */ /** Client to communicate with the remote worker. */
private final DefaultBlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;
/** The block store context which provides block worker clients. */ /** The block store context which provides block worker clients. */
private final BlockStoreContext mContext; private final BlockStoreContext mContext;
private final ClientMetrics mMetrics; private final ClientMetrics mMetrics;
Expand Down
Expand Up @@ -27,7 +27,7 @@
@NotThreadSafe @NotThreadSafe
public final class RemoteBlockOutStream extends BufferedBlockOutStream { public final class RemoteBlockOutStream extends BufferedBlockOutStream {
private final RemoteBlockWriter mRemoteWriter; private final RemoteBlockWriter mRemoteWriter;
private final DefaultBlockWorkerClient mBlockWorkerClient; private final BlockWorkerClient mBlockWorkerClient;
private final ClientMetrics mMetrics; private final ClientMetrics mMetrics;


/** /**
Expand Down
Expand Up @@ -35,7 +35,7 @@
* Tests for {@link AlluxioBlockStore}. * Tests for {@link AlluxioBlockStore}.
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({BlockStoreContext.class, DefaultBlockWorkerClient.class}) @PrepareForTest({BlockStoreContext.class})
public final class AlluxioBlockStoreTest { public final class AlluxioBlockStoreTest {
private static final long BLOCK_ID = 3L; private static final long BLOCK_ID = 3L;
private static final long BLOCK_LENGTH = 1000L; private static final long BLOCK_LENGTH = 1000L;
Expand Down Expand Up @@ -72,11 +72,11 @@ public final class AlluxioBlockStoreTest {
public TemporaryFolder mTestFolder = new TemporaryFolder(); public TemporaryFolder mTestFolder = new TemporaryFolder();


private BlockMasterClient mMasterClient; private BlockMasterClient mMasterClient;
private DefaultBlockWorkerClient mBlockWorkerClient; private BlockWorkerClient mBlockWorkerClient;


@Before @Before
public void before() throws Exception { public void before() throws Exception {
mBlockWorkerClient = PowerMockito.mock(DefaultBlockWorkerClient.class); mBlockWorkerClient = PowerMockito.mock(BlockWorkerClient.class);
mMasterClient = PowerMockito.mock(BlockMasterClient.class); mMasterClient = PowerMockito.mock(BlockMasterClient.class);
} }


Expand Down
Expand Up @@ -128,7 +128,7 @@ public void acquireWorkerLimitTest() throws Exception {
.withArguments(Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), .withArguments(Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(),
Mockito.any()).thenReturn(workerClientMock); Mockito.any()).thenReturn(workerClientMock);


final List<DefaultBlockWorkerClient> clients = new ArrayList<>(); final List<BlockWorkerClient> clients = new ArrayList<>();


// Reduce the size of the worker thread pool to lower the chance of a timeout. // Reduce the size of the worker thread pool to lower the chance of a timeout.
Configuration.set(Constants.USER_BLOCK_WORKER_CLIENT_THREADS, "10"); Configuration.set(Constants.USER_BLOCK_WORKER_CLIENT_THREADS, "10");
Expand All @@ -154,7 +154,7 @@ public void acquireWorkerLimitTest() throws Exception {
// Release all the clients // Release all the clients
// Set the RPC number of retries to -1 to prevent the worker client from trying to send a // Set the RPC number of retries to -1 to prevent the worker client from trying to send a
// heartbeat message when it is released. // heartbeat message when it is released.
for (DefaultBlockWorkerClient client : clients) { for (BlockWorkerClient client : clients) {
BlockStoreContext.INSTANCE.releaseWorkerClient(client); BlockStoreContext.INSTANCE.releaseWorkerClient(client);
} }


Expand Down Expand Up @@ -210,7 +210,7 @@ public void hasNoLocalWorkerTest() throws Exception {
class AcquireWorkerClient implements Runnable { class AcquireWorkerClient implements Runnable {
@Override @Override
public void run() { public void run() {
DefaultBlockWorkerClient client = BlockStoreContext.INSTANCE.acquireLocalWorkerClient(); BlockWorkerClient client = BlockStoreContext.INSTANCE.acquireLocalWorkerClient();
BlockStoreContext.INSTANCE.releaseWorkerClient(client); BlockStoreContext.INSTANCE.releaseWorkerClient(client);
} }
} }
Expand Down
Expand Up @@ -12,25 +12,19 @@
package alluxio.client.block; package alluxio.client.block;


import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;


/** /**
* Tests for the {@link BlockWorkerClientHeartbeatExecutor} class. * Tests for the {@link BlockWorkerClientHeartbeatExecutor} class.
*/ */
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultBlockWorkerClient.class)
public class BlockWorkerClientHeartbeatExecutorTest { public class BlockWorkerClientHeartbeatExecutorTest {


/** /**
* Tests to ensure heartbeat calls BlockWorkerClient.periodicHeartbeat. * Tests to ensure heartbeat calls BlockWorkerClient.periodicHeartbeat.
*/ */
@Test @Test
public void heartbeatCallsPeriodicHeartbeat() throws Exception { public void heartbeatCallsPeriodicHeartbeat() throws Exception {
DefaultBlockWorkerClient mock = PowerMockito.mock(DefaultBlockWorkerClient.class); BlockWorkerClient mock = Mockito.mock(BlockWorkerClient.class);
BlockWorkerClientHeartbeatExecutor heartbeatExecutor = BlockWorkerClientHeartbeatExecutor heartbeatExecutor =
new BlockWorkerClientHeartbeatExecutor(mock); new BlockWorkerClientHeartbeatExecutor(mock);


Expand Down
Expand Up @@ -18,7 +18,6 @@
import alluxio.client.WriteType; import alluxio.client.WriteType;
import alluxio.client.block.AlluxioBlockStore; import alluxio.client.block.AlluxioBlockStore;
import alluxio.client.block.BlockStoreContext; import alluxio.client.block.BlockStoreContext;
import alluxio.client.block.DefaultBlockWorkerClient;
import alluxio.client.block.BlockWorkerInfo; import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.BufferedBlockOutStream; import alluxio.client.block.BufferedBlockOutStream;
import alluxio.client.block.TestBufferedBlockOutStream; import alluxio.client.block.TestBufferedBlockOutStream;
Expand Down Expand Up @@ -65,7 +64,7 @@
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({FileSystemContext.class, BlockStoreContext.class, FileSystemMasterClient.class, @PrepareForTest({FileSystemContext.class, BlockStoreContext.class, FileSystemMasterClient.class,
AlluxioBlockStore.class, UnderFileSystem.class, DefaultBlockWorkerClient.class}) AlluxioBlockStore.class, UnderFileSystem.class})
public class FileOutStreamTest { public class FileOutStreamTest {


private static final long BLOCK_LENGTH = 100L; private static final long BLOCK_LENGTH = 100L;
Expand Down
6 changes: 6 additions & 0 deletions core/common/src/main/java/alluxio/Client.java
Expand Up @@ -40,4 +40,10 @@ public interface Client extends Closeable {
* @return true if this client is connected to the remote * @return true if this client is connected to the remote
*/ */
boolean isConnected(); boolean isConnected();

/**
* Closes the connection with the remote permanently. This instance should be not be reused after
* closing.
*/
void close();
} }
Expand Up @@ -16,7 +16,7 @@
import alluxio.LocalAlluxioClusterResource; import alluxio.LocalAlluxioClusterResource;
import alluxio.client.block.BlockMasterClient; import alluxio.client.block.BlockMasterClient;
import alluxio.client.block.BlockStoreContext; import alluxio.client.block.BlockStoreContext;
import alluxio.client.block.DefaultBlockWorkerClient; import alluxio.client.block.BlockWorkerClient;
import alluxio.client.block.RetryHandlingBlockMasterClient; import alluxio.client.block.RetryHandlingBlockMasterClient;
import alluxio.exception.ConnectionFailedException; import alluxio.exception.ConnectionFailedException;
import alluxio.util.network.NetworkAddressUtils; import alluxio.util.network.NetworkAddressUtils;
Expand Down Expand Up @@ -45,7 +45,7 @@ public class ServiceSocketBindIntegrationTest {


private BlockMasterClient mBlockMasterClient; private BlockMasterClient mBlockMasterClient;
private HttpURLConnection mMasterWebService; private HttpURLConnection mMasterWebService;
private DefaultBlockWorkerClient mBlockWorkerClient; private BlockWorkerClient mBlockWorkerClient;
private SocketChannel mWorkerDataService; private SocketChannel mWorkerDataService;
private HttpURLConnection mWorkerWebService; private HttpURLConnection mWorkerWebService;


Expand Down
Expand Up @@ -13,6 +13,7 @@


import alluxio.Constants; import alluxio.Constants;
import alluxio.LocalAlluxioClusterResource; import alluxio.LocalAlluxioClusterResource;
import alluxio.client.block.BlockWorkerClient;
import alluxio.client.block.DefaultBlockWorkerClient; import alluxio.client.block.DefaultBlockWorkerClient;
import alluxio.client.util.ClientTestUtils; import alluxio.client.util.ClientTestUtils;
import alluxio.security.MasterClientAuthenticationIntegrationTest.NameMatchAuthenticationProvider; import alluxio.security.MasterClientAuthenticationIntegrationTest.NameMatchAuthenticationProvider;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void customAuthenticationDenyConnectTest() throws Exception {
mThrown.expect(IOException.class); mThrown.expect(IOException.class);
mThrown.expectMessage("Failed to connect to the worker"); mThrown.expectMessage("Failed to connect to the worker");


try (DefaultBlockWorkerClient blockWorkerClient = new DefaultBlockWorkerClient( try (BlockWorkerClient blockWorkerClient = new DefaultBlockWorkerClient(
mLocalAlluxioClusterResource.get().getWorkerAddress(), mExecutorService, mLocalAlluxioClusterResource.get().getWorkerAddress(), mExecutorService,
1 /* fake session id */, true, new ClientMetrics())) { 1 /* fake session id */, true, new ClientMetrics())) {
Assert.assertFalse(blockWorkerClient.isConnected()); Assert.assertFalse(blockWorkerClient.isConnected());
Expand Down
Expand Up @@ -20,7 +20,7 @@
import alluxio.client.WriteType; import alluxio.client.WriteType;
import alluxio.client.block.BlockMasterClient; import alluxio.client.block.BlockMasterClient;
import alluxio.client.block.BlockStoreContext; import alluxio.client.block.BlockStoreContext;
import alluxio.client.block.DefaultBlockWorkerClient; import alluxio.client.block.BlockWorkerClient;
import alluxio.client.block.RetryHandlingBlockMasterClient; import alluxio.client.block.RetryHandlingBlockMasterClient;
import alluxio.client.file.FileSystem; import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus; import alluxio.client.file.URIStatus;
Expand Down Expand Up @@ -74,7 +74,7 @@ public static Collection<Object[]> data() {
public LocalAlluxioClusterResource mLocalAlluxioClusterResource; public LocalAlluxioClusterResource mLocalAlluxioClusterResource;
private FileSystem mFileSystem = null; private FileSystem mFileSystem = null;
private BlockMasterClient mBlockMasterClient; private BlockMasterClient mBlockMasterClient;
private DefaultBlockWorkerClient mBlockWorkerClient; private BlockWorkerClient mBlockWorkerClient;


public DataServerIntegrationTest(String className, String nettyTransferType, String blockReader) { public DataServerIntegrationTest(String className, String nettyTransferType, String blockReader) {
mLocalAlluxioClusterResource = new LocalAlluxioClusterResource(WORKER_CAPACITY_BYTES, mLocalAlluxioClusterResource = new LocalAlluxioClusterResource(WORKER_CAPACITY_BYTES,
Expand Down

0 comments on commit 0ce938d

Please sign in to comment.