From 67870e486b3faeb19c39c3274c86045a8af4fc63 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Thu, 30 Nov 2023 23:07:32 +0800
Subject: [PATCH] HDFS-17267. Client send the same packet multiple times when
 method markSlowNode throws IOException.

---
 .../org/apache/hadoop/hdfs/DataStreamer.java  | 19 +++++++++----------
 ...TestClientProtocolForPipelineRecovery.java |  2 +-
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index d92f5943fd8a2..03705bbe2f9d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1206,16 +1206,6 @@ public void run() {
             }
           }
 
-          if (slownodesFromAck.isEmpty()) {
-            if (!slowNodeMap.isEmpty()) {
-              slowNodeMap.clear();
-            }
-          } else {
-            markSlowNode(slownodesFromAck);
-            LOG.debug("SlowNodeMap content: {}.", slowNodeMap);
-          }
-
-
           assert seqno != PipelineAck.UNKOWN_SEQNO
               : "Ack for unknown seqno should be a failed ack: " + ack;
           if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
@@ -1259,6 +1249,15 @@ public void run() {
             dataQueue.notifyAll();
 
             one.releaseBuffer(byteArrayManager);
+            if (slownodesFromAck.isEmpty()) {
+              if (!slowNodeMap.isEmpty()) {
+                slowNodeMap.clear();
+              }
+            } else {
+              markSlowNode(slownodesFromAck);
+              // TODO remove out of synchronized.
+              LOG.debug("SlowNodeMap content: {}.", slowNodeMap);
+            }
           }
         } catch (Throwable e) {
           if (!responderClosed) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 9962e8c9deefb..f0caa7a9926df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -934,7 +934,7 @@ public void markSlow(String mirrorAddr, int[] replies) {
       int count = 0;
       Random r = new Random();
       byte[] b = new byte[oneWriteSize];
-      while (count < threshold) {
+      while (count < threshold + 1) {
         r.nextBytes(b);
         o.write(b);
         count++;
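
The sketch below is a minimal model of the ordering problem this patch addresses, assuming a hypothetical AckProcessorSketch class with a markSlowNode() stand-in that always throws; it is not the actual DataStreamer code. It only illustrates the effect of the reordering: if the slow-node check throws before the acked packet leaves the ack queue, recovery still treats the packet as unacked and sends it again, whereas dequeuing first removes that possibility.

// Hypothetical simplification: this class and its method names are not part of
// the HDFS client; they only model the ack-processing order changed by the patch.
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

public class AckProcessorSketch {

  // Packets waiting for an ack; the real client keeps DFSPacket objects here.
  private final Deque<String> ackQueue = new ArrayDeque<>();

  // Stand-in for the slow-node check, which may throw IOException.
  private void markSlowNode() throws IOException {
    throw new IOException("slow node detected");
  }

  // Pre-patch order: the slow-node check runs before the acked packet is
  // removed from ackQueue, so an exception leaves the packet queued and the
  // recovery path sends it again.
  void markBeforeRemove() {
    try {
      markSlowNode();
      ackQueue.pollFirst();   // never reached when markSlowNode() throws
    } catch (IOException e) {
      // recovery: the packet is still in ackQueue and will be retransmitted
    }
  }

  // Post-patch order: the acked packet is dequeued first, so an exception
  // from the slow-node check can no longer cause a duplicate send.
  void removeThenMark() {
    ackQueue.pollFirst();     // packet is acknowledged and removed first
    try {
      markSlowNode();
    } catch (IOException e) {
      // recovery: nothing left to retransmit for this packet
    }
  }

  public static void main(String[] args) {
    AckProcessorSketch sketch = new AckProcessorSketch();

    sketch.ackQueue.add("packet-1");
    sketch.markBeforeRemove();
    System.out.println("pre-patch order, still queued:  " + sketch.ackQueue);

    sketch.ackQueue.clear();
    sketch.ackQueue.add("packet-1");
    sketch.removeThenMark();
    System.out.println("post-patch order, still queued: " + sketch.ackQueue);
  }
}

Running main prints [packet-1] for the pre-patch order (the packet would be resent) and [] for the post-patch order.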