Skip to content

Commit

Permalink
HDFS-8808. dfs.image.transfer.bandwidthPerSec should not apply to -bo…
Browse files Browse the repository at this point in the history
…otstrapStandby. Contributed by Zhe Zhang.
  • Loading branch information
zhe-thoughts committed Oct 23, 2015
1 parent d3a34a4 commit ab3c4cf
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 19 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -1563,6 +1563,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
(Walter Su via lei)

HDFS-8808. dfs.image.transfer.bandwidthPerSec should not apply to
-bootstrapStandby (zhz)

OPTIMIZATIONS

HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -520,6 +520,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.image.transfer.bandwidthPerSec";
public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling

public static final String DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY =
"dfs.image.transfer-bootstrap-standby.bandwidthPerSec";
public static final long DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_DEFAULT =
0; //no throttling

// Image transfer timeout
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
Expand Down
Expand Up @@ -222,7 +222,7 @@ void doCheckpoint() throws IOException {
"image with txid " + sig.mostRecentCheckpointTxId);
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
true);
true, false);
bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
lastApplied = sig.mostRecentCheckpointTxId;
Expand Down
Expand Up @@ -82,6 +82,7 @@ public class ImageServlet extends HttpServlet {
private static final String STORAGEINFO_PARAM = "storageInfo";
private static final String LATEST_FSIMAGE_VALUE = "latest";
private static final String IMAGE_FILE_TYPE = "imageFile";
private static final String IS_BOOTSTRAP_STANDBY = "bootstrapstandby";

private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
.<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
Expand Down Expand Up @@ -157,8 +158,10 @@ private void serveFile(File file) throws IOException {
// detected by the client side as an inaccurate length header.
}
// send file
DataTransferThrottler throttler = parsedParams.isBootstrapStandby ?
getThrottlerForBootstrapStandby(conf) : getThrottler(conf);
TransferFsImage.copyFileToStream(response.getOutputStream(),
file, fis, getThrottler(conf));
file, fis, throttler);
} finally {
IOUtils.closeStream(fis);
}
Expand Down Expand Up @@ -215,8 +218,8 @@ public static void setFileNameHeaders(HttpServletResponse response,
* @param conf configuration
* @return a data transfer throttler
*/
public final static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth =
public static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth =
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
DataTransferThrottler throttler = null;
Expand All @@ -225,7 +228,20 @@ public final static DataTransferThrottler getThrottler(Configuration conf) {
}
return throttler;
}


private static DataTransferThrottler getThrottlerForBootstrapStandby(
Configuration conf) {
long transferBandwidth =
conf.getLong(
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_DEFAULT);
DataTransferThrottler throttler = null;
if (transferBandwidth > 0) {
throttler = new DataTransferThrottler(transferBandwidth);
}
return throttler;
}

