Skip to content

Commit

Permalink
HDFS-11546. Federation Router RPC server. Contributed by Jason Kace a…
Browse files Browse the repository at this point in the history
…nd Inigo Goiri.

(cherry picked from commit 8a9cdeb)
  • Loading branch information
Inigo Goiri committed Oct 7, 2017
1 parent 4bf877b commit ca4f209
Show file tree
Hide file tree
Showing 21 changed files with 5,675 additions and 388 deletions.
Expand Up @@ -1123,6 +1123,44 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// HDFS Router-based federation
public static final String FEDERATION_ROUTER_PREFIX =
"dfs.federation.router.";
public static final String DFS_ROUTER_DEFAULT_NAMESERVICE =
FEDERATION_ROUTER_PREFIX + "default.nameserviceId";
public static final String DFS_ROUTER_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_PREFIX + "handler.count";
public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY =
FEDERATION_ROUTER_PREFIX + "reader.queue.size";
public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100;
public static final String DFS_ROUTER_READER_COUNT_KEY =
FEDERATION_ROUTER_PREFIX + "reader.count";
public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1;
public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY =
FEDERATION_ROUTER_PREFIX + "handler.queue.size";
public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
public static final String DFS_ROUTER_RPC_BIND_HOST_KEY =
FEDERATION_ROUTER_PREFIX + "rpc-bind-host";
public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888;
public static final String DFS_ROUTER_RPC_ADDRESS_KEY =
FEDERATION_ROUTER_PREFIX + "rpc-address";
public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT;
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;

// HDFS Router NN client
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.pool-size";
public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
64;
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN =
FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms";
public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT =
TimeUnit.MINUTES.toMillis(1);
public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS =
FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10);

// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
Expand Down
Expand Up @@ -23,31 +23,31 @@
* Represents information about a single nameservice/namespace in a federated
* HDFS cluster.
*/
public class FederationNamespaceInfo
implements Comparable<FederationNamespaceInfo>, RemoteLocationContext {
public class FederationNamespaceInfo extends RemoteLocationContext {

/** Block pool identifier. */
private String blockPoolId;
private final String blockPoolId;
/** Cluster identifier. */
private String clusterId;
private final String clusterId;
/** Nameservice identifier. */
private String nameserviceId;
private final String nameserviceId;

public FederationNamespaceInfo(String bpId, String clId, String nsId) {
this.blockPoolId = bpId;
this.clusterId = clId;
this.nameserviceId = nsId;
}

/**
* The HDFS nameservice id for this namespace.
*
* @return Nameservice identifier.
*/
@Override
public String getNameserviceId() {
return this.nameserviceId;
}

@Override
public String getDest() {
return this.nameserviceId;
}

/**
* The HDFS cluster id for this namespace.
*
Expand All @@ -66,34 +66,8 @@ public String getBlockPoolId() {
return this.blockPoolId;
}

@Override
public int hashCode() {
return this.nameserviceId.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
} else if (obj instanceof FederationNamespaceInfo) {
return this.compareTo((FederationNamespaceInfo) obj) == 0;
} else {
return false;
}
}

@Override
public int compareTo(FederationNamespaceInfo info) {
return this.nameserviceId.compareTo(info.getNameserviceId());
}

@Override
public String toString() {
return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId;
}

@Override
public String getDest() {
return this.nameserviceId;
}
}
Expand Up @@ -17,34 +17,51 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;

/**
* A single in a remote namespace consisting of a nameservice ID
* and a HDFS path.
*/
public class RemoteLocation implements RemoteLocationContext {
public class RemoteLocation extends RemoteLocationContext {

/** Identifier of the remote namespace for this location. */
private String nameserviceId;
private final String nameserviceId;
/** Identifier of the namenode in the namespace for this location. */
private final String namenodeId;
/** Path in the remote location. */
private String path;
private final String path;

/**
* Create a new remote location.
*
* @param nsId
* @param pPath
*/
public RemoteLocation(String nsId, String pPath) {
this(nsId, null, pPath);
}

/**
* Create a new remote location pointing to a particular namenode in the
* namespace.
*
* @param nsId Destination namespace.
* @param pPath Path in the destination namespace.
*/
public RemoteLocation(String nsId, String pPath) {
public RemoteLocation(String nsId, String nnId, String pPath) {
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.path = pPath;
}

@Override
public String getNameserviceId() {
return this.nameserviceId;
String ret = this.nameserviceId;
if (this.namenodeId != null) {
ret += "-" + this.namenodeId;
}
return ret;
}

@Override
Expand All @@ -54,21 +71,6 @@ public String getDest() {

@Override
public String toString() {
return this.nameserviceId + "->" + this.path;
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 31)
.append(this.nameserviceId)
.append(this.path)
.toHashCode();
}

@Override
public boolean equals(Object obj) {
return (obj != null &&
obj.getClass() == this.getClass() &&
obj.hashCode() == this.hashCode());
return getNameserviceId() + "->" + this.path;
}
}
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC;

/**
* Context to track a connection in a {@link ConnectionPool}. When a client uses
* a connection, it increments a counter to mark it as active. Once the client
* is done with the connection, it decreases the counter. It also takes care of
* closing the connection once is not active.
*/
public class ConnectionContext {

/** Client for the connection. */
private final ProxyAndInfo<ClientProtocol> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;


public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
this.client = connection;
}

/**
* Check if the connection is active.
*
* @return True if the connection is active.
*/
public synchronized boolean isActive() {
return this.numThreads > 0;
}

/**
* Check if the connection is closed.
*
* @return If the connection is closed.
*/
public synchronized boolean isClosed() {
return this.closed;
}

/**
* Check if the connection can be used. It checks if the connection is used by
* another thread or already closed.
*
* @return True if the connection can be used.
*/
public synchronized boolean isUsable() {
return !isActive() && !isClosed();
}

/**
* Get the connection client.
*
* @return Connection client.
*/
public synchronized ProxyAndInfo<ClientProtocol> getClient() {
this.numThreads++;
return this.client;
}

/**
* Release this connection. If the connection was closed, close the proxy.
* Otherwise, mark the connection as not used by us anymore.
*/
public synchronized void release() {
if (--this.numThreads == 0 && this.closed) {
close();
}
}

/**
* We will not use this connection anymore. If it's not being used, we close
* it. Otherwise, we let release() do it once we are done with it.
*/
public synchronized void close() {
this.closed = true;
if (this.numThreads == 0) {
ClientProtocol proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
}
}
}

0 comments on commit ca4f209

Please sign in to comment.