Skip to content

Commit

Permalink
Refactor journal and inquire client code
Browse files Browse the repository at this point in the history
This further separates the journal implementation from the master
implementation, and creates an interface for primary master discovery.

No functionality is changed.
  • Loading branch information
aaudiber committed Aug 12, 2017
1 parent abda421 commit 1de80a7
Show file tree
Hide file tree
Showing 117 changed files with 2,148 additions and 1,170 deletions.
Expand Up @@ -13,6 +13,7 @@

import alluxio.AbstractMasterClient;
import alluxio.Constants;
import alluxio.master.MasterInquireClient;
import alluxio.thrift.AlluxioService;
import alluxio.thrift.GetMasterInfoTOptions;
import alluxio.thrift.MetaMasterClientService;
Expand All @@ -22,7 +23,6 @@
import org.apache.thrift.TException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -41,13 +41,12 @@ public final class RetryHandlingMetaMasterClient extends AbstractMasterClient
private MetaMasterClientService.Client mClient;

/**
* Creates a new block master client.
* Creates a new meta master client.
*
* @param subject the parent subject, set to null if not present
* @param masterAddress the master address
*/
public RetryHandlingMetaMasterClient(Subject subject, InetSocketAddress masterAddress) {
super(subject, masterAddress);
public RetryHandlingMetaMasterClient(Subject subject) {
super(subject, MasterInquireClient.Factory.create());
}

@Override
Expand Down
Expand Up @@ -12,11 +12,11 @@
package alluxio.client.block;

import alluxio.Client;
import alluxio.master.MasterInquireClient;
import alluxio.wire.BlockInfo;
import alluxio.wire.WorkerInfo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -38,22 +38,22 @@ private Factory() {} // prevent instantiation
/**
* Factory method for {@link BlockMasterClient}.
*
* @param masterAddress the master address
* @return a new {@link BlockMasterClient} instance
*/
public static BlockMasterClient create(InetSocketAddress masterAddress) {
return create(null, masterAddress);
public static BlockMasterClient create() {
return create(null, MasterInquireClient.Factory.create());
}

/**
* Factory method for {@link BlockMasterClient}.
*
* @param subject the parent subject
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
* @return a new {@link BlockMasterClient} instance
*/
public static BlockMasterClient create(Subject subject, InetSocketAddress masterAddress) {
return RetryHandlingBlockMasterClient.create(subject, masterAddress);
public static BlockMasterClient create(Subject subject,
MasterInquireClient masterInquireClient) {
return new RetryHandlingBlockMasterClient(subject, masterInquireClient);
}
}

Expand Down
Expand Up @@ -13,12 +13,12 @@

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.master.MasterInquireClient;
import alluxio.resource.ResourcePool;

import com.google.common.io.Closer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

Expand All @@ -32,35 +32,20 @@
*/
@ThreadSafe
public final class BlockMasterClientPool extends ResourcePool<BlockMasterClient> {
private final InetSocketAddress mMasterAddress;
private final MasterInquireClient mMasterInquireClient;
private final Queue<BlockMasterClient> mClientList;
private final Subject mSubject;

/**
* Creates a new block master client pool.
*
* @param subject the parent subject
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
*/
public BlockMasterClientPool(Subject subject, InetSocketAddress masterAddress) {
public BlockMasterClientPool(Subject subject, MasterInquireClient masterInquireClient) {
super(Configuration.getInt(PropertyKey.USER_BLOCK_MASTER_CLIENT_THREADS));
mSubject = subject;
mMasterAddress = masterAddress;
mClientList = new ConcurrentLinkedQueue<>();
}

/**
* Creates a new block master client pool.
*
* @param subject the parent subject
* @param masterAddress the master address
* @param clientThreads the number of client threads to use
*/
public BlockMasterClientPool(Subject subject, InetSocketAddress masterAddress,
int clientThreads) {
super(clientThreads);
mSubject = subject;
mMasterAddress = masterAddress;
mMasterInquireClient = masterInquireClient;
mClientList = new ConcurrentLinkedQueue<>();
}

Expand All @@ -76,7 +61,7 @@ public void close() throws IOException {

@Override
protected BlockMasterClient createNewResource() {
BlockMasterClient client = BlockMasterClient.Factory.create(mSubject, mMasterAddress);
BlockMasterClient client = BlockMasterClient.Factory.create(mSubject, mMasterInquireClient);
mClientList.add(client);
return client;
}
Expand Down
Expand Up @@ -13,6 +13,7 @@

import alluxio.AbstractMasterClient;
import alluxio.Constants;
import alluxio.master.MasterInquireClient;
import alluxio.thrift.AlluxioService;
import alluxio.thrift.BlockMasterClientService;
import alluxio.thrift.GetBlockInfoTOptions;
Expand All @@ -26,7 +27,6 @@
import org.apache.thrift.TException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -48,15 +48,10 @@ public final class RetryHandlingBlockMasterClient extends AbstractMasterClient
* Creates a new block master client.
*
* @param subject the parent subject, set to null if not present
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
*/
protected static RetryHandlingBlockMasterClient create(Subject subject,
InetSocketAddress masterAddress) {
return new RetryHandlingBlockMasterClient(subject, masterAddress);
}

private RetryHandlingBlockMasterClient(Subject subject, InetSocketAddress masterAddress) {
super(subject, masterAddress);
public RetryHandlingBlockMasterClient(Subject subject, MasterInquireClient masterInquireClient) {
super(subject, masterInquireClient);
}

@Override
Expand Down
Expand Up @@ -18,11 +18,11 @@
import alluxio.client.netty.NettyClient;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.UnavailableException;
import alluxio.master.MasterInquireClient;
import alluxio.metrics.MetricsSystem;
import alluxio.network.connection.NettyChannelPool;
import alluxio.resource.CloseableResource;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.util.network.NetworkAddressUtils.ServiceType;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;

Expand Down Expand Up @@ -71,9 +71,9 @@ public final class FileSystemContext implements Closeable {
private final ConcurrentHashMapV8<SocketAddress, NettyChannelPool>
mNettyChannelPools = new ConcurrentHashMapV8<>();

/** The shared master address associated with the {@link FileSystemContext}. */
/** The shared master inquire client associated with the {@link FileSystemContext}. */
@GuardedBy("this")
private InetSocketAddress mMasterAddress;
private MasterInquireClient mMasterInquireClient;

/**
* Indicates whether the {@link #mLocalWorker} field has been lazily initialized yet.
Expand Down Expand Up @@ -124,9 +124,10 @@ private FileSystemContext(Subject subject) {
* Initializes the context. Only called in the factory methods and reset.
*/
private void init() {
mMasterAddress = NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC);
mFileSystemMasterClientPool = new FileSystemMasterClientPool(mParentSubject, mMasterAddress);
mBlockMasterClientPool = new BlockMasterClientPool(mParentSubject, mMasterAddress);
mMasterInquireClient = MasterInquireClient.Factory.create();
mFileSystemMasterClientPool =
new FileSystemMasterClientPool(mParentSubject, mMasterInquireClient);
mBlockMasterClientPool = new BlockMasterClientPool(mParentSubject, mMasterInquireClient);
}

/**
Expand All @@ -147,7 +148,7 @@ public void close() throws IOException {
mNettyChannelPools.clear();

synchronized (this) {
mMasterAddress = null;
mMasterInquireClient = null;
mLocalWorkerInitialized = false;
mLocalWorker = null;
}
Expand All @@ -173,7 +174,7 @@ public Subject getParentSubject() {
* @return the master address
*/
public synchronized InetSocketAddress getMasterAddress() {
return mMasterAddress;
return mMasterInquireClient.getPrimaryRpcAddress();
}

/**
Expand Down
Expand Up @@ -12,7 +12,7 @@
package alluxio.client.file;

import alluxio.AlluxioURI;
import alluxio.MasterClient;
import alluxio.Client;
import alluxio.client.file.options.CheckConsistencyOptions;
import alluxio.client.file.options.CompleteFileOptions;
import alluxio.client.file.options.CreateDirectoryOptions;
Expand All @@ -27,9 +27,9 @@
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.NotFoundException;
import alluxio.wire.MountPointInfo;
import alluxio.master.MasterInquireClient;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

Expand All @@ -38,7 +38,7 @@
/**
* A client to use for interacting with a file system master.
*/
public interface FileSystemMasterClient extends MasterClient {
public interface FileSystemMasterClient extends Client {

/**
* Factory for {@link FileSystemMasterClient}.
Expand All @@ -50,22 +50,22 @@ private Factory() {} // prevent instantiation
/**
* Factory method for {@link FileSystemMasterClient}.
*
* @param masterAddress the master address
* @return a new {@link FileSystemMasterClient} instance
*/
public static FileSystemMasterClient create(InetSocketAddress masterAddress) {
return create(null, masterAddress);
public static FileSystemMasterClient create() {
return create(null, MasterInquireClient.Factory.create());
}

/**
* Factory method for {@link FileSystemMasterClient}.
*
* @param subject the parent subject
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
* @return a new {@link FileSystemMasterClient} instance
*/
public static FileSystemMasterClient create(Subject subject, InetSocketAddress masterAddress) {
return RetryHandlingFileSystemMasterClient.create(subject, masterAddress);
public static FileSystemMasterClient create(Subject subject,
MasterInquireClient masterInquireClient) {
return new RetryHandlingFileSystemMasterClient(subject, masterInquireClient);
}
}

Expand Down
Expand Up @@ -13,12 +13,12 @@

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.master.MasterInquireClient;
import alluxio.resource.ResourcePool;

import com.google.common.io.Closer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

Expand All @@ -30,19 +30,19 @@
*/
@ThreadSafe
public final class FileSystemMasterClientPool extends ResourcePool<FileSystemMasterClient> {
private final InetSocketAddress mMasterAddress;
private final MasterInquireClient mMasterInquireClient;
private final Queue<FileSystemMasterClient> mClientList;
private final Subject mSubject;

/**
* Creates a new file system master client pool.
*
* @param subject the parent subject
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
*/
public FileSystemMasterClientPool(Subject subject, InetSocketAddress masterAddress) {
public FileSystemMasterClientPool(Subject subject, MasterInquireClient masterInquireClient) {
super(Configuration.getInt(PropertyKey.USER_FILE_MASTER_CLIENT_THREADS));
mMasterAddress = masterAddress;
mMasterInquireClient = masterInquireClient;
mClientList = new ConcurrentLinkedQueue<>();
mSubject = subject;
}
Expand All @@ -51,13 +51,13 @@ public FileSystemMasterClientPool(Subject subject, InetSocketAddress masterAddre
* Creates a new file system master client pool.
*
* @param subject the parent subject
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
* @param clientThreads the number of client threads to use
*/
public FileSystemMasterClientPool(Subject subject, InetSocketAddress masterAddress,
public FileSystemMasterClientPool(Subject subject, MasterInquireClient masterInquireClient,
int clientThreads) {
super(clientThreads);
mMasterAddress = masterAddress;
mMasterInquireClient = masterInquireClient;
mClientList = new ConcurrentLinkedQueue<>();
mSubject = subject;
}
Expand All @@ -74,7 +74,8 @@ public void close() throws IOException {

@Override
protected FileSystemMasterClient createNewResource() {
FileSystemMasterClient client = FileSystemMasterClient.Factory.create(mSubject, mMasterAddress);
FileSystemMasterClient client =
FileSystemMasterClient.Factory.create(mSubject, mMasterInquireClient);
mClientList.add(client);
return client;
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import alluxio.client.file.options.LoadMetadataOptions;
import alluxio.client.file.options.MountOptions;
import alluxio.client.file.options.SetAttributeOptions;
import alluxio.master.MasterInquireClient;
import alluxio.thrift.AlluxioService;
import alluxio.thrift.FileSystemMasterClientService;
import alluxio.thrift.GetNewBlockIdForFileTOptions;
Expand All @@ -39,7 +40,6 @@
import org.apache.thrift.TException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -63,15 +63,11 @@ public final class RetryHandlingFileSystemMasterClient extends AbstractMasterCli
* Creates a new {@link RetryHandlingFileSystemMasterClient} instance.
*
* @param subject the subject
* @param masterAddress the master address
* @param masterInquireClient a client for determining the master address
*/
protected static RetryHandlingFileSystemMasterClient create(Subject subject,
InetSocketAddress masterAddress) {
return new RetryHandlingFileSystemMasterClient(subject, masterAddress);
}

private RetryHandlingFileSystemMasterClient(Subject subject, InetSocketAddress masterAddress) {
super(subject, masterAddress);
public RetryHandlingFileSystemMasterClient(Subject subject,
MasterInquireClient masterInquireClient) {
super(subject, masterInquireClient);
}

@Override
Expand Down

0 comments on commit 1de80a7

Please sign in to comment.