Skip to content

Commit

Permalink
HDFS-17372. CommandProcessingThread#queue should use LinkedBlockingDe…
Browse files Browse the repository at this point in the history
…que to prevent high priority command blocked by low priority command.
  • Loading branch information
hfutatzhanghb committed Feb 18, 2024
1 parent 4f0f5a5 commit 9d1d14b
Showing 1 changed file with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -62,6 +62,7 @@
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
Expand Down Expand Up @@ -738,7 +739,19 @@ private void offerService() throws Exception {
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
commandProcessingThread.enqueue(resp.getCommands());
// Note: effect only when the KeyUpdateCommand was the last
// or penultimate command in DatanodeCommand[].
DatanodeCommand[] cmds = resp.getCommands();
boolean isContaisHighPriorityCmd = false;
if (cmds != null) {
int length = cmds.length;
for (int iter = length - 1; iter >= 0 && iter >= length - 2; iter--) {
isContaisHighPriorityCmd = isContaisHighPriorityCmd ||
cmds[iter] instanceof KeyUpdateCommand;
}
}
commandProcessingThread.enqueue(cmds,
isContaisHighPriorityCmd);
isSlownode = resp.getIsSlownode();
}
}
Expand Down Expand Up @@ -1389,7 +1402,7 @@ class CommandProcessingThread extends Thread {
CommandProcessingThread(BPServiceActor actor) {
super("Command processor");
this.actor = actor;
this.queue = new LinkedBlockingQueue<>();
this.queue = new LinkedBlockingDeque<>();
setDaemon(true);
}

Expand Down Expand Up @@ -1468,6 +1481,11 @@ private boolean processCommand(DatanodeCommand[] cmds) {
return true;
}

/**
* Only used for cacheReport.
* @param cmd
* @throws InterruptedException
*/
void enqueue(DatanodeCommand cmd) throws InterruptedException {
if (cmd == null) {
return;
Expand All @@ -1476,6 +1494,11 @@ void enqueue(DatanodeCommand cmd) throws InterruptedException {
dn.getMetrics().incrActorCmdQueueLength(1);
}

/**
* Used for blockReport.
* @param cmds
* @throws InterruptedException
*/
void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
if (cmds == null) {
return;
Expand All @@ -1485,8 +1508,18 @@ void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
dn.getMetrics().incrActorCmdQueueLength(1);
}

void enqueue(DatanodeCommand[] cmds) throws InterruptedException {
queue.put(() -> processCommand(cmds));
/**
* Used for heartbeating.
* @param cmds
* @throws InterruptedException
*/
void enqueue(DatanodeCommand[] cmds,
boolean containsHighPriorityCmds) throws InterruptedException {
if (containsHighPriorityCmds) {
((LinkedBlockingDeque<Runnable>) queue).putFirst(() -> processCommand(cmds));
} else {
queue.put(() -> processCommand(cmds));
}
dn.getMetrics().incrActorCmdQueueLength(1);
}
}
Expand Down

0 comments on commit 9d1d14b

Please sign in to comment.