Skip to content

Commit

Permalink
HDDS-1. Remove SCM Block DB. Contributed by Xiaoyu Yao.
Browse files Browse the repository at this point in the history
  • Loading branch information
anuengineer committed May 7, 2018
1 parent a3a1552 commit 3a43ac2
Show file tree
Hide file tree
Showing 127 changed files with 2,059 additions and 2,402 deletions.
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.util;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand All @@ -34,6 +36,8 @@ public final class Time {
*/
private static final long NANOSECONDS_PER_MILLISECOND = 1000000;

private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");

private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@Override
Expand Down Expand Up @@ -82,4 +86,12 @@ public static long monotonicNowNanos() {
public static String formatTime(long millis) {
return DATE_FORMAT.get().format(millis);
}

/**
* Get the current UTC time in milliseconds.
* @return the current UTC time in milliseconds.
*/
public static long getUtcTime() {
return Calendar.getInstance(UTC_ZONE).getTimeInMillis();
}
}
Expand Up @@ -60,7 +60,7 @@ public class XceiverClientManager implements Closeable {

//TODO : change this to SCM configuration class
private final Configuration conf;
private final Cache<String, XceiverClientSpi> clientCache;
private final Cache<Long, XceiverClientSpi> clientCache;
private final boolean useRatis;

private static XceiverClientMetrics metrics;
Expand All @@ -84,10 +84,10 @@ public XceiverClientManager(Configuration conf) {
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
.maximumSize(maxSize)
.removalListener(
new RemovalListener<String, XceiverClientSpi>() {
new RemovalListener<Long, XceiverClientSpi>() {
@Override
public void onRemoval(
RemovalNotification<String, XceiverClientSpi>
RemovalNotification<Long, XceiverClientSpi>
removalNotification) {
synchronized (clientCache) {
// Mark the entry as evicted
Expand All @@ -99,7 +99,7 @@ public void onRemoval(
}

@VisibleForTesting
public Cache<String, XceiverClientSpi> getClientCache() {
public Cache<Long, XceiverClientSpi> getClientCache() {
return clientCache;
}

Expand All @@ -114,14 +114,14 @@ public Cache<String, XceiverClientSpi> getClientCache() {
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
public XceiverClientSpi acquireClient(Pipeline pipeline)
public XceiverClientSpi acquireClient(Pipeline pipeline, long containerID)
throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());

synchronized (clientCache) {
XceiverClientSpi info = getClient(pipeline);
XceiverClientSpi info = getClient(pipeline, containerID);
info.incrementReference();
return info;
}
Expand All @@ -139,11 +139,10 @@ public void releaseClient(XceiverClientSpi client) {
}
}

private XceiverClientSpi getClient(Pipeline pipeline)
private XceiverClientSpi getClient(Pipeline pipeline, long containerID)
throws IOException {
String containerName = pipeline.getContainerName();
try {
return clientCache.get(containerName,
return clientCache.get(containerID,
new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
Expand Down
Expand Up @@ -86,15 +86,16 @@ public static void setContainerSizeB(long size) {
* @inheritDoc
*/
@Override
public Pipeline createContainer(String containerId, String owner)
public ContainerInfo createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
Pipeline pipeline =
ContainerInfo container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerId, owner);
client = xceiverClientManager.acquireClient(pipeline);
xceiverClientManager.getFactor(), owner);
Pipeline pipeline = container.getPipeline();
client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());

// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
Expand All @@ -104,10 +105,8 @@ public Pipeline createContainer(String containerId, String owner)
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
// TODO : Container Client State needs to be updated.
// TODO : Return ContainerInfo instead of Pipeline
createContainer(containerId, client, pipeline);
return pipeline;
createContainer(client, container.getContainerID());
return container;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
Expand All @@ -118,20 +117,19 @@ public Pipeline createContainer(String containerId, String owner)
/**
* Create a container over pipeline specified by the SCM.
*
* @param containerId - Container ID
* @param client - Client to communicate with Datanodes
* @param pipeline - A pipeline that is already created.
* @param client - Client to communicate with Datanodes.
* @param containerId - Container ID.
* @throws IOException
*/
public void createContainer(String containerId, XceiverClientSpi client,
Pipeline pipeline) throws IOException {
public void createContainer(XceiverClientSpi client,
long containerId) throws IOException {
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, traceID);
ContainerProtocolCalls.createContainer(client, containerId, traceID);
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
Expand All @@ -142,8 +140,8 @@ public void createContainer(String containerId, XceiverClientSpi client,
// creation state.
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
+ " leader:" + pipeline.getLeader()
+ " machines:" + pipeline.getMachines());
+ " leader:" + client.getPipeline().getLeader()
+ " machines:" + client.getPipeline().getMachines());
}
}

Expand All @@ -168,20 +166,25 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
// 2. Talk to Datanodes to create the pipeline.
//
// 3. update SCM that pipeline creation was successful.
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.pipeline,
pipeline.getPipelineName(),
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);

// TODO: this has not been fully implemented on server side
// SCMClientProtocolServer#notifyObjectStageChange
// TODO: when implement the pipeline state machine, change
// the pipeline name (string) to pipeline id (long)
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
// pipeline.getPipelineName(),
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.begin);

client.createPipeline(pipeline.getPipelineName(),
pipeline.getMachines());

storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.pipeline,
pipeline.getPipelineName(),
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
// pipeline.getPipelineName(),
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.complete);

// TODO : Should we change the state on the client side ??
// That makes sense, but it is not needed for the client to work.
Expand All @@ -193,29 +196,29 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
* @inheritDoc
*/
@Override
public Pipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor,
String containerId, String owner) throws IOException {
public ContainerInfo createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
Pipeline pipeline =
ContainerInfo container =
storageContainerLocationClient.allocateContainer(type, factor,
containerId, owner);
client = xceiverClientManager.acquireClient(pipeline);
owner);
Pipeline pipeline = container.getPipeline();
client = xceiverClientManager.acquireClient(pipeline,
container.getContainerID());

// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
// which was choosen by the SCM.
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}

// TODO : Return ContainerInfo instead of Pipeline
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
createContainer(containerId, client, pipeline);
return pipeline;
client = xceiverClientManager.acquireClient(pipeline,
container.getContainerID());
createContainer(client, container.getContainerID());
return container;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
Expand Down Expand Up @@ -258,18 +261,18 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
* @throws IOException
*/
@Override
public void deleteContainer(Pipeline pipeline, boolean force)
public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
client = xceiverClientManager.acquireClient(pipeline, containerID);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.deleteContainer(client, force, traceID);
ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
storageContainerLocationClient
.deleteContainer(pipeline.getContainerName());
.deleteContainer(containerID);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, leader: {}, machines: {} ",
pipeline.getContainerName(),
containerID,
pipeline.getLeader(),
pipeline.getMachines());
}
Expand All @@ -284,11 +287,10 @@ public void deleteContainer(Pipeline pipeline, boolean force)
* {@inheritDoc}
*/
@Override
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count)
throws IOException {
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
return storageContainerLocationClient.listContainer(
startName, prefixName, count);
startContainerID, count);
}

