@@ -199,4 +199,9 @@ void submitDiskBalancerPlan(String planID, long planVersion, String planFile,
* @throws IOException - Throws if there is no such key
*/
String getDiskBalancerSetting(String key) throws IOException;

/**
* Trigger an immediate run of the directory scanner if it is not
* currently running.
* @return a message describing the result of the trigger attempt.
*/
String triggerDirectoryScanner() throws IOException;
}
@@ -50,6 +50,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetVolumeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetVolumeReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeVolumeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
@@ -118,6 +120,9 @@ public class ClientDatanodeProtocolTranslatorPB implements
GetBalancerBandwidthRequestProto.newBuilder().build();
private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
EvictWritersRequestProto.newBuilder().build();
private static final TriggerDirectoryScannerRequestProto
VOID_TRIGGER_DIRECTORY_SCANNER =
TriggerDirectoryScannerRequestProto.newBuilder().build();

public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -335,6 +340,18 @@ public void triggerBlockReport(BlockReportOptions options)
}
}

@Override
public String triggerDirectoryScanner() throws IOException {
TriggerDirectoryScannerResponseProto responseProto;
try {
responseProto = rpcProxy.triggerDirectoryScanner(NULL_CONTROLLER,
VOID_TRIGGER_DIRECTORY_SCANNER);
return responseProto.getResult();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

@Override
public long getBalancerBandwidth() throws IOException {
GetBalancerBandwidthResponseProto response;
@@ -146,6 +146,16 @@ message TriggerBlockReportRequestProto {
message TriggerBlockReportResponseProto {
}

message TriggerDirectoryScannerRequestProto {
}

/**
* result - execution result.
*/
message TriggerDirectoryScannerResponseProto {
required string result = 1;
}

message GetBalancerBandwidthRequestProto {
}

@@ -278,6 +288,13 @@ service ClientDatanodeProtocolService {
rpc triggerBlockReport(TriggerBlockReportRequestProto)
returns(TriggerBlockReportResponseProto);

/**
* Trigger an immediate run of the directory scanner if it is not
* currently running. Returns a message describing the result.
*/
rpc triggerDirectoryScanner(TriggerDirectoryScannerRequestProto)
returns(TriggerDirectoryScannerResponseProto);

/**
* Returns the balancer bandwidth value of datanode.
*/
@@ -35,6 +35,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerDirectoryScannerResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
@@ -238,6 +240,20 @@ public TriggerBlockReportResponseProto triggerBlockReport(
return TRIGGER_BLOCK_REPORT_RESP;
}

@Override
public TriggerDirectoryScannerResponseProto triggerDirectoryScanner(
RpcController controller, TriggerDirectoryScannerRequestProto request)
throws ServiceException {
String result;
try {
result = impl.triggerDirectoryScanner();
} catch (IOException e) {
throw new ServiceException(e);
}
return TriggerDirectoryScannerResponseProto.newBuilder()
.setResult(result).build();
}

@Override
public GetBalancerBandwidthResponseProto getBalancerBandwidth(
RpcController controller, GetBalancerBandwidthRequestProto request)
@@ -3395,7 +3395,7 @@ public BlockScanner getBlockScanner() {
}

@VisibleForTesting
DirectoryScanner getDirectoryScanner() {
public DirectoryScanner getDirectoryScanner() {
return directoryScanner;
}

@@ -3838,6 +3838,12 @@ public void triggerBlockReport(BlockReportOptions options)
}
}

@Override // ClientDatanodeProtocol
public String triggerDirectoryScanner() throws IOException {
checkSuperuserPrivilege();
return directoryScanner.triggerDirectoryScanner();
}

/**
* @param addr rpc address of the namenode
* @return true if the datanode is connected to a NameNode at the
@@ -162,4 +162,6 @@ public void markSlow(String dnAddr, int[] replies) {}
* Just delay deleting a replica for a while.
*/
public void delayDeleteReplica() {}

/**
* Used by tests to inject an IOException into DirectoryScanner#reconcile().
*/
public void throwIOExceptionWhenReconcile() throws IOException {}
}
@@ -74,6 +74,8 @@ public class DirectoryScanner implements Runnable {
private final long scanPeriodMsecs;
private final long throttleLimitMsPerSec;
private final AtomicBoolean shouldRun = new AtomicBoolean();
// True while reconcile() is executing.
private final AtomicBoolean reconcileRunning = new AtomicBoolean();

private boolean retainDiffs = false;

@@ -362,6 +364,7 @@ public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
@VisibleForTesting
public void start() {
shouldRun.set(true);
reconcileRunning.set(false);
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);

LOG.info(
@@ -404,6 +407,11 @@ public void run() {
"This cycle terminating immediately because 'shouldRun' has been deactivated");
return;
}
if (reconcileRunning.get()) {

Contributor: Using the reconcileRunning variable in this way cannot prevent two threads from executing reconcile() at the same time. When this thread is running after line 410 and before line 416, another thread may set reconcileRunning to true.
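
An editor's minimal sketch of the compareAndSet idiom this comment points toward (ReconcileGate and tryRunReconcile are illustrative names, not from the PR): the atomic flip from false to true guarantees exactly one thread wins the race.

import java.util.concurrent.atomic.AtomicBoolean;

public class ReconcileGate {
  private final AtomicBoolean reconcileRunning = new AtomicBoolean();

  /** Returns false if another thread is already running reconcile. */
  boolean tryRunReconcile() {
    // Atomically flip false -> true; unlike get() followed later by
    // set(true), this leaves no window for a second thread to slip through.
    if (!reconcileRunning.compareAndSet(false, true)) {
      return false;
    }
    try {
      // ... reconcile() body would run here ...
      return true;
    } finally {
      reconcileRunning.set(false); // always release, even on exceptions
    }
  }
}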

// reconcile() is already executing.
LOG.warn("This cycle terminating immediately because reconcile() is already running.");
return;
}
try {
reconcile();
dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
@@ -412,11 +420,13 @@
LOG.error(
"Exception during DirectoryScanner execution - will continue next cycle",
e);
reconcileRunning.set(false);
} catch (Error er) {
// Non-recoverable error - re-throw after logging the problem
LOG.error(
"System Error during DirectoryScanner execution - permanently terminating periodic scanner",
er);
reconcileRunning.set(false);
throw er;
}
}
@@ -432,6 +442,9 @@ void shutdown() {
if (!shouldRun.getAndSet(false)) {
LOG.warn("Shutdown has been called, but periodic scanner not started");
}
if (reconcileRunning.getAndSet(false)) {
LOG.warn("Shutdown has been called when directory scanner is running.");
}
if (masterThread != null) {
masterThread.shutdown();
}
@@ -463,8 +476,10 @@ void shutdown() {
* Reconcile differences between disk and in-memory blocks
*/
@VisibleForTesting
public void reconcile() throws IOException {
public synchronized void reconcile() throws IOException {
reconcileRunning.set(true);
LOG.debug("reconcile start DirectoryScanning");
DataNodeFaultInjector.get().throwIOExceptionWhenReconcile();
scan();

// HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
@@ -488,6 +503,7 @@ public void reconcile() throws IOException {
if (!retainDiffs) {
clear();
}
reconcileRunning.set(false);
}

/**
@@ -777,4 +793,29 @@ private void accumulateTimeWaiting() {
perfTimer.reset().start();
}
}

/**
* Trigger an immediate run of the directory scanner if it is not
* currently running.
* @return a message describing the result of the trigger attempt.
*/
public String triggerDirectoryScanner() throws IOException {
if (reconcileRunning.get()) {

Contributor: Just as mentioned above.

return "Trigger DirectoryScanner failed, because it's running. Please try again later.";
}
try {
reconcile();
dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
} catch (Exception e) {
// Log and reset reconcileRunning.
LOG.error(
"Exception during trigger DirectoryScanner execution, " +
"Try again later.", e);
reconcileRunning.set(false);
throw e;
} catch (Throwable e) {
reconcileRunning.set(false);
throw e;
}

Contributor (@tomscut, Jul 24, 2023): It's safer to capture the Throwable.

try {
  reconcile();
} catch (Exception e) {
  ...
} catch (Throwable e) {
  reconcileRunning.set(false);
}

Contributor (author): Nice suggestions. Have fixed it. Thanks~

return "Trigger DirectoryScanner successfully.";
}
}
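
Following up on the thread above, a try/finally variant of the new method (an editor's sketch, not the code in this PR, and written under the assumption that reconcile() no longer toggles the flag itself) would give a single release point that covers normal returns, Exceptions, and Errors alike:

// Sketch only: reconcile(), dataset, and reconcileRunning are the PR's members.
public String triggerDirectoryScanner() throws IOException {
  // compareAndSet also closes the check-then-act race flagged above.
  if (!reconcileRunning.compareAndSet(false, true)) {
    return "Failed to trigger DirectoryScanner because it is already running. Please try again later.";
  }
  try {
    reconcile();
    dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
    return "Triggered DirectoryScanner successfully.";
  } finally {
    reconcileRunning.set(false); // single release point for every exit path
  }
}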
@@ -469,6 +469,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOException
"\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
"\t[-metasave filename]\n" +
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]\n" +
"\t[-triggerDirectoryScanner <datanode_host:ipc_port>\n" +
"\t[-listOpenFiles [-blockingDecommission] [-path <path>]]\n" +
"\t[-help [cmd]]\n";

@@ -1143,7 +1144,7 @@ public int getBalancerBandwidth(String[] argv, int idx) throws IOException {
/**
* Download the most recent fsimage from the name node, and save it to a local
* file in the given directory.
*
*
* @param argv
List of command line parameters.
* @param idx
@@ -1350,6 +1351,10 @@ private void printHelp(String cmd) {
+ "\tIf 'blockingDecommission' option is specified, it will list the\n"
+ "\topen files only that are blocking the ongoing Decommission.";

String triggerDirectoryScanner = "-triggerDirectoryScanner <datanode_host:ipc_port>:\n" +
"\tTrigger an immediate run of the directory scanner for the datanode\n" +
"\tif it is not currently running.";

String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";

@@ -1423,6 +1428,8 @@ private void printHelp(String cmd) {
System.out.println(triggerBlockReport);
} else if ("listOpenFiles".equalsIgnoreCase(cmd)) {
System.out.println(listOpenFiles);
} else if ("triggerDirectoryScanner".equalsIgnoreCase(cmd)) {
System.out.println(triggerDirectoryScanner);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@@ -1461,6 +1468,7 @@ private void printHelp(String cmd) {
System.out.println(getDatanodeInfo);
System.out.println(triggerBlockReport);
System.out.println(listOpenFiles);
System.out.println(triggerDirectoryScanner);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@@ -2354,6 +2362,9 @@ private static void printUsage(String cmd) {
} else if ("-listOpenFiles".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-listOpenFiles [-blockingDecommission] [-path <path>]]");
} else if ("-triggerDirectoryScanner".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-triggerDirectoryScanner <datanode_host:ipc_port>]");
} else {
System.err.println("Usage: hdfs dfsadmin");
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -2521,6 +2532,11 @@ public int run(String[] argv) {
printUsage(cmd);
return exitCode;
}
} else if ("-triggerDirectoryScanner".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
return exitCode;
}
}

// initialize DFSAdmin
@@ -2599,6 +2615,8 @@ public int run(String[] argv) {
exitCode = triggerBlockReport(argv);
} else if ("-listOpenFiles".equals(cmd)) {
exitCode = listOpenFiles(argv);
} else if ("-triggerDirectoryScanner".equals(cmd)){
exitCode = triggerDirectoryScanner(argv, i);
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);
@@ -2751,6 +2769,17 @@ private int getDatanodeInfo(String[] argv, int i) throws IOException {
return 0;
}

public int triggerDirectoryScanner(String[] argv, int idx) throws IOException {
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[idx]);
try {
String result = dnProxy.triggerDirectoryScanner();
System.out.println(result);
} catch (IOException ioe) {
throw new IOException("Exception during trigger DirectoryScanner execution. " + ioe, ioe);
}
return 0;
}
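
For example, assuming a DataNode at a hypothetical host and the default DataNode IPC port (9867 in recent Hadoop releases), the new subcommand would be invoked as:

hdfs dfsadmin -triggerDirectoryScanner dn1.example.com:9867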

/**
* main() has some simple utility methods.
* @param argv Command line parameters.