diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 13ff9549e0200c..dfdfbb1a3125db 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -36,7 +36,7 @@ import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException; +import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -738,7 +739,19 @@ private void offerService() throws Exception { if (state == HAServiceState.ACTIVE) { handleRollingUpgradeStatus(resp); } - commandProcessingThread.enqueue(resp.getCommands()); + // Note: effect only when the KeyUpdateCommand was the last + // or penultimate command in DatanodeCommand[]. + DatanodeCommand[] cmds = resp.getCommands(); + boolean isContaisHighPriorityCmd = false; + if (cmds != null) { + int length = cmds.length; + for (int iter = length - 1; iter >= 0 && iter >= length - 2; iter--) { + isContaisHighPriorityCmd = isContaisHighPriorityCmd || + cmds[iter] instanceof KeyUpdateCommand; + } + } + commandProcessingThread.enqueue(cmds, + isContaisHighPriorityCmd); isSlownode = resp.getIsSlownode(); } } @@ -1389,7 +1402,7 @@ class CommandProcessingThread extends Thread { CommandProcessingThread(BPServiceActor actor) { super("Command processor"); this.actor = actor; - this.queue = new LinkedBlockingQueue<>(); + this.queue = new LinkedBlockingDeque<>(); setDaemon(true); } @@ -1468,6 +1481,11 @@ private boolean processCommand(DatanodeCommand[] cmds) { return true; } + /** + * Only used for cacheReport. + * @param cmd + * @throws InterruptedException + */ void enqueue(DatanodeCommand cmd) throws InterruptedException { if (cmd == null) { return; @@ -1476,6 +1494,11 @@ void enqueue(DatanodeCommand cmd) throws InterruptedException { dn.getMetrics().incrActorCmdQueueLength(1); } + /** + * Used for blockReport. + * @param cmds + * @throws InterruptedException + */ void enqueue(List cmds) throws InterruptedException { if (cmds == null) { return; @@ -1485,8 +1508,18 @@ void enqueue(List cmds) throws InterruptedException { dn.getMetrics().incrActorCmdQueueLength(1); } - void enqueue(DatanodeCommand[] cmds) throws InterruptedException { - queue.put(() -> processCommand(cmds)); + /** + * Used for heartbeating. + * @param cmds + * @throws InterruptedException + */ + void enqueue(DatanodeCommand[] cmds, + boolean containsHighPriorityCmds) throws InterruptedException { + if (containsHighPriorityCmds) { + ((LinkedBlockingDeque) queue).putFirst(() -> processCommand(cmds)); + } else { + queue.put(() -> processCommand(cmds)); + } dn.getMetrics().incrActorCmdQueueLength(1); } }