HDDS-429. StorageContainerManager lock optimization.
Contributed by Nanda Kumar.
anuengineer committed Sep 14, 2018
1 parent 144a55f commit 0c8a43b
Showing 2 changed files with 226 additions and 180 deletions.
BlockManagerImpl.java (package org.apache.hadoop.hdds.scm.block)

@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -45,8 +44,8 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CHILL_MODE_EXCEPTION;
@@ -72,7 +71,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final NodeManager nodeManager;
   private final Mapping containerManager;
 
-  private final Lock lock;
+  private final ReadWriteLock lock;
   private final long containerSize;
 
   private final DeletedBlockLog deletedBlockLog;
@@ -108,7 +107,7 @@ public BlockManagerImpl(final Configuration conf,
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
     rand = new Random();
-    this.lock = new ReentrantLock();
+    this.lock = new ReentrantReadWriteLock();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);

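The field and constructor changes above are the mechanical half of the optimization: the exclusive ReentrantLock becomes a ReentrantReadWriteLock, so every former lock.lock() call site must now choose between lock.readLock() and lock.writeLock(). A minimal standalone sketch of that API (the Counter class here is illustrative, not part of the patch):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class Counter {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private long value;

      // Any number of threads may hold the read lock at the same time.
      long get() {
        lock.readLock().lock();
        try {
          return value;
        } finally {
          lock.readLock().unlock();
        }
      }

      // The write lock excludes all readers and all other writers.
      void increment() {
        lock.writeLock().lock();
        try {
          value++;
        } finally {
          lock.writeLock().unlock();
        }
      }
    }

Notably, the hunks below never take the write lock: exclusive sections use synchronized (this) instead, because a thread already holding the read lock cannot upgrade to the write lock (ReentrantReadWriteLock does not support upgrades; attempting one blocks forever, as the patch's own comments point out).
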
@@ -155,29 +154,22 @@ public void stop() throws IOException {
    * @param factor - how many copies needed for this container.
    * @throws IOException
    */
-  private void preAllocateContainers(int count, ReplicationType type,
-      ReplicationFactor factor, String owner)
+  private synchronized void preAllocateContainers(int count,
+      ReplicationType type, ReplicationFactor factor, String owner)
       throws IOException {
-    lock.lock();
-    try {
-      for (int i = 0; i < count; i++) {
-        ContainerWithPipeline containerWithPipeline;
-        try {
-          // TODO: Fix this later when Ratis is made the Default.
-          containerWithPipeline = containerManager.allocateContainer(
-              type, factor, owner);
-
-          if (containerWithPipeline == null) {
-            LOG.warn("Unable to allocate container.");
-            continue;
-          }
-        } catch (IOException ex) {
-          LOG.warn("Unable to allocate container: {}", ex);
-          continue;
-        }
-      }
-    } finally {
-      lock.unlock();
-    }
+    for (int i = 0; i < count; i++) {
+      ContainerWithPipeline containerWithPipeline;
+      try {
+        // TODO: Fix this later when Ratis is made the Default.
+        containerWithPipeline = containerManager.allocateContainer(
+            type, factor, owner);
+
+        if (containerWithPipeline == null) {
+          LOG.warn("Unable to allocate container.");
+        }
+      } catch (IOException ex) {
+        LOG.warn("Unable to allocate container: {}", ex);
+      }
+    }
   }

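With the exclusive lock gone, preAllocateContainers above becomes a synchronized method: concurrent pre-allocation attempts still serialize, but on the monitor of the BlockManagerImpl instance rather than on the lock field. A side effect visible in the diff is that the continue statements at the end of the old loop body, now redundant, were dropped. A compact sketch of the before/after shape (names illustrative):

    import java.util.concurrent.locks.ReentrantLock;

    class Shape {
      private final ReentrantLock lock = new ReentrantLock();

      // Before: explicit lock around the whole body.
      void before() {
        lock.lock();
        try {
          // ... allocate a batch of containers ...
        } finally {
          lock.unlock();
        }
      }

      // After: the intrinsic monitor on 'this' guards the whole method,
      // independently of any ReadWriteLock the caller may hold.
      synchronized void after() {
        // ... allocate a batch of containers ...
      }
    }

Because the monitor is independent of the new read-write lock, allocateBlock can call preAllocateContainers while holding the read lock without deadlocking, as long as every mutation of ALLOCATED-state containers goes through the same monitor.
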
@@ -208,46 +200,61 @@ public AllocatedBlock allocateBlock(final long size,
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
-    try {
-      /*
-        Here is the high level logic.
-        1. First we check if there are containers in ALLOCATED state,
-        that is
-        SCM has allocated them in the SCM namespace but the
-        corresponding
-        container has not been created in the Datanode yet. If we
-        have any
-        in that state, we will return that to the client, which allows
-        client to finish creating those containers. This is a sort of
-        greedy
-        algorithm, our primary purpose is to get as many containers as
-        possible.
-        2. If there are no allocated containers -- Then we find a Open
-        container that matches that pattern.
-        3. If both of them fail, the we will pre-allocate a bunch of
-        conatainers in SCM and try again.
-        TODO : Support random picking of two containers from the list.
-        So we
-        can use different kind of policies.
-      */
-
-      ContainerWithPipeline containerWithPipeline;
-
-      // Look for ALLOCATED container that matches all other parameters.
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
-              HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
-      }
+    /*
+      Here is the high level logic.
+      1. First we check if there are containers in ALLOCATED state, that is
+         SCM has allocated them in the SCM namespace but the corresponding
+         container has not been created in the Datanode yet. If we have any in
+         that state, we will return that to the client, which allows client to
+         finish creating those containers. This is a sort of greedy algorithm,
+         our primary purpose is to get as many containers as possible.
+      2. If there are no allocated containers -- Then we find a Open container
+         that matches that pattern.
+      3. If both of them fail, the we will pre-allocate a bunch of containers
+         in SCM and try again.
+      TODO : Support random picking of two containers from the list. So we can
+         use different kind of policies.
+    */
+
+    ContainerWithPipeline containerWithPipeline;
+
+    lock.readLock().lock();
+    try {
+      // This is to optimize performance, if the below condition is evaluated
+      // to false, then we can be sure that there are no containers in
+      // ALLOCATED state.
+      // This can result in false positive, but it will never be false negative.
+      // How can this result in false positive? We check if there are any
+      // containers in ALLOCATED state, this check doesn't care about the
+      // USER of the containers. So there might be cases where a different
+      // USER has few containers in ALLOCATED state, which will result in
+      // false positive.
+      if (!containerManager.getStateManager().getContainerStateMap()
+          .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+          .isEmpty()) {
+        // Since the above check can result in false positive, we have to do
+        // the actual check and find out if there are containers in ALLOCATED
+        // state matching our criteria.
+        synchronized (this) {
+          // Using containers from ALLOCATED state should be done within
+          // synchronized block (or) write lock. Since we already hold a
+          // read lock, we will end up in deadlock situation if we take
+          // write lock here.
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+          if (containerWithPipeline != null) {
+            containerManager.updateContainerState(
+                containerWithPipeline.getContainerInfo().getContainerID(),
+                HddsProtos.LifeCycleEvent.CREATE);
+            return newBlock(containerWithPipeline,
+                HddsProtos.LifeCycleState.ALLOCATED);
+          }
+        }
+      }
 
       // Since we found no allocated containers that match our criteria, let us
Expand All @@ -263,20 +270,34 @@ public AllocatedBlock allocateBlock(final long size,
// that most of our containers are full or we have not allocated
// containers of the type and replication factor. So let us go and
// allocate some.
preAllocateContainers(containerProvisionBatchSize, type, factor, owner);

// Since we just allocated a set of containers this should work
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
// Even though we have already checked the containers in ALLOCATED
// state, we have to check again as we only hold a read lock.
// Some other thread might have pre-allocated container in meantime.
synchronized (this) {
if (!containerManager.getStateManager().getContainerStateMap()
.getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
.isEmpty()) {
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
}
if (containerWithPipeline == null) {
preAllocateContainers(containerProvisionBatchSize,
type, factor, owner);
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
}

if (containerWithPipeline != null) {
containerManager.updateContainerState(
containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
if (containerWithPipeline != null) {
containerManager.updateContainerState(
containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
}
}

// we have tried all strategies we know and but somehow we are not able
// to get a container for this block. Log that info and return a null.
LOG.error(
@@ -286,19 +307,9 @@ public AllocatedBlock allocateBlock(final long size,
           type,
           factor);
       return null;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private String getChannelName(ReplicationType type) {
-    switch (type) {
-    case RATIS:
-      return "RA" + UUID.randomUUID().toString().substring(3);
-    case STAND_ALONE:
-      return "SA" + UUID.randomUUID().toString().substring(3);
-    default:
-      return "RA" + UUID.randomUUID().toString().substring(3);
-    }
+    } finally {
+      lock.readLock().unlock();
     }
   }

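Taken together, the allocateBlock hunks above implement a double-checked locking scheme: a cheap emptiness test under the shared read lock (allowed to produce false positives, never false negatives), then a synchronized (this) block that re-checks and, if needed, pre-allocates before consuming an ALLOCATED container. Below is a self-contained sketch of that shape with a plain set standing in for the container state map; all names are illustrative and none of this is SCM API:

    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class Allocator {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private final Set<Long> preAllocated = ConcurrentHashMap.newKeySet();
      private long nextId;

      Long allocate() {
        lock.readLock().lock();
        try {
          // Fast path: stays fully concurrent. The emptiness check may be
          // a false positive (items that this caller cannot use), but if
          // it says "empty" there is truly nothing to consume.
          if (!preAllocated.isEmpty()) {
            synchronized (this) {
              Long id = poll();  // authoritative re-check under the monitor
              if (id != null) {
                return id;
              }
            }
          }
          synchronized (this) {
            // Slow path: re-check, then pre-allocate a batch and retry.
            // We cannot take lock.writeLock() here -- this thread still
            // holds the read lock, and upgrading would deadlock.
            Long id = poll();
            if (id == null) {
              preAllocateBatch(4);
              id = poll();
            }
            return id;
          }
        } finally {
          lock.readLock().unlock();
        }
      }

      private Long poll() {
        Iterator<Long> it = preAllocated.iterator();
        if (!it.hasNext()) {
          return null;
        }
        Long id = it.next();
        it.remove();
        return id;
      }

      private synchronized void preAllocateBatch(int count) {
        for (int i = 0; i < count; i++) {
          preAllocated.add(nextId++);
        }
      }
    }

The win is that threads which find no ALLOCATED containers (the common steady state, where blocks are served from OPEN containers under the read lock alone) never block each other; only the rarer paths that create or consume pre-allocated containers contend on the monitor.
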
@@ -353,40 +364,34 @@ public void deleteBlocks(List<BlockID> blockIDs) throws IOException {
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
     LOG.info("Deleting blocks {}", StringUtils.join(",", blockIDs));
     Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
     // TODO: used space when the block is deleted.
-    try {
-      for (BlockID block : blockIDs) {
-        // Merge blocks to a container to blocks mapping,
-        // prepare to persist this info to the deletedBlocksLog.
-        long containerID = block.getContainerID();
-        if (containerBlocks.containsKey(containerID)) {
-          containerBlocks.get(containerID).add(block.getLocalID());
-        } else {
-          List<Long> item = new ArrayList<>();
-          item.add(block.getLocalID());
-          containerBlocks.put(containerID, item);
-        }
-      }
-
-      try {
-        deletedBlockLog.addTransactions(containerBlocks);
-      } catch (IOException e) {
-        throw new IOException(
-            "Skip writing the deleted blocks info to"
-                + " the delLog because addTransaction fails. Batch skipped: "
-                + StringUtils.join(",", blockIDs),
-            e);
-      }
-      // TODO: Container report handling of the deleted blocks:
-      // Remove tombstone and update open container usage.
-      // We will revisit this when the closed container replication is done.
-    } finally {
-      lock.unlock();
+    for (BlockID block : blockIDs) {
+      // Merge blocks to a container to blocks mapping,
+      // prepare to persist this info to the deletedBlocksLog.
+      long containerID = block.getContainerID();
+      if (containerBlocks.containsKey(containerID)) {
+        containerBlocks.get(containerID).add(block.getLocalID());
+      } else {
+        List<Long> item = new ArrayList<>();
+        item.add(block.getLocalID());
+        containerBlocks.put(containerID, item);
+      }
     }
+
+    try {
+      deletedBlockLog.addTransactions(containerBlocks);
+    } catch (IOException e) {
+      throw new IOException(
+          "Skip writing the deleted blocks info to"
+              + " the delLog because addTransaction fails. Batch skipped: "
+              + StringUtils.join(",", blockIDs), e);
    }
+    // TODO: Container report handling of the deleted blocks:
+    // Remove tombstone and update open container usage.
+    // We will revisit this when the closed container replication is done.
   }

@Override
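Finally, deleteBlocks drops the BlockManager lock entirely: the method only builds a local container-to-blocks map and appends a transaction to the deleted-block log, so it no longer contends with block allocation at all. As an aside, the containsKey/put grouping loop kept by the diff could be expressed with computeIfAbsent; a hedged sketch of that equivalent (the BlockID stand-in below mirrors just the two accessors the loop uses, and this refactoring is not part of the patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class Grouping {
      // Minimal stand-in for the real BlockID type.
      static final class BlockID {
        private final long containerID;
        private final long localID;
        BlockID(long containerID, long localID) {
          this.containerID = containerID;
          this.localID = localID;
        }
        long getContainerID() { return containerID; }
        long getLocalID() { return localID; }
      }

      // Same result as the diff's containsKey/put branches: create the
      // per-container list on first sight, then append the local ID.
      static Map<Long, List<Long>> groupByContainer(List<BlockID> blocks) {
        Map<Long, List<Long>> containerBlocks = new HashMap<>();
        for (BlockID block : blocks) {
          containerBlocks
              .computeIfAbsent(block.getContainerID(), k -> new ArrayList<>())
              .add(block.getLocalID());
        }
        return containerBlocks;
      }
    }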
