Skip to content

Commit

Permalink
HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed H…
Browse files Browse the repository at this point in the history
…ussein.
  • Loading branch information
kihwal committed Oct 21, 2020
1 parent 7b43596 commit 88a9f42
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks.
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
/**
* The amount of time in milliseconds that the BlockScanner times out waiting
* for the VolumeScanner thread to join during a shutdown call.
*/
public static final String DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
"dfs.block.scanner.volume.join.timeout.ms";
public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED =
"dfs.block.scanner.skip.recent.accessed";
public static final boolean DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;

Expand Down Expand Up @@ -68,6 +70,12 @@ public class BlockScanner {
*/
private Conf conf;

/**
* Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
* inside {@link #removeAllVolumeScanners}.
*/
private long joinVolumeScannersTimeOutMs;

@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
Expand Down Expand Up @@ -185,6 +193,9 @@ public BlockScanner(DataNode datanode) {

public BlockScanner(DataNode datanode, Configuration conf) {
this.datanode = datanode;
setJoinVolumeScannersTimeOutMs(
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
this.conf = new Conf(conf);
if (isEnabled()) {
LOG.info("Initialized block scanner with targetBytesPerSec {}",
Expand All @@ -204,6 +215,13 @@ public boolean isEnabled() {
return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
}

/**
* Returns true if there is any scanner thread registered.
*/
public synchronized boolean hasAnyRegisteredScanner() {
return !scanners.isEmpty();
}

/**
* Set up a scanner for the given block pool and volume.
*
Expand Down Expand Up @@ -268,15 +286,18 @@ public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
/**
* Stops and removes all volume scanners.
*
* This function will block until all the volume scanners have stopped.
* This function is called on shutdown. It will return even if some of
* the scanners don't terminate in time. Since the scanners are daemon
* threads and do not alter the block content, it is safe to ignore
* such conditions on shutdown.
*/
public synchronized void removeAllVolumeScanners() {
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
entry.getValue().shutdown();
}
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
Uninterruptibles.joinUninterruptibly(entry.getValue(),
5, TimeUnit.MINUTES);
getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
}
scanners.clear();
}
Expand Down Expand Up @@ -352,6 +373,14 @@ synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
scanner.markSuspectBlock(block);
}

public long getJoinVolumeScannersTimeOutMs() {
return joinVolumeScannersTimeOutMs;
}

public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
}

@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,9 @@ void shutdownBlockPool(BPOfferService bpos) {
// a block pool id
String bpId = bpos.getBlockPoolId();

blockScanner.disableBlockPoolId(bpId);
if (blockScanner.hasAnyRegisteredScanner()) {
blockScanner.disableBlockPoolId(bpId);
}

if (data != null) {
data.shutdownBlockPool(bpId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,12 +670,14 @@ public void run() {
LOG.error("{} exiting because of exception ", this, e);
}
LOG.info("{} exiting.", this);
VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
// Save the current position of all block iterators and close them.
for (BlockIterator iter : blockIters) {
saveBlockIterator(iter);
IOUtils.cleanup(null, iter);
}
} finally {
VolumeScannerCBInjector.get().terminationCallBack(this);
// When the VolumeScanner exits, release the reference we were holding
// on the volume. This will allow the volume to be removed later.
IOUtils.cleanup(null, ref);
Expand All @@ -695,6 +697,7 @@ public synchronized void shutdown() {
stopping = true;
notify();
this.interrupt();
VolumeScannerCBInjector.get().shutdownCallBack(this);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Used for injecting call backs in {@link VolumeScanner}
* and {@link BlockScanner} tests.
* Calls into this are a no-op in production code.
*/
@VisibleForTesting
@InterfaceAudience.Private
public class VolumeScannerCBInjector {
private static VolumeScannerCBInjector instance =
new VolumeScannerCBInjector();

public static VolumeScannerCBInjector get() {
return instance;
}

public static void set(VolumeScannerCBInjector injector) {
instance = injector;
}

public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
}

public void shutdownCallBack(final VolumeScanner volumeScanner) {
}

public void terminationCallBack(final VolumeScanner volumeScanner) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,15 @@
</description>
</property>

<property>
<name>dfs.block.scanner.volume.join.timeout.ms</name>
<value>5000</value>
<description>
The amount of time in milliseconds that the BlockScanner times out waiting
for the VolumeScanner thread to join during a shutdown call.
</description>
</property>

<property>
<name>dfs.datanode.readahead.bytes</name>
<value>4194304</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
Expand Down Expand Up @@ -80,6 +81,7 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.Multimap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
Expand Down Expand Up @@ -171,6 +173,13 @@ public class MiniDFSCluster implements AutoCloseable {
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
= DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
/**
* For the Junit tests, this is the default value of the The amount of time
* in milliseconds that the BlockScanner times out waiting for the
* {@link VolumeScanner} thread to join during a shutdown call.
*/
public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
TimeUnit.SECONDS.toMillis(30);

// Changing this default may break some tests that assume it is 2.
private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
Expand Down Expand Up @@ -217,8 +226,7 @@ public static class Builder {

public Builder(Configuration conf) {
this.conf = conf;
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
initDefaultConfigurations();
if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
conf.set(HDFS_MINIDFS_BASEDIR,
new File(getBaseDirectory()).getAbsolutePath());
Expand All @@ -227,8 +235,7 @@ public Builder(Configuration conf) {

public Builder(Configuration conf, File basedir) {
this.conf = conf;
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
initDefaultConfigurations();
if (null == basedir) {
throw new IllegalArgumentException(
"MiniDFSCluster base directory cannot be null");
Expand Down Expand Up @@ -492,6 +499,19 @@ public Builder useConfiguredTopologyMappingClass(
public MiniDFSCluster build() throws IOException {
return new MiniDFSCluster(this);
}

/**
* Initializes default values for the cluster.
*/
private void initDefaultConfigurations() {
long defaultScannerVolumeTimeOut =
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC);
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
defaultScannerVolumeTimeOut);
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
}
}

/**
Expand Down

0 comments on commit 88a9f42

Please sign in to comment.