Skip to content

Commit

Permalink
Revert "Block Storage: volume creation times out while creating 3TB v…
Browse files Browse the repository at this point in the history
…olume because of too many containers. Contributed by Mukul Kumar Singh." to fix commit message.

This reverts commit 087c69b.
  • Loading branch information
Chen Liang authored and omalley committed Apr 26, 2018
1 parent d303b7f commit dddded0
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 212 deletions.
Expand Up @@ -172,21 +172,6 @@ public final class CBlockConfigKeys {
public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
64 * 1024;

/**
* Cblock CLI configs.
*/
public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
"dfs.cblock.manager.pool.size";
public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;

/**
* currently the largest supported volume is about 8TB, which might take
* > 20 seconds to finish creating containers. thus set timeout to 30 sec.
*/
public static final String DFS_CBLOCK_RPC_TIMEOUT_SECONDS =
"dfs.cblock.rpc.timeout.seconds";
public static final int DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT = 300;

private CBlockConfigKeys() {

}
Expand Down
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.cblock.client;

import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
Expand All @@ -37,25 +36,32 @@
*/
public class CBlockVolumeClient {
private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
private final OzoneConfiguration conf;

public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
this(conf, null);
this.conf = conf;
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
// currently the largest supported volume is about 8TB, which might take
// > 20 seconds to finish creating containers. thus set timeout to 30 sec.
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
.SECONDS)).getProxy());
}

public CBlockVolumeClient(OzoneConfiguration conf,
InetSocketAddress serverAddress) throws IOException {
InetSocketAddress address = serverAddress != null ? serverAddress :
OzoneClientUtils.getCblockServiceRpcAddr(conf);
this.conf = conf;
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
int rpcTimeout =
conf.getInt(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(
300, 1, TimeUnit.SECONDS)).getProxy());
serverAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
.SECONDS)).getProxy());
}

public void createVolume(String userName, String volumeName,
Expand Down
Expand Up @@ -77,7 +77,6 @@ public void run() {
String containerName = null;
XceiverClientSpi client = null;
LevelDBStore levelDBStore = null;
String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
flusher.getLOG().debug(
"Writing block to remote. block ID: {}", block.getBlockID());
try {
Expand All @@ -95,7 +94,8 @@ public void run() {
Preconditions.checkState(data.length > 0, "Block data is zero length");
startTime = Time.monotonicNow();
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), data, traceID);
Long.toString(block.getBlockID()), data,
flusher.getTraceID(new File(dbPath), block.getBlockID()));
endTime = Time.monotonicNow();
flusher.getTargetMetrics().updateContainerWriteLatency(
endTime - startTime);
Expand All @@ -107,7 +107,7 @@ public void run() {
} catch (Exception ex) {
flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
"to write this block {} times to the container {}.Trace ID:{}",
block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
block.getBlockID(), this.getTryCount(), containerName, "", ex);
writeRetryBlock(block);
if (ex instanceof IOException) {
flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
Expand Down
Expand Up @@ -151,7 +151,6 @@ public long getLocalIOCount() {
*/
public void writeBlock(LogicalBlock block) throws IOException {
byte[] keybuf = Longs.toByteArray(block.getBlockID());
String traceID = parentCache.getTraceID(block.getBlockID());
if (parentCache.isShortCircuitIOEnabled()) {
long startTime = Time.monotonicNow();
getCacheDB().put(keybuf, block.getData().array());
Expand All @@ -177,7 +176,7 @@ public void writeBlock(LogicalBlock block) throws IOException {
.acquireClient(parentCache.getPipeline(block.getBlockID()));
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), block.getData().array(),
traceID);
parentCache.getTraceID(block.getBlockID()));
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
String datahash = DigestUtils.sha256Hex(block.getData().array());
Expand All @@ -190,9 +189,8 @@ public void writeBlock(LogicalBlock block) throws IOException {
parentCache.getTargetMetrics().incNumDirectBlockWrites();
} catch (Exception ex) {
parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
LOG.error("Direct I/O writing of block:{} traceID:{} to "
+ "container {} failed", block.getBlockID(), traceID,
containerName, ex);
LOG.error("Direct I/O writing of block:{} to container {} failed",
block.getBlockID(), containerName, ex);
throw ex;
} finally {
if (client != null) {
Expand Down
Expand Up @@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* The internal representation maintained by CBlock server as the info for
Expand All @@ -54,7 +53,7 @@ public class VolumeDescriptor {
private static final Logger LOG =
LoggerFactory.getLogger(VolumeDescriptor.class);

private ConcurrentHashMap<String, ContainerDescriptor> containerMap;
private HashMap<String, ContainerDescriptor> containerMap;
private String userName;
private int blockSize;
private long volumeSize;
Expand All @@ -73,12 +72,13 @@ public class VolumeDescriptor {
* and set*() methods are for the same purpose also.
*/
public VolumeDescriptor() {
this(null, null, 0, 0);
containerMap = new HashMap<>();
containerIdOrdered = new ArrayList<>();
}

public VolumeDescriptor(String userName, String volumeName, long volumeSize,
int blockSize) {
this.containerMap = new ConcurrentHashMap<>();
this.containerMap = new HashMap<>();
this.userName = userName;
this.volumeName = volumeName;
this.blockSize = blockSize;
Expand Down

0 comments on commit dddded0

Please sign in to comment.