/**
Expand All @@ -300,17 +302,17 @@ public List<ContainerInfo> listContainer(String startName,
* @throws IOException
*/
@Override
public ContainerData readContainer(Pipeline pipeline) throws IOException {
public ContainerData readContainer(long containerID,
Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
client = xceiverClientManager.acquireClient(pipeline, containerID);
String traceID = UUID.randomUUID().toString();
ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client,
pipeline.getContainerName(), traceID);
ContainerProtocolCalls.readContainer(client, containerID, traceID);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, leader: {}, machines: {} ",
pipeline.getContainerName(),
containerID,
pipeline.getLeader(),
pipeline.getMachines());
}
Expand All @@ -329,7 +331,7 @@ public ContainerData readContainer(Pipeline pipeline) throws IOException {
* @throws IOException
*/
@Override
public Pipeline getContainer(String containerId) throws
public ContainerInfo getContainer(long containerId) throws
IOException {
return storageContainerLocationClient.getContainer(containerId);
}
Expand All @@ -341,7 +343,8 @@ public Pipeline getContainer(String containerId) throws
* @throws IOException
*/
@Override
public void closeContainer(Pipeline pipeline) throws IOException {
public void closeContainer(long containerId, Pipeline pipeline)
throws IOException {
XceiverClientSpi client = null;
try {
LOG.debug("Close container {}", pipeline);
Expand All @@ -364,18 +367,16 @@ public void closeContainer(Pipeline pipeline) throws IOException {
For now, take the #2 way.
*/
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);
client = xceiverClientManager.acquireClient(pipeline, containerId);
String traceID = UUID.randomUUID().toString();

String containerId = pipeline.getContainerName();

storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);

ContainerProtocolCalls.closeContainer(client, traceID);
ContainerProtocolCalls.closeContainer(client, containerId, traceID);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
Expand All @@ -391,13 +392,13 @@ public void closeContainer(Pipeline pipeline) throws IOException {

/**
* Get the the current usage information.
* @param pipeline - Pipeline
* @param containerID - ID of the container.
* @return the size of the given container.
* @throws IOException
*/
@Override
public long getContainerSize(Pipeline pipeline) throws IOException {
// TODO : Pipeline can be null, handle it correctly.
public long getContainerSize(long containerID) throws IOException {
// TODO : Fix this, it currently returns the capacity but not the current usage.
long size = getContainerSizeB();
if (size == -1) {
throw new IOException("Container size unknown!");
Expand Down

0 comments on commit 3a43ac2

Please sign in to comment.