@VisibleForTesting
static boolean isValidRequestor(ServletContext context, String remoteUser,
Configuration conf) throws IOException {
Expand Down Expand Up @@ -301,13 +317,14 @@ static String getParamStringForMostRecentImage() {
}

static String getParamStringForImage(NameNodeFile nnf, long txid,
StorageInfo remoteStorageInfo) {
StorageInfo remoteStorageInfo, boolean isBootstrapStandby) {
final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
+ nnf.name();
return "getimage=1&" + TXID_PARAM + "=" + txid
+ imageType
+ "&" + STORAGEINFO_PARAM + "=" +
remoteStorageInfo.toColonSeparatedString();
+ "&" + STORAGEINFO_PARAM + "="
+ remoteStorageInfo.toColonSeparatedString() + "&"
+ IS_BOOTSTRAP_STANDBY + "=" + isBootstrapStandby;
}

static String getParamStringForLog(RemoteEditLog log,
Expand All @@ -325,6 +342,7 @@ static class GetImageParams {
private long startTxId, endTxId, txId;
private String storageInfoString;
private boolean fetchLatest;
private boolean isBootstrapStandby;

/**
* @param request the object from which this servlet reads the url contents
Expand All @@ -336,7 +354,7 @@ public GetImageParams(HttpServletRequest request,
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = fetchLatest = false;
isGetImage = isGetEdit = fetchLatest = isBootstrapStandby = false;

for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
Expand All @@ -348,6 +366,10 @@ public GetImageParams(HttpServletRequest request,
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
String bootstrapStandby = ServletUtil.getParameter(request,
IS_BOOTSTRAP_STANDBY);
isBootstrapStandby = bootstrapStandby != null &&
Boolean.parseBoolean(bootstrapStandby);
} catch (NumberFormatException nfe) {
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
fetchLatest = true;
Expand Down
Expand Up @@ -421,7 +421,7 @@ public Boolean run() throws Exception {
LOG.info("Image has changed. Downloading updated image from NN.");
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
nnHostPort, sig.mostRecentCheckpointTxId,
dstImage.getStorage(), true);
dstImage.getStorage(), true, false);
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
}
Expand Down
Expand Up @@ -128,9 +128,10 @@ public static void downloadMostRecentImageToDirectory(URL infoServer,
}

public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
throws IOException {
String fileid = ImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
imageTxId, dstStorage, isBootstrapStandby);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);

List<File> dstFiles = dstStorage.getFiles(
Expand Down
Expand Up @@ -333,7 +333,7 @@ private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameN

// Download that checkpoint into our storage directories.
MD5Hash hash = TransferFsImage.downloadImageToStorage(
proxyInfo.getHttpAddress(), imageTxId, storage, true);
proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
hash);
} catch (IOException ioe) {
Expand Down
Expand Up @@ -1073,14 +1073,30 @@
<name>dfs.image.transfer.bandwidthPerSec</name>
<value>0</value>
<description>
Maximum bandwidth used for image transfer in bytes per second.
Maximum bandwidth used for regular image transfers (instead of
bootstrapping the standby namenode), in bytes per second.
This can help keep normal namenode operations responsive during
checkpointing. The maximum bandwidth and timeout in
dfs.image.transfer.timeout should be set such that normal image
transfers can complete successfully.
A default value of 0 indicates that throttling is disabled.
</description>
</property>
A default value of 0 indicates that throttling is disabled.
The maximum bandwidth used for bootstrapping standby namenode is
configured with dfs.image.transfer-bootstrap-standby.bandwidthPerSec.
</description>
</property>

<property>
<name>dfs.image.transfer-bootstrap-standby.bandwidthPerSec</name>
<value>0</value>
<description>
Maximum bandwidth used for transferring image to bootstrap standby
namenode, in bytes per second.
A default value of 0 indicates that throttling is disabled. This default
value should be used in most cases, to ensure timely HA operations.
The maximum bandwidth used for regular image transfers is configured
with dfs.image.transfer.bandwidthPerSec.
</description>
</property>

<property>
<name>dfs.image.transfer.chunksize</name>
Expand Down
Expand Up @@ -2002,7 +2002,7 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException {
.when(dstImage).toColonSeparatedString();

try {
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false);
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false, false);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);
Expand Down
Expand Up @@ -24,11 +24,14 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
Expand Down Expand Up @@ -212,6 +215,64 @@ public void testOtherNodeNotActive() throws Exception {
assertSuccessfulBootstrapFromIndex(1);
}

/**
* Test that bootstrapping standby NN is not limited by
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_RATE_KEY}, but is limited by
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY}
* created by HDFS-8808.
*/
@Test
public void testRateThrottling() throws Exception {
cluster.getConfiguration(0).setLong(
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 1);
cluster.restartNameNode(0);
cluster.waitActive();
nn0 = cluster.getNameNode(0);
cluster.transitionToActive(0);
// Each edit has at least 1 byte. So the lowRate definitely should cause
// a timeout, if enforced. If lowRate is not enforced, any reasonable test
// machine should at least download an image with 5 edits in 5 seconds.
for (int i = 0; i < 5; i++) {
nn0.getRpcServer().rollEditLog();
}
// A very low DFS_IMAGE_TRANSFER_RATE_KEY value won't affect bootstrapping
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
try {
testSuccessfulBaseCase();
return true;
} catch (Exception e) {
return false;
}
}
}, 500, 5000);

shutdownCluster();
setupCluster();
cluster.getConfiguration(0).setLong(
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY, 1);
cluster.restartNameNode(0);
cluster.waitActive();
nn0 = cluster.getNameNode(0);
cluster.transitionToActive(0);
// A very low DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY value should
// cause timeout
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
try {
testSuccessfulBaseCase();
return true;
} catch (Exception e) {
return false;
}
}
}, 500, 5000);
fail("Did not timeout");
} catch (TimeoutException e) {
LOG.info("Encountered expected timeout.");
}
}
private void removeStandbyNameDirs() {
for (int i = 1; i < maxNNCount; i++) {
for (URI u : cluster.getNameDirs(i)) {
Expand Down Expand Up @@ -249,4 +310,4 @@ private void assertSuccessfulBootstrapFromIndex(int start) throws Exception {
assertEquals(0, forceBootstrap(i));
}
}
}
}

0 comments on commit ab3c4cf

Please sign in to comment.