HDFS-11184. Ozone: SCM: Make SCM use container protocol. Contributed by Anu Engineer.
anuengineer authored and omalley committed Apr 25, 2018
1 parent c169dd1 commit bb410de
Showing 33 changed files with 1,037 additions and 920 deletions.
@@ -63,7 +63,6 @@ public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) {
return newPipeline;
}

-  /** Adds a member to pipeline */

/**
* Adds a member to the pipeline.
@@ -27,19 +27,30 @@

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .GetKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .PutKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ReadChunkRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutSmallFileRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .WriteChunkRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .PutSmallFileRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .GetSmallFileResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .GetSmallFileRequestProto;
import org.apache.hadoop.scm.XceiverClient;

/**
@@ -210,6 +221,33 @@ public static void writeSmallFile(XceiverClient client, String containerName,
validateContainerResponse(response, traceID);
}

+  /**
+   * createContainer call that creates a container on the datanode.
+   * @param client - client
+   * @param traceID - traceID
+   * @throws IOException
+   */
+  public static void createContainer(XceiverClient client, String traceID)
+      throws IOException {
+    ContainerProtos.CreateContainerRequestProto.Builder createRequest =
+        ContainerProtos.CreateContainerRequestProto
+            .newBuilder();
+    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
+        .ContainerData.newBuilder();
+    containerData.setName(client.getPipeline().getContainerName());
+    createRequest.setPipeline(client.getPipeline().getProtobufMessage());
+    createRequest.setContainerData(containerData.build());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setCreateContainer(createRequest);
+    request.setTraceID(traceID);
+    ContainerCommandResponseProto response = client.sendCommand(
+        request.build());
+    validateContainerResponse(response, traceID);
+  }
+
/**
* Reads the data given the container name and key.
*
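
For illustration, a minimal caller for the new createContainer helper. This sketch assumes the surrounding utility class is named ContainerProtocolCalls (file names are not shown in this view) and that the XceiverClient is already connected to its pipeline; the trace ID is just a correlation string, so a fresh UUID works:

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.hadoop.scm.XceiverClient;

    // Hypothetical caller, not part of this commit.
    class CreateContainerExample {
      static void createOnPipeline(XceiverClient client) throws IOException {
        // Stamp the request with an ID that datanode logs will echo back.
        String traceID = UUID.randomUUID().toString();
        ContainerProtocolCalls.createContainer(client, traceID);
      }
    }
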
@@ -321,9 +321,9 @@ public static long getScmheartbeatCheckerInterval(Configuration conf) {
* @param conf - Ozone Config
* @return - HB interval in seconds.
*/
-  public static int getScmHeartbeatInterval(Configuration conf) {
-    return conf.getInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
-        OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT);
+  public static long getScmHeartbeatInterval(Configuration conf) {
+    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
+        OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT, TimeUnit.SECONDS);
}

/**
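
The move from getInt to getTimeDuration is more than a type change: operators can now give the interval with a unit suffix. A small sketch of the semantics, assuming the key string below is what OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS resolves to:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    class HeartbeatIntervalExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "30s", "1m", or a bare "30" (read in the unit passed below) are
        // all accepted by getTimeDuration; getInt only took bare numbers.
        conf.set("ozone.scm.heartbeat.interval.seconds", "30s");
        long seconds = conf.getTimeDuration(
            "ozone.scm.heartbeat.interval.seconds", 1, TimeUnit.SECONDS);
        System.out.println(seconds); // 30
      }
    }
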
@@ -94,11 +94,6 @@ public final class OzoneConfigKeys {
public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L;

-  public static final String OZONE_SCM_CONTAINER_THREADS =
-      "ozone.scm.container.threads";
-  public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT =
-      Runtime.getRuntime().availableProcessors() * 2;

public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT =
"ozone.scm.heartbeat.rpc-timeout";
public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT =
@@ -142,6 +137,9 @@ public final class OzoneConfigKeys {
public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id";


+  public static final String OZONE_SCM_DB_CACHE_SIZE_MB =
+      "ozone.scm.db.cache.size.mb";
+  public static final int OZONE_SCM_DB_CACHE_SIZE_DEFAULT = 128;

/**
* There is no need to instantiate this class.
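
A sketch of how the new cache-size key might be consumed when SCM opens its container database; the readCacheSizeMB helper is illustrative, and only the two constants come from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;

    class DbCacheSizeExample {
      // Returns the configured cache size, falling back to 128 MB.
      static int readCacheSizeMB(Configuration conf) {
        return conf.getInt(OzoneConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
            OzoneConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
      }
    }
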
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -20,7 +20,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneClientUtils;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -29,10 +28,8 @@

import java.io.Closeable;
import java.io.IOException;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;

/**
* State Machine Class.
@@ -55,14 +52,13 @@ public class DatanodeStateMachine implements Closeable {
*/
public DatanodeStateMachine(Configuration conf) throws IOException {
this.conf = conf;
-    executorService = HadoopExecutors.newScheduledThreadPool(
-        this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
-            OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
-        new ThreadFactoryBuilder().setDaemon(true)
+    executorService = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("Datanode State Machine Thread - %d").build());
    connectionManager = new SCMConnectionManager(conf);
    context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
-    heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
+    heartbeatFrequency = TimeUnit.SECONDS.toMillis(
+        OzoneClientUtils.getScmHeartbeatInterval(conf));
container = new OzoneContainer(conf);
}

@@ -84,15 +80,16 @@ public void start() throws IOException {
container.start();
while (context.getState() != DatanodeStates.SHUTDOWN) {
try {
+        LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB = Time.monotonicNow() + heartbeatFrequency;
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();
if (now < nextHB) {
Thread.sleep(nextHB - now);
}
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        LOG.error("Unable to finish the execution", e);
+      } catch (Exception e) {
+        LOG.error("Unable to finish the execution.", e);
}
}
}
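
Two design notes on this hunk: the executor moves from a scheduled pool sized by ozone.scm.container.threads to a cached pool that grows and shrinks with demand, which is why the OZONE_SCM_CONTAINER_THREADS keys disappear from OzoneConfigKeys above; and the catch widens from InterruptedException | ExecutionException | TimeoutException to Exception, so an unexpected task failure is logged and the loop proceeds to the next heartbeat cycle instead of tearing down the datanode state machine.
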
@@ -187,5 +187,13 @@ public void addCommand(SCMCommand command) {
}
}

+  /**
+   * Returns the count of the Execution.
+   * @return long
+   */
+  public long getExecutionCount() {
+    return stateExecutionCount.get();
+  }


}
@@ -22,22 +22,15 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.endpoint
-    .HeartbeatEndpointTask;
-import org.apache.hadoop.ozone.container.common.states.endpoint
-    .RegisterEndpointTask;
-import org.apache.hadoop.ozone.container.common.states.endpoint
-    .VersionEndpointTask;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +102,7 @@ private DatanodeID createDatanodeID() throws UnknownHostException {
DatanodeID temp = new DatanodeID(
//TODO : Replace this with proper network and kerberos
// support code.
-        InetAddress.getLocalHost().getHostAddress().toString(),
+        InetAddress.getLocalHost().getHostAddress(),
DataNode.getHostName(conf),
UUID.randomUUID().toString(),
0, /** XferPort - SCM does not use this port */
@@ -134,6 +127,13 @@ private DatanodeID createDatanodeID() throws UnknownHostException {
private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
createNewContainerID(Path idPath)
throws IOException {

+    if(!idPath.getParent().toFile().exists() &&
+        !idPath.getParent().toFile().mkdirs()) {
+      LOG.error("Failed to create container ID locations. Path: {}",
+          idPath.getParent());
+      throw new IOException("Unable to create container ID directories.");
+    }
StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
containerIDProto = StorageContainerDatanodeProtocolProtos
.ContainerNodeIDProto.newBuilder()
@@ -213,7 +213,8 @@ public void execute(ExecutorService executor) {
ecs.submit(endpointTask);
}
}

+  //TODO : Cache some of these tasks instead of creating them
+  //all the time.
private Callable<EndpointStateMachine.EndPointStates>
getEndPointTask(EndpointStateMachine endpoint) {
switch (endpoint.getState()) {
@@ -49,7 +49,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndPoint.lock();
try{
SCMVersionResponseProto versionResponse =
-          rpcEndPoint.getEndPoint().getVersion();
+          rpcEndPoint.getEndPoint().getVersion(null);
rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));

EndpointStateMachine.EndPointStates nextState =
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,6 +18,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
@@ -34,7 +35,8 @@ public interface StorageContainerDatanodeProtocol {
* Returns SCM version.
* @return Version info.
*/
-  SCMVersionResponseProto getVersion() throws IOException;
+  SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest)
+      throws IOException;

/**
* Used by data node to send a Heartbeat.
@@ -44,8 +44,8 @@ public Type getType() {
* @return A protobuf message.
*/
@Override
-  public NullCmdResponseProto getProtoBufMessage() {
-    return NullCmdResponseProto.newBuilder().build();
+  public byte[] getProtoBufMessage() {
+    return NullCmdResponseProto.newBuilder().build().toByteArray();
}

/**
@@ -57,7 +57,7 @@ public static Builder newBuilder() {
* @return Type
*/
@Override
-  Type getType() {
+  public Type getType() {
return Type.registeredCommand;
}

@@ -94,12 +94,12 @@ public ErrorCode getError() {
* @return A protobuf message.
*/
@Override
-  SCMRegisteredCmdResponseProto getProtoBufMessage() {
+  public byte[] getProtoBufMessage() {
return SCMRegisteredCmdResponseProto.newBuilder()
.setClusterID(this.clusterID)
.setDatanodeUUID(this.datanodeUUID)
.setErrorCode(this.error)
-        .build();
+        .build().toByteArray();
}

/**
@@ -31,11 +31,11 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
* Returns the type of this command.
* @return Type
*/
-  abstract Type getType();
+  public abstract Type getType();

/**
* Gets the protobuf message of this object.
* @return A protobuf message.
*/
-  abstract T getProtoBufMessage();
+  public abstract byte[] getProtoBufMessage();
}
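
The point of returning byte[] instead of the concrete protobuf type T shows up on the consumer side: a dispatcher can forward any command by its type while treating the payload as opaque bytes. An illustrative sketch, with CommandDispatcher and route(...) hypothetical and SCMCommand assumed to be on the classpath:

    import com.google.protobuf.GeneratedMessage;

    class CommandDispatcher {
      void dispatch(SCMCommand<? extends GeneratedMessage> command) {
        // Every command now serializes uniformly, so the dispatcher
        // never needs to know the concrete message class.
        byte[] payload = command.getProtoBufMessage();
        route(command.getType().name(), payload);
      }

      private void route(String type, byte[] payload) {
        // wire transfer elided in this sketch
      }
    }
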
@@ -92,11 +92,12 @@ public Object getUnderlyingProxyObject() {
/**
* Returns SCM version.
*
+   * @param unused - set to null and unused.
* @return Version info.
*/
@Override
-  public SCMVersionResponseProto getVersion() throws IOException {
-
+  public SCMVersionResponseProto getVersion(SCMVersionRequestProto
+      unused) throws IOException {
SCMVersionRequestProto request =
SCMVersionRequestProto.newBuilder().build();
final SCMVersionResponseProto response;
@@ -47,7 +47,7 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request)
throws ServiceException {
try {
-      return impl.getVersion();
+      return impl.getVersion(request);
} catch (IOException e) {
throw new ServiceException(e);
}
