diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index 7312f26d0a140..361eff9f9a063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -199,4 +199,13 @@ void submitDiskBalancerPlan(String planID, long planVersion, String planFile,
    * @throws IOException - Throws if there is no such key
    */
   String getDiskBalancerSetting(String key) throws IOException;
+
+  /**
+   * Trigger an instant run of the directory scanner if it is not
+   * currently running.
+   *
+   * @return the result of the attempted trigger.
+   * @throws IOException if the directory scanner cannot be triggered.
+   */
+  String triggerDirectoryScanner() throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 47234e8b65d78..545cb973fc943 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -50,6 +50,8 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetVolumeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetVolumeReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeVolumeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
@@ -118,6 +120,9 @@ public class ClientDatanodeProtocolTranslatorPB implements
       GetBalancerBandwidthRequestProto.newBuilder().build();
   private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
       EvictWritersRequestProto.newBuilder().build();
+  private static final TriggerDirectoryScannerRequestProto
+      VOID_TRIGGER_DIRECTORY_SCANNER =
+      TriggerDirectoryScannerRequestProto.newBuilder().build();

   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -335,6 +340,18 @@ public void triggerBlockReport(BlockReportOptions options)
     }
   }

+  @Override
+  public String triggerDirectoryScanner() throws IOException {
+    TriggerDirectoryScannerResponseProto responseProto;
+    try {
+      responseProto = rpcProxy.triggerDirectoryScanner(NULL_CONTROLLER,
+          VOID_TRIGGER_DIRECTORY_SCANNER);
+      return responseProto.getResult();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public long getBalancerBandwidth() throws IOException {
     GetBalancerBandwidthResponseProto response;
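For readers unfamiliar with Hadoop's protobuf translator layer, the client-side method above follows a fixed pattern: a single immutable "void" request proto is built once and reused for every call, and the transport's ServiceException is unwrapped back into the IOException that ClientDatanodeProtocol declares. Below is a minimal, dependency-free sketch of that pattern; every name in it is illustrative rather than Hadoop's real API.

import java.io.IOException;

// Dependency-free sketch of the client-side translator pattern.
public class ClientTranslatorSketch {
  // Stand-in for the generated, immutable request proto: built once and
  // reused for every call (the VOID_TRIGGER_DIRECTORY_SCANNER analogue).
  static final class VoidRequest {
    static final VoidRequest INSTANCE = new VoidRequest();
  }
  // Stand-in for the generated response proto.
  static final class Response {
    private final String result;
    Response(String result) { this.result = result; }
    String getResult() { return result; }
  }
  // Stand-in for the RPC engine's ServiceException, which wraps
  // whatever the server side threw.
  static final class ServiceException extends Exception {
    ServiceException(Throwable cause) { super(cause); }
  }
  interface RpcProxy {
    Response triggerDirectoryScanner(VoidRequest request) throws ServiceException;
  }

  private final RpcProxy rpcProxy;
  ClientTranslatorSketch(RpcProxy rpcProxy) { this.rpcProxy = rpcProxy; }

  // Mirrors the new triggerDirectoryScanner(): call, unwrap, return.
  public String triggerDirectoryScanner() throws IOException {
    try {
      return rpcProxy.triggerDirectoryScanner(VoidRequest.INSTANCE).getResult();
    } catch (ServiceException e) {
      // Analogue of ProtobufHelper.getRemoteException: surface the wrapped
      // IOException directly, otherwise wrap the cause.
      Throwable cause = e.getCause();
      throw (cause instanceof IOException)
          ? (IOException) cause : new IOException(cause);
    }
  }
}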
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 6c8d1c5fafb80..580fb3687b41d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -146,6 +146,16 @@ message TriggerBlockReportRequestProto {
 message TriggerBlockReportResponseProto {
 }

+message TriggerDirectoryScannerRequestProto {
+}
+
+/**
+ * result - the result of the scan execution.
+ */
+message TriggerDirectoryScannerResponseProto {
+  required string result = 1;
+}
+
 message GetBalancerBandwidthRequestProto {
 }

@@ -278,6 +288,13 @@ service ClientDatanodeProtocolService {
   rpc triggerBlockReport(TriggerBlockReportRequestProto)
       returns(TriggerBlockReportResponseProto);

+  /**
+   * Trigger an instant run of the directory scanner if it is not
+   * currently running. Returns the result of the execution.
+   */
+  rpc triggerDirectoryScanner(TriggerDirectoryScannerRequestProto)
+      returns(TriggerDirectoryScannerResponseProto);
+
   /**
    * Returns the balancer bandwidth value of datanode.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 0aabff9909149..90216523f75df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -35,6 +35,8 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
@@ -238,6 +240,20 @@ public TriggerBlockReportResponseProto triggerBlockReport(
     return TRIGGER_BLOCK_REPORT_RESP;
   }

+  @Override
+  public TriggerDirectoryScannerResponseProto triggerDirectoryScanner(
+      RpcController controller, TriggerDirectoryScannerRequestProto request)
+      throws ServiceException {
+    String result;
+    try {
+      result = impl.triggerDirectoryScanner();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return TriggerDirectoryScannerResponseProto.newBuilder()
+        .setResult(result).build();
+  }
+
   @Override
   public GetBalancerBandwidthResponseProto getBalancerBandwidth(
       RpcController controller, GetBalancerBandwidthRequestProto request)
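The server-side half inverts the wrapping: the datanode implementation throws the checked IOException, and the translator folds it into a ServiceException so the blocking RPC engine can carry it back to the client, where the earlier sketch unwraps it. A matching dependency-free sketch, again with illustrative names:

import java.io.IOException;

// Dependency-free sketch of the server-side translator pattern.
public class ServerTranslatorSketch {
  // Stand-in for the DataNode implementation of the protocol.
  interface Impl {
    String triggerDirectoryScanner() throws IOException;
  }
  static final class ServiceException extends Exception {
    ServiceException(Throwable cause) { super(cause); }
  }
  // Stand-in for the generated response proto.
  static final class Response {
    final String result;
    Response(String result) { this.result = result; }
  }

  private final Impl impl;
  ServerTranslatorSketch(Impl impl) { this.impl = impl; }

  public Response triggerDirectoryScanner() throws ServiceException {
    final String result;
    try {
      result = impl.triggerDirectoryScanner();  // may throw IOException
    } catch (IOException e) {
      throw new ServiceException(e);            // shipped back to the caller
    }
    // Analogue of newBuilder().setResult(result).build().
    return new Response(result);
  }
}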
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0ed1304cb8f02..e765d9ae26619 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3395,7 +3395,7 @@ public BlockScanner getBlockScanner() {
   }

   @VisibleForTesting
-  DirectoryScanner getDirectoryScanner() {
+  public DirectoryScanner getDirectoryScanner() {
     return directoryScanner;
   }

@@ -3838,6 +3838,12 @@ public void triggerBlockReport(BlockReportOptions options)
     }
   }

+  @Override // ClientDatanodeProtocol
+  public String triggerDirectoryScanner() throws IOException {
+    checkSuperuserPrivilege();
+    return directoryScanner.triggerDirectoryScanner();
+  }
+
   /**
    * @param addr rpc address of the namenode
    * @return true if the datanode is connected to a NameNode at the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7b116d9e566f3..6baa286bef643 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -162,4 +162,7 @@ public void markSlow(String dnAddr, int[] replies) {}
    * Just delay delete replica a while.
    */
   public void delayDeleteReplica() {}
+
+  /** Used in tests to simulate an IOException during reconcile(). */
+  public void throwIOExceptionWhenReconcile() throws IOException {}
 }
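Two notes on the DataNode changes. First, DataNode#triggerDirectoryScanner dereferences directoryScanner unconditionally; if the periodic scanner is disabled by configuration that field may never be initialized, so a null guard could be worth considering (an observation, not something this patch addresses). Second, the new hook follows DataNodeFaultInjector's usual shape: a static, swappable singleton whose methods are no-ops in production and are overridden by tests. A self-contained sketch of that pattern, with illustrative names:

import java.io.IOException;

// Sketch of the fault-injection pattern used by DataNodeFaultInjector.
public class FaultInjectorSketch {
  private static FaultInjectorSketch instance = new FaultInjectorSketch();

  public static FaultInjectorSketch get() { return instance; }
  public static void set(FaultInjectorSketch injector) { instance = injector; }

  // No-op in production; a test swaps in a subclass that throws.
  public void throwWhenReconcile() throws IOException {}

  public static void main(String[] args) {
    try {
      FaultInjectorSketch.get().throwWhenReconcile();  // no-op by default
      FaultInjectorSketch.set(new FaultInjectorSketch() {
        @Override
        public void throwWhenReconcile() throws IOException {
          throw new IOException("injected failure");
        }
      });
      FaultInjectorSketch.get().throwWhenReconcile();  // now fails
    } catch (IOException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}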
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index bf88e6fe88bb0..9cefcfe590e42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -74,6 +74,8 @@ public class DirectoryScanner implements Runnable {
   private final long scanPeriodMsecs;
   private final long throttleLimitMsPerSec;
   private final AtomicBoolean shouldRun = new AtomicBoolean();
+  // Whether reconcile() is currently executing.
+  private final AtomicBoolean reconcileRunning = new AtomicBoolean();

   private boolean retainDiffs = false;

@@ -362,6 +364,7 @@ public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
   @VisibleForTesting
   public void start() {
     shouldRun.set(true);
+    reconcileRunning.set(false);
     long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);

     LOG.info(
@@ -404,6 +407,11 @@ public void run() {
           "This cycle terminating immediately because 'shouldRun' has been deactivated");
       return;
     }
+    if (reconcileRunning.get()) {
+      // A triggered reconcile is already executing; skip this cycle.
+      LOG.warn("This cycle terminating immediately because reconcile is already running.");
+      return;
+    }
     try {
       reconcile();
       dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
@@ -412,11 +420,13 @@ public void run() {
       LOG.error(
           "Exception during DirectoryScanner execution - will continue next cycle",
           e);
+      reconcileRunning.set(false);
     } catch (Error er) {
       // Non-recoverable error - re-throw after logging the problem
       LOG.error(
           "System Error during DirectoryScanner execution - permanently terminating periodic scanner",
           er);
+      reconcileRunning.set(false);
       throw er;
     }
   }
@@ -432,6 +442,9 @@ void shutdown() {
     if (!shouldRun.getAndSet(false)) {
       LOG.warn("Shutdown has been called, but periodic scanner not started");
     }
+    if (reconcileRunning.getAndSet(false)) {
+      LOG.warn("Shutdown has been called while the directory scanner is running.");
+    }
     if (masterThread != null) {
       masterThread.shutdown();
     }
@@ -463,8 +476,10 @@ void shutdown() {
    * Reconcile differences between disk and in-memory blocks
    */
   @VisibleForTesting
-  public void reconcile() throws IOException {
+  public synchronized void reconcile() throws IOException {
+    reconcileRunning.set(true);
     LOG.debug("reconcile start DirectoryScanning");
+    DataNodeFaultInjector.get().throwIOExceptionWhenReconcile();
     scan();

     // HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
@@ -488,6 +503,7 @@ public void reconcile() throws IOException {
     if (!retainDiffs) {
       clear();
     }
+    reconcileRunning.set(false);
   }

   /**
@@ -777,4 +793,31 @@ private void accumulateTimeWaiting() {
       perfTimer.reset().start();
     }
   }
+
+  /**
+   * Trigger an instant run of the directory scanner if it is not
+   * currently running.
+   *
+   * @return a message describing the result of the attempt.
+   */
+  public String triggerDirectoryScanner() throws IOException {
+    if (reconcileRunning.get()) {
+      return "Trigger DirectoryScanner failed because it's already running. "
+          + "Please try again later.";
+    }
+    try {
+      reconcile();
+      dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
+    } catch (Exception e) {
+      // Log and reset reconcileRunning before rethrowing.
+      LOG.error("Exception during triggered DirectoryScanner execution. "
+          + "Try again later.", e);
+      reconcileRunning.set(false);
+      throw e;
+    } catch (Throwable e) {
+      reconcileRunning.set(false);
+      throw e;
+    }
+    return "Trigger DirectoryScanner successfully.";
+  }
 }
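One concurrency detail worth noting in triggerDirectoryScanner(): the reconcileRunning.get() check and the subsequent reconcile() call are not atomic, so two concurrent triggers can both pass the check. Because reconcile() is synchronized they then serialize rather than overlap, but each catch block has to remember to reset the flag. A compareAndSet-based guard with a finally reset would reject the second caller outright and make the reset unconditional. A sketch of that alternative, with illustrative names (this is not what the patch does):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Alternative "run only if idle" guard: compareAndSet closes the gap
// between the idle check and the start of the scan, and the finally
// block resets the flag on every exit path.
public class TriggerGuardSketch {
  private final AtomicBoolean running = new AtomicBoolean(false);

  public String trigger() throws IOException {
    if (!running.compareAndSet(false, true)) {
      return "Trigger DirectoryScanner failed because it's already running. "
          + "Please try again later.";
    }
    try {
      scanOnce();               // the reconcile() analogue
      return "Trigger DirectoryScanner successfully.";
    } finally {
      running.set(false);       // reset on success and on failure alike
    }
  }

  // Placeholder for the actual disk/in-memory reconciliation work.
  private void scanOnce() throws IOException {
  }
}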
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index ad4292a6b79fe..54deb2f93c6bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -469,6 +469,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
     "\t[-triggerBlockReport [-incremental] [-namenode <namenode_host:ipc_port>]]\n" +
+    "\t[-triggerDirectoryScanner <datanode_host:ipc_port>]\n" +
     "\t[-listOpenFiles [-blockingDecommission] [-path <path>]]\n" +
     "\t[-help [cmd]]\n";

@@ -1143,7 +1144,7 @@ public int getBalancerBandwidth(String[] argv, int idx) throws IOException {
   /**
    * Download the most recent fsimage from the name node, and save it to a local
    * file in the given directory.
-   * 
+   *
    * @param argv
    *          List of of command line parameters.
    * @param idx
@@ -1350,6 +1351,10 @@ private void printHelp(String cmd) {
       + "\tIf 'blockingDecommission' option is specified, it will list the\n"
       + "\topen files only that are blocking the ongoing Decommission.";

+    String triggerDirectoryScanner = "-triggerDirectoryScanner <datanode_host:ipc_port>:\n"
+      + "\tTrigger an instant run of the directory scanner for the datanode\n"
+      + "\tif it's not running currently.";
+
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
       + "\t\tis specified.\n";

@@ -1423,6 +1428,8 @@ private void printHelp(String cmd) {
       System.out.println(triggerBlockReport);
     } else if ("listOpenFiles".equalsIgnoreCase(cmd)) {
       System.out.println(listOpenFiles);
+    } else if ("triggerDirectoryScanner".equalsIgnoreCase(cmd)) {
+      System.out.println(triggerDirectoryScanner);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -1461,6 +1468,7 @@ private void printHelp(String cmd) {
     System.out.println(getDatanodeInfo);
     System.out.println(triggerBlockReport);
     System.out.println(listOpenFiles);
+    System.out.println(triggerDirectoryScanner);
     System.out.println(help);
     System.out.println();
     ToolRunner.printGenericCommandUsage(System.out);
@@ -2354,6 +2362,9 @@ private static void printUsage(String cmd) {
     } else if ("-listOpenFiles".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-listOpenFiles [-blockingDecommission] [-path <path>]]");
+    } else if ("-triggerDirectoryScanner".equals(cmd)) {
+      System.err.println("Usage: hdfs dfsadmin"
+          + " [-triggerDirectoryScanner <datanode_host:ipc_port>]");
     } else {
       System.err.println("Usage: hdfs dfsadmin");
       System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -2521,6 +2532,11 @@ public int run(String[] argv) {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-triggerDirectoryScanner".equals(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }

     // initialize DFSAdmin
@@ -2599,6 +2615,8 @@ public int run(String[] argv) {
       exitCode = triggerBlockReport(argv);
     } else if ("-listOpenFiles".equals(cmd)) {
       exitCode = listOpenFiles(argv);
+    } else if ("-triggerDirectoryScanner".equals(cmd)) {
+      exitCode = triggerDirectoryScanner(argv, i);
     } else if ("-help".equals(cmd)) {
       if (i < argv.length) {
         printHelp(argv[i]);
@@ -2751,6 +2769,18 @@ private int getDatanodeInfo(String[] argv, int i) throws IOException {
     return 0;
   }

+  public int triggerDirectoryScanner(String[] argv, int idx) throws IOException {
+    ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[idx]);
+    try {
+      String result = dnProxy.triggerDirectoryScanner();
+      System.out.println(result);
+    } catch (IOException ioe) {
+      throw new IOException("Exception while triggering DirectoryScanner: "
+          + ioe.getMessage(), ioe);
+    }
+    return 0;
+  }
+
   /**
    * main() has some simple utility methods.
    * @param argv Command line parameters.
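With the DFSAdmin wiring in place, the command is invoked as hdfs dfsadmin -triggerDirectoryScanner <datanode_host:ipc_port>, where the port is the datanode's IPC port (9867 by default in recent Hadoop releases). The tests below drive the same path programmatically through ToolRunner, roughly like this; the host and port here are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

// Programmatic equivalent of the shell command; 127.0.0.1:9867 stands in
// for a real datanode's host:ipc_port.
public class TriggerScannerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int ret = ToolRunner.run(new DFSAdmin(conf),
        new String[] {"-triggerDirectoryScanner", "127.0.0.1:9867"});
    System.exit(ret);  // 0 on success, -1 if the trigger failed
  }
}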
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 78664e27ca286..a1ec071f707c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -77,6 +77,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.IOUtils;
@@ -236,6 +237,60 @@ private static void scanIntoList(
     scanner.close();
   }

+  @Test(timeout = 30000)
+  public void testTriggerDirectoryScanner() throws Exception {
+    redirectStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+      resetStream();
+      final DataNode dn = cluster.getDataNodes().get(i);
+      final String addr = String.format("%s:%d",
+          dn.getXferAddress().getHostString(), dn.getIpcPort());
+      final int ret = ToolRunner.run(dfsAdmin, new String[] {
+          "-triggerDirectoryScanner", addr});
+      assertEquals(0, ret);
+      /* collect outputs */
+      final List<String> outs = Lists.newArrayList();
+      scanIntoList(out, outs);
+      assertThat(outs.get(0),
+          containsString("Trigger DirectoryScanner successfully."));
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testDirectoryScannerAfterTriggerFailed() throws Exception {
+    redirectStream();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 3L);
+    restartCluster();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void throwIOExceptionWhenReconcile() throws IOException {
+        throw new IOException("mock IOException.");
+      }
+    };
+
+    try {
+      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+        resetStream();
+        DataNodeFaultInjector.set(dnFaultInjector);
+        final DataNode dn = cluster.getDataNodes().get(i);
+        final String addr = String.format("%s:%d",
+            dn.getXferAddress().getHostString(), dn.getIpcPort());
+        final int ret = ToolRunner.run(dfsAdmin, new String[] {
+            "-triggerDirectoryScanner", addr});
+        assertEquals(-1, ret);
+        DataNodeFaultInjector.set(oldInjector);
+        final int ret2 = ToolRunner.run(dfsAdmin, new String[] {
+            "-triggerDirectoryScanner", addr});
+        assertEquals(0, ret2);
+      }
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
   @Test(timeout = 30000)
   public void testGetDatanodeInfo() throws Exception {
     redirectStream();