diff --git a/core/common/src/main/java/alluxio/AbstractClient.java b/core/common/src/main/java/alluxio/AbstractClient.java index e02c27cd88b7..bd8228b59228 100644 --- a/core/common/src/main/java/alluxio/AbstractClient.java +++ b/core/common/src/main/java/alluxio/AbstractClient.java @@ -20,6 +20,7 @@ import alluxio.exception.status.UnimplementedException; import alluxio.retry.ExponentialBackoffRetry; import alluxio.retry.RetryPolicy; +import alluxio.security.authentication.TProtocols; import alluxio.security.authentication.TransportProvider; import alluxio.thrift.AlluxioService; import alluxio.thrift.AlluxioTException; @@ -27,8 +28,6 @@ import com.google.common.base.Preconditions; import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TMultiplexedProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; @@ -177,9 +176,8 @@ public synchronized void connect() throws AlluxioStatusException { LOG.info("Alluxio client (version {}) is trying to connect with {} @ {}", RuntimeConstants.VERSION, getServiceName(), mAddress); - TProtocol binaryProtocol = - new TBinaryProtocol(mTransportProvider.getClientTransport(mParentSubject, mAddress)); - mProtocol = new TMultiplexedProtocol(binaryProtocol, getServiceName()); + mProtocol = TProtocols.createProtocol( + mTransportProvider.getClientTransport(mParentSubject, mAddress), getServiceName()); try { mProtocol.getTransport().open(); LOG.info("Client registered with {} @ {}", getServiceName(), mAddress); diff --git a/core/common/src/main/java/alluxio/master/PollingMasterInquireClient.java b/core/common/src/main/java/alluxio/master/PollingMasterInquireClient.java index b5661f5262d7..a17714b4d121 100644 --- a/core/common/src/main/java/alluxio/master/PollingMasterInquireClient.java +++ b/core/common/src/main/java/alluxio/master/PollingMasterInquireClient.java @@ -15,11 +15,11 @@ import alluxio.exception.status.UnauthenticatedException; import alluxio.exception.status.UnavailableException; import alluxio.retry.RetryPolicy; +import alluxio.security.authentication.TProtocols; import alluxio.security.authentication.TransportProvider; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TMultiplexedProtocol; import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,12 +86,8 @@ private InetSocketAddress getAddress() { private void pingMetaService(InetSocketAddress address) throws UnauthenticatedException, TTransportException { - TransportProvider transportProvider = TransportProvider.Factory.create(); - - TProtocol binaryProtocol = new TBinaryProtocol(transportProvider.getClientTransport(address)); - TMultiplexedProtocol protocol = - new TMultiplexedProtocol(binaryProtocol, Constants.META_MASTER_SERVICE_NAME); - + TTransport transport = TransportProvider.Factory.create().getClientTransport(address); + TProtocol protocol = TProtocols.createProtocol(transport, Constants.META_MASTER_SERVICE_NAME); protocol.getTransport().open(); protocol.getTransport().close(); } diff --git a/core/common/src/main/java/alluxio/security/authentication/TProtocols.java b/core/common/src/main/java/alluxio/security/authentication/TProtocols.java new file mode 100644 index 000000000000..37d7ade4cfe0 --- /dev/null +++ b/core/common/src/main/java/alluxio/security/authentication/TProtocols.java @@ -0,0 +1,39 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.security.authentication; + +import alluxio.exception.status.UnauthenticatedException; + +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TMultiplexedProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +/** + * Class for creating Thrift protocols for communicating with Alluxio services. + */ +public final class TProtocols { + + /** + * @param transport a transport for communicating with an Alluxio Thrift server + * @param serviceName the service to communicate with + * @return a Thrift protocol for communicating with the given service through the transport + */ + public static TProtocol createProtocol(TTransport transport, String serviceName) + throws UnauthenticatedException { + TProtocol binaryProtocol = new TBinaryProtocol(transport); + TProtocol multiplexedProtocol = new TMultiplexedProtocol(binaryProtocol, serviceName); + return multiplexedProtocol; + } + + private TProtocols() {} // not intended for instantiation +} diff --git a/tests/src/test/java/alluxio/zookeeper/ZookeeperFailureIntegrationTest.java b/tests/src/test/java/alluxio/zookeeper/ZookeeperFailureIntegrationTest.java index 964e9b7ba750..51b4b273789c 100644 --- a/tests/src/test/java/alluxio/zookeeper/ZookeeperFailureIntegrationTest.java +++ b/tests/src/test/java/alluxio/zookeeper/ZookeeperFailureIntegrationTest.java @@ -21,6 +21,7 @@ import alluxio.multi.process.MasterNetAddress; import alluxio.multi.process.MultiProcessCluster; import alluxio.multi.process.MultiProcessCluster.DeployMode; +import alluxio.security.authentication.TProtocols; import alluxio.security.authentication.TransportProvider; import alluxio.security.authentication.TransportProvider.Factory; import alluxio.thrift.FileSystemMasterClientService.Client; @@ -30,8 +31,6 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TMultiplexedProtocol; import org.apache.thrift.protocol.TProtocol; import org.junit.Rule; import org.junit.Test; @@ -113,10 +112,9 @@ private boolean rpcServiceAvailable() throws Exception { new InetSocketAddress(netAddress.getHostname(), netAddress.getRpcPort()); try { TransportProvider transportProvider = Factory.create(); - TProtocol binaryProtocol = - new TBinaryProtocol(transportProvider.getClientTransport(null, address)); - TMultiplexedProtocol protocol = new TMultiplexedProtocol(binaryProtocol, - Constants.FILE_SYSTEM_MASTER_CLIENT_SERVICE_NAME); + TProtocol protocol = + TProtocols.createProtocol(transportProvider.getClientTransport(null, address), + Constants.FILE_SYSTEM_MASTER_CLIENT_SERVICE_NAME); Client client = new Client(protocol); client.listStatus("/", new ListStatusTOptions()); } catch (TException e) {