Skip to content

Commit

Permalink
HDDS-179. CloseContainer/PutKey command should be synchronized with wr…
Browse files Browse the repository at this point in the history
…ite operations. Contributed by Shashikant Banerjee.
  • Loading branch information
mukul1987 committed Aug 16, 2018
1 parent 0e832e7 commit 5ef2908
Show file tree
Hide file tree
Showing 2 changed files with 467 additions and 57 deletions.
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.ratis.protocol.RaftGroupId;
Expand Down Expand Up @@ -52,6 +53,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -95,6 +99,13 @@
* {@link #applyTransaction} need to be enforced in the StateMachine
* implementation. For example, synchronization between writeChunk and
* createContainer in {@link ContainerStateMachine}.
*
* PutKey is synchronized with WriteChunk operations: PutKey for a block is
* executed only after all the WriteChunks preceding the PutKey have finished.
*
* CloseContainer is synchronized with WriteChunk and PutKey operations,
* CloseContainer for a container is processed after all the preceding write
* operations for the container have finished.
* */
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG = LoggerFactory.getLogger(
Expand All @@ -105,15 +116,14 @@ public class ContainerStateMachine extends BaseStateMachine {
private ThreadPoolExecutor chunkExecutor;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
createContainerFutureMap;
private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;

ContainerStateMachine(ContainerDispatcher dispatcher,
public ContainerStateMachine(ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor) {
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.createContainerFutureMap = new ConcurrentHashMap<>();
this.stateMachineMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -203,32 +213,6 @@ private Message runCommand(ContainerCommandRequestProto requestProto) {
return dispatchCommand(requestProto)::toByteString;
}

/**
 * Schedules the state-machine-data phase of a WriteChunk on the chunk
 * executor and records the resulting future under the log entry index.
 *
 * If a create-container future is registered for the chunk's container, the
 * write is chained behind it; otherwise the write is submitted directly.
 *
 * @param requestProto the WriteChunk request to run
 * @param entryIndex   log entry index used as the key in writeChunkFutureMap
 * @return the future of the scheduled write
 */
private CompletableFuture<Message> handleWriteChunk(
    ContainerCommandRequestProto requestProto, long entryIndex) {
  final long containerID =
      requestProto.getWriteChunk().getBlockID().getContainerID();
  final CompletableFuture<Message> pendingCreate =
      createContainerFutureMap.get(containerID);

  final CompletableFuture<Message> chunkWrite = pendingCreate == null
      ? CompletableFuture.supplyAsync(
          () -> runCommand(requestProto), chunkExecutor)
      : pendingCreate.thenApplyAsync(
          ignored -> runCommand(requestProto), chunkExecutor);

  writeChunkFutureMap.put(entryIndex, chunkWrite);
  return chunkWrite;
}

/**
 * Registers an uncompleted create-container future for the request's
 * container, unless one is already present, and replies immediately with an
 * empty message.
 *
 * @param requestProto the CreateContainer request
 * @return an already-completed future carrying an empty message
 */
private CompletableFuture<Message> handleCreateContainer(
    ContainerCommandRequestProto requestProto) {
  final long id = requestProto.getContainerID();
  createContainerFutureMap.computeIfAbsent(
      id, key -> new CompletableFuture<>());
  final Message emptyReply = () -> ByteString.EMPTY;
  return CompletableFuture.completedFuture(emptyReply);
}

/*
* writeStateMachineData calls are not synchronized with each other
* and also with applyTransaction.
Expand All @@ -239,15 +223,17 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
Type cmdType = requestProto.getCmdType();
switch (cmdType) {
case CreateContainer:
return handleCreateContainer(requestProto);
case WriteChunk:
return handleWriteChunk(requestProto, entry.getIndex());
default:
throw new IllegalStateException("Cmd Type:" + cmdType
+ " should not have state machine data");
long containerId = requestProto.getContainerID();
stateMachineMap
.computeIfAbsent(containerId, k -> new StateMachineHelper());
CompletableFuture<Message> stateMachineFuture =
stateMachineMap.get(containerId)
.handleStateMachineData(requestProto, entry.getIndex());
if (stateMachineFuture == null) {
throw new IllegalStateException(
"Cmd Type:" + cmdType + " should not have state machine data");
}
return stateMachineFuture;
} catch (IOException e) {
return completeExceptionally(e);
}
Expand Down Expand Up @@ -363,25 +349,13 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
Type cmdType = requestProto.getCmdType();

if (cmdType == Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk();
// the data field has already been removed in start Transaction
Preconditions.checkArgument(!write.hasData());
CompletableFuture<Message> stateMachineFuture =
writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
return stateMachineFuture
.thenComposeAsync(v ->
CompletableFuture.completedFuture(runCommand(requestProto)));
} else {
Message message = runCommand(requestProto);
if (cmdType == Type.CreateContainer) {
long containerID = requestProto.getContainerID();
createContainerFutureMap.remove(containerID).complete(message);
}
return CompletableFuture.completedFuture(message);
}
Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
k -> new StateMachineHelper());
long index =
trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
return stateMachineMap.get(requestProto.getContainerID())
.executeContainerCommand(requestProto, index);
} catch (IOException e) {
return completeExceptionally(e);
}
Expand All @@ -396,4 +370,239 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
/**
 * Closes this state machine. The body is intentionally empty: nothing is
 * released or shut down here.
 */
@Override
public void close() throws IOException {
}

/**
 * Holds the futures of in-flight WriteChunk commits, keyed by the index
 * passed in by the caller. Every operation takes this object's monitor, so
 * a remove followed by a size query is observed as one step.
 */
static class CommitChunkFutureMap {
  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
      futuresByIndex = new ConcurrentHashMap<>();

  /**
   * Drops the future stored under {@code index}, if any.
   *
   * @return the number of futures still tracked after the removal
   */
  int removeAndGetSize(long index) {
    synchronized (this) {
      futuresByIndex.remove(index);
      return futuresByIndex.size();
    }
  }

  /**
   * Stores {@code future} under {@code index}.
   *
   * @return the future previously stored under that index, or null
   */
  CompletableFuture<Message> add(long index,
      CompletableFuture<Message> future) {
    synchronized (this) {
      return futuresByIndex.put(index, future);
    }
  }

  /**
   * @return a new list holding a snapshot of all tracked futures
   */
  List<CompletableFuture<Message>> getAll() {
    synchronized (this) {
      return new ArrayList<>(futuresByIndex.values());
    }
  }
}

/**
 * Per-container helper that keeps the futures needed to order
 * createContainer, writeChunk, putKey and closeContainer against each other.
 *
 * The first group of methods serves writeStateMachineData; the second group
 * serves applyTransaction. The two paths are not synchronized with each
 * other, so shared state is read once into a local before it is used.
 */
private class StateMachineHelper {

  // Set by the writeStateMachineData path and cleared by the
  // applyTransaction path. Volatile so that a write made on one of those
  // threads is visible to the other.
  private volatile CompletableFuture<Message> createContainerFuture;

  // Map for maintaining all writeChunk futures mapped to blockId
  private final ConcurrentHashMap<Long, CommitChunkFutureMap>
      block2ChunkMap;

  // Map for putKey futures, keyed by the block's local ID
  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
      blockCommitMap;

  StateMachineHelper() {
    createContainerFuture = null;
    block2ChunkMap = new ConcurrentHashMap<>();
    blockCommitMap = new ConcurrentHashMap<>();
  }

  // The following section handles writeStateMachineData transactions
  // on a container

  /**
   * Enqueues the create container future during writeStateMachineData so
   * that the write-state-machine-data phase of writeChunk waits for create
   * container to finish.
   *
   * @return an already-completed future carrying an empty message
   */
  private CompletableFuture<Message> handleCreateContainer() {
    createContainerFuture = new CompletableFuture<>();
    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
  }

  /**
   * Schedules the WriteChunk on the chunk executor, chained behind the
   * pending create container future when there is one, and records the
   * resulting future in writeChunkFutureMap under the log entry index.
   *
   * @param requestProto the WriteChunk request
   * @param entryIndex   index of the log entry carrying the request
   * @return the future of the scheduled write
   */
  private CompletableFuture<Message> handleWriteChunk(
      ContainerCommandRequestProto requestProto, long entryIndex) {
    CompletableFuture<Message> containerOpFuture;

    // Read the field exactly once: the apply path may null it out between
    // a null check and a later dereference.
    final CompletableFuture<Message> pendingCreate = createContainerFuture;
    if (pendingCreate != null) {
      containerOpFuture = pendingCreate
          .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
    } else {
      containerOpFuture = CompletableFuture
          .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
    }
    writeChunkFutureMap.put(entryIndex, containerOpFuture);
    return containerOpFuture;
  }

  /**
   * Dispatches a writeStateMachineData request by command type.
   *
   * @param requestProto the request carried by the log entry
   * @param index        index of the log entry
   * @return the future for CreateContainer or WriteChunk; null for any
   *         other command type (the caller treats null as an error)
   */
  CompletableFuture<Message> handleStateMachineData(
      final ContainerCommandRequestProto requestProto, long index) {
    Type cmdType = requestProto.getCmdType();
    if (cmdType == Type.CreateContainer) {
      return handleCreateContainer();
    } else if (cmdType == Type.WriteChunk) {
      return handleWriteChunk(requestProto, index);
    } else {
      return null;
    }
  }

  // The following section handles applyTransaction transactions
  // on a container

  /**
   * Runs PutKey after every WriteChunk commit currently tracked for the
   * same block has finished, and tracks the PutKey future in blockCommitMap
   * until it completes.
   *
   * @param requestProto the PutKey request
   * @return future completing with the PutKey response
   */
  private CompletableFuture<Message> handlePutKey(
      ContainerCommandRequestProto requestProto) {
    List<CompletableFuture<Message>> futureList = new ArrayList<>();
    long localId =
        requestProto.getPutKey().getKeyData().getBlockID().getLocalID();
    // Need not wait for create container future here as it has already
    // finished.
    // Single lookup: a finishing chunk commit may remove the entry between
    // two separate get() calls.
    final CommitChunkFutureMap pendingChunks = block2ChunkMap.get(localId);
    if (pendingChunks != null) {
      futureList.addAll(pendingChunks.getAll());
    }
    CompletableFuture<Message> effectiveFuture =
        runCommandAfterFutures(futureList, requestProto);

    // NOTE(review): if effectiveFuture completes before the put() below,
    // the remove() runs first and the completed future stays in the map
    // until the helper is dropped - confirm whether that matters.
    CompletableFuture<Message> putKeyFuture =
        effectiveFuture.thenApply(message -> {
          blockCommitMap.remove(localId);
          return message;
        });
    blockCommitMap.put(localId, putKeyFuture);
    return putKeyFuture;
  }

  /**
   * Close Container should be executed only if all pending WriteType
   * container cmds get executed. Transactions which can return a future
   * are WriteChunk and PutKey. Once the close has run, this helper is
   * removed from stateMachineMap.
   *
   * @param requestProto the CloseContainer request
   * @return future completing with the CloseContainer response
   */
  private CompletableFuture<Message> handleCloseContainer(
      ContainerCommandRequestProto requestProto) {
    List<CompletableFuture<Message>> futureList = new ArrayList<>();

    // No need to wait for create container future here as it should have
    // already finished.
    block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
    futureList.addAll(blockCommitMap.values());

    // There are pending write Chunk/PutKey type requests
    // Queue this closeContainer request behind all these requests
    CompletableFuture<Message> closeContainerFuture =
        runCommandAfterFutures(futureList, requestProto);

    return closeContainerFuture.thenApply(message -> {
      stateMachineMap.remove(requestProto.getContainerID());
      return message;
    });
  }

  /**
   * Runs the commit phase of a WriteChunk after its state-machine-data
   * phase, and tracks the commit future per block so PutKey and
   * CloseContainer can wait on it.
   *
   * @param requestProto the WriteChunk request (data already stripped)
   * @param index        index of the log entry carrying the request
   * @return future completing with the commit response
   */
  private CompletableFuture<Message> handleChunkCommit(
      ContainerCommandRequestProto requestProto, long index) {
    WriteChunkRequestProto write = requestProto.getWriteChunk();
    // the data field has already been removed in start Transaction
    Preconditions.checkArgument(!write.hasData());
    CompletableFuture<Message> stateMachineFuture =
        writeChunkFutureMap.remove(index);
    CompletableFuture<Message> commitChunkFuture = stateMachineFuture
        .thenComposeAsync(v -> CompletableFuture
            .completedFuture(runCommand(requestProto)));

    long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
    // Put the applyTransaction Future again to the Map.
    // closeContainer should synchronize with this.
    block2ChunkMap
        .computeIfAbsent(localId, id -> new CommitChunkFutureMap())
        .add(index, commitChunkFuture);
    return commitChunkFuture.thenApply(message -> {
      // Drop the per-block entry once its last tracked commit is gone.
      block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
          -> chunks.removeAndGetSize(index) == 0 ? null : chunks);
      return message;
    });
  }

  /**
   * Runs the command asynchronously once every future in the list has
   * completed, or straight away when the list is empty.
   *
   * @param futureList   futures to wait for; may be empty
   * @param requestProto the request to run afterwards
   * @return future completing with the command response
   */
  private CompletableFuture<Message> runCommandAfterFutures(
      List<CompletableFuture<Message>> futureList,
      ContainerCommandRequestProto requestProto) {
    CompletableFuture<Message> effectiveFuture;
    if (futureList.isEmpty()) {
      effectiveFuture = CompletableFuture
          .supplyAsync(() -> runCommand(requestProto));

    } else {
      CompletableFuture<Void> allFuture = CompletableFuture.allOf(
          futureList.toArray(new CompletableFuture[futureList.size()]));
      effectiveFuture = allFuture
          .thenApplyAsync(v -> runCommand(requestProto));
    }
    return effectiveFuture;
  }

  /**
   * Runs CreateContainer in the calling thread and then releases any
   * WriteChunk that was queued behind the create container future.
   *
   * @param requestProto the CreateContainer request
   * @return an already-completed future carrying the response
   */
  CompletableFuture<Message> handleCreateContainer(
      ContainerCommandRequestProto requestProto) {
    final Message response = runCommand(requestProto);
    // The future is only set by the writeStateMachineData path; it is null
    // when this helper was created by applyTransaction itself.
    final CompletableFuture<Message> pendingCreate = createContainerFuture;
    if (pendingCreate != null) {
      pendingCreate.complete(response);
      createContainerFuture = null;
    }
    return CompletableFuture.completedFuture(response);
  }

  /**
   * Runs any command that needs no ordering, in the calling thread.
   *
   * @param requestProto the request to run
   * @return an already-completed future carrying the response
   */
  CompletableFuture<Message> handleOtherCommands(
      ContainerCommandRequestProto requestProto) {
    return CompletableFuture.completedFuture(runCommand(requestProto));
  }

  /**
   * Dispatches an applyTransaction request by command type.
   *
   * @param requestProto the request to apply
   * @param index        index of the log entry; only used for WriteChunk
   * @return future completing with the command response
   */
  CompletableFuture<Message> executeContainerCommand(
      ContainerCommandRequestProto requestProto, long index) {
    Type cmdType = requestProto.getCmdType();
    switch (cmdType) {
    case WriteChunk:
      return handleChunkCommit(requestProto, index);
    case CloseContainer:
      return handleCloseContainer(requestProto);
    case PutKey:
      return handlePutKey(requestProto);
    case CreateContainer:
      return handleCreateContainer(requestProto);
    default:
      return handleOtherCommands(requestProto);
    }
  }
}

/**
 * Test hook: returns the map of per-container helpers, keyed by container
 * ID. The map itself is returned, not a copy.
 */
@VisibleForTesting
public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
return stateMachineMap;
}

/**
 * Test hook: looks up the pending create container future of a container.
 *
 * @param containerId ID of the container to inspect
 * @return the helper's current create container future, or null when no
 *         helper is registered for the container
 */
@VisibleForTesting
public CompletableFuture<Message> getCreateContainerFuture(long containerId) {
  final StateMachineHelper containerHelper = stateMachineMap.get(containerId);
  if (containerHelper == null) {
    return null;
  }
  return containerHelper.createContainerFuture;
}

/**
 * Test hook: collects every WriteChunk commit future currently tracked for
 * a container, across all of its blocks.
 *
 * @param containerId ID of the container to inspect
 * @return a new list of the tracked futures, or null when no helper is
 *         registered for the container
 */
@VisibleForTesting
public List<CompletableFuture<Message>> getCommitChunkFutureMap(
    long containerId) {
  StateMachineHelper helper = stateMachineMap.get(containerId);
  if (helper == null) {
    return null;
  }
  List<CompletableFuture<Message>> futureList = new ArrayList<>();
  // Use the reference that was null-checked; a second map lookup could
  // return null if the helper is removed by a concurrent closeContainer.
  helper.block2ChunkMap.values()
      .forEach(b -> futureList.addAll(b.getAll()));
  return futureList;
}

/**
 * Test hook: returns the values of writeChunkFutureMap, i.e. the futures of
 * WriteChunk requests registered under their log entry index. The returned
 * collection is the map's own values view, not a copy.
 */
@VisibleForTesting
public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
return writeChunkFutureMap.values();
}
}

0 comments on commit 5ef2908

Please sign in to comment.