From 96d07aaadc3296c4056ffca5693751e4ed3bf115 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 7 Jun 2017 17:37:10 +0800 Subject: [PATCH 01/10] [SPARK-20994] Remove reduant characters in OpenBlocks to save memory for shuffle service. --- .../shuffle/ExternalShuffleBlockHandler.java | 66 +++++++++++++------ 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index c0f1da50f5e6..b09062b1b2b9 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -44,7 +44,6 @@ import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; import org.apache.spark.network.util.TransportConf; - /** * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process. * @@ -91,26 +90,8 @@ protected void handleMessage( try { OpenBlocks msg = (OpenBlocks) msgObj; checkAuth(client, msg.appId); - - Iterator iter = new Iterator() { - private int index = 0; - - @Override - public boolean hasNext() { - return index < msg.blockIds.length; - } - - @Override - public ManagedBuffer next() { - final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, - msg.blockIds[index]); - index++; - metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); - return block; - } - }; - - long streamId = streamManager.registerStream(client.getClientId(), iter); + long streamId = streamManager.registerStream(client.getClientId(), + new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds)); if (logger.isTraceEnabled()) { logger.trace("Registered streamId {} with {} buffers for client {} from host {}", streamId, @@ -209,4 +190,47 @@ public Map getMetrics() { } } + private class ManagedBufferIterator implements Iterator { + + private int index = 0; + private String appId; + private String execId; + private String shuffleId; + // mapId and reduceId are stored as bytes to save memory. + private byte[][] mapIdAndReduceIds; + + ManagedBufferIterator(String appId, String execId, String[] blockIds) { + this.appId = appId; + this.execId = execId; + String blockId = blockIds[0]; + String[] blockIdParts = blockId.split("_"); + if (blockIdParts.length < 4) { + throw new IllegalArgumentException("Unexpected block id format: " + blockId); + } else if (!blockIdParts[0].equals("shuffle")) { + throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); + } + this.shuffleId = blockIdParts[1]; + mapIdAndReduceIds = new byte[blockIds.length][]; + if (blockIds.length > 0) { + for (int i = 0; i< blockIds.length; i++) { + mapIdAndReduceIds[i] = (blockIdParts[2] + "_" + blockIdParts[3]).getBytes(); + } + } + } + + @Override + public boolean hasNext() { + return index < mapIdAndReduceIds.length; + } + + @Override + public ManagedBuffer next() { + String blockId = "shuffle_" + shuffleId + "_" + new String(mapIdAndReduceIds[index]); + final ManagedBuffer block = blockManager.getBlockData(appId, execId, blockId); + index++; + metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); + return block; + } + } + } From dcf156ae1a5f44e224d0a52051191ce5ede7b94f Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 8 Jun 2017 11:27:03 +0800 Subject: [PATCH 02/10] Fix bug and make mapIdAndReduceIds to be an int array containing mapId and reduceId pairs. --- .../shuffle/ExternalShuffleBlockHandler.java | 29 +++++++++++-------- .../network/sasl/SaslIntegrationSuite.java | 2 +- .../ExternalShuffleBlockHandlerSuite.java | 11 +++---- .../ExternalShuffleIntegrationSuite.java | 8 ++--- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index b09062b1b2b9..4e860ced790a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -196,24 +196,29 @@ private class ManagedBufferIterator implements Iterator { private String appId; private String execId; private String shuffleId; - // mapId and reduceId are stored as bytes to save memory. - private byte[][] mapIdAndReduceIds; + // An array containing mapId and reduceId pairs. + private int[][] mapIdAndReduceIds; ManagedBufferIterator(String appId, String execId, String[] blockIds) { this.appId = appId; this.execId = execId; - String blockId = blockIds[0]; - String[] blockIdParts = blockId.split("_"); - if (blockIdParts.length < 4) { - throw new IllegalArgumentException("Unexpected block id format: " + blockId); - } else if (!blockIdParts[0].equals("shuffle")) { - throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); + String[] blockId0Parts = blockIds[0].split("_"); + if (blockId0Parts.length < 4) { + throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]); + } else if (!blockId0Parts[0].equals("shuffle")) { + throw new IllegalArgumentException("Expected shuffle block id, got: " + blockIds[0]); } - this.shuffleId = blockIdParts[1]; - mapIdAndReduceIds = new byte[blockIds.length][]; + this.shuffleId = blockId0Parts[1]; + mapIdAndReduceIds = new int[blockIds.length][2]; if (blockIds.length > 0) { for (int i = 0; i< blockIds.length; i++) { - mapIdAndReduceIds[i] = (blockIdParts[2] + "_" + blockIdParts[3]).getBytes(); + String[] blockIdParts = blockIds[i].split("_"); + if (!blockIdParts[1].equals(shuffleId)) { + throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + + ", got:" + blockIds[i]); + } + mapIdAndReduceIds[i][0] = Integer.parseInt(blockIdParts[2]); + mapIdAndReduceIds[i][1] = Integer.parseInt(blockIdParts[3]); } } } @@ -225,7 +230,7 @@ public boolean hasNext() { @Override public ManagedBuffer next() { - String blockId = "shuffle_" + shuffleId + "_" + new String(mapIdAndReduceIds[index]); + String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[index][0] + "_" + mapIdAndReduceIds[index][1]; final ManagedBuffer block = blockManager.getBlockData(appId, execId, blockId); index++; metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 0c054fc5db8f..8110f1e004c7 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -202,7 +202,7 @@ public void onBlockFetchFailure(String blockId, Throwable t) { } }; - String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" }; + String[] blockIds = { "shuffle_0_1_2", "shuffle_0_3_4" }; OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null); fetcher.start(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index 4d48b1897038..d258dc95c121 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -83,9 +83,10 @@ public void testOpenShuffleBlocks() { ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3])); ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); - when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker); - when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker); - ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }) + when(blockResolver.getBlockData("app0", "exec1", "shuffle_0_0_0")).thenReturn(block0Marker); + when(blockResolver.getBlockData("app0", "exec1", "shuffle_0_0_1")).thenReturn(block1Marker); + ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", + new String[] { "shuffle_0_0_0", "shuffle_0_0_1" }) .toByteBuffer(); handler.receive(client, openBlocks, callback); @@ -105,8 +106,8 @@ public void testOpenShuffleBlocks() { assertEquals(block0Marker, buffers.next()); assertEquals(block1Marker, buffers.next()); assertFalse(buffers.hasNext()); - verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0"); - verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", "shuffle_0_0_0"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", "shuffle_0_0_1"); // Verify open block request latency metrics Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index d1d8f5b4e188..4391e3023491 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -214,10 +214,10 @@ public void testFetchNonexistent() throws Exception { @Test public void testFetchWrongExecutor() throws Exception { registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER)); - FetchResult execFetch = fetchBlocks("exec-0", - new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ }); - assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks); - assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks); + FetchResult execFetch0 = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" /* right */}); + FetchResult execFetch1 = fetchBlocks("exec-0", new String[] { "shuffle_1_0_0" /* wrong */ }); + assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch0.successBlocks); + assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch1.failedBlocks); } @Test From 1e53262f7ff53ef4724ceabc146852041d041b62 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 8 Jun 2017 18:01:32 +0800 Subject: [PATCH 03/10] refine according to srowen's comments --- .../shuffle/ExternalShuffleBlockHandler.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 4e860ced790a..de465b22f361 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -210,16 +210,14 @@ private class ManagedBufferIterator implements Iterator { } this.shuffleId = blockId0Parts[1]; mapIdAndReduceIds = new int[blockIds.length][2]; - if (blockIds.length > 0) { - for (int i = 0; i< blockIds.length; i++) { - String[] blockIdParts = blockIds[i].split("_"); - if (!blockIdParts[1].equals(shuffleId)) { - throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + - ", got:" + blockIds[i]); - } - mapIdAndReduceIds[i][0] = Integer.parseInt(blockIdParts[2]); - mapIdAndReduceIds[i][1] = Integer.parseInt(blockIdParts[3]); + for (int i = 0; i< blockIds.length; i++) { + String[] blockIdParts = blockIds[i].split("_"); + if (!blockIdParts[1].equals(shuffleId)) { + throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + + ", got:" + blockIds[i]); } + mapIdAndReduceIds[i][0] = Integer.parseInt(blockIdParts[2]); + mapIdAndReduceIds[i][1] = Integer.parseInt(blockIdParts[3]); } } @@ -230,7 +228,8 @@ public boolean hasNext() { @Override public ManagedBuffer next() { - String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[index][0] + "_" + mapIdAndReduceIds[index][1]; + String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[index][0] + "_" + + mapIdAndReduceIds[index][1]; final ManagedBuffer block = blockManager.getBlockData(appId, execId, blockId); index++; metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); From 8170c8a373ace0717ef1859cf14b79929feaabfa Mon Sep 17 00:00:00 2001 From: jinxing Date: Fri, 9 Jun 2017 08:12:59 +0800 Subject: [PATCH 04/10] make mapIdAndReduceIds a single array. --- .../shuffle/ExternalShuffleBlockHandler.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index de465b22f361..6f315ebadcad 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -197,7 +197,7 @@ private class ManagedBufferIterator implements Iterator { private String execId; private String shuffleId; // An array containing mapId and reduceId pairs. - private int[][] mapIdAndReduceIds; + private int[] mapIdAndReduceIds; ManagedBufferIterator(String appId, String execId, String[] blockIds) { this.appId = appId; @@ -209,27 +209,27 @@ private class ManagedBufferIterator implements Iterator { throw new IllegalArgumentException("Expected shuffle block id, got: " + blockIds[0]); } this.shuffleId = blockId0Parts[1]; - mapIdAndReduceIds = new int[blockIds.length][2]; + mapIdAndReduceIds = new int[2 * blockIds.length]; for (int i = 0; i< blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); if (!blockIdParts[1].equals(shuffleId)) { throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockIds[i]); } - mapIdAndReduceIds[i][0] = Integer.parseInt(blockIdParts[2]); - mapIdAndReduceIds[i][1] = Integer.parseInt(blockIdParts[3]); + mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]); + mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]); } } @Override public boolean hasNext() { - return index < mapIdAndReduceIds.length; + return index < mapIdAndReduceIds.length / 2; } @Override public ManagedBuffer next() { - String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[index][0] + "_" + - mapIdAndReduceIds[index][1]; + String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[2 * index] + "_" + + mapIdAndReduceIds[2 * index + 1]; final ManagedBuffer block = blockManager.getBlockData(appId, execId, blockId); index++; metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); From 5dd0e77775c302e915b601b8c4b17806219ebb3b Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 10 Jun 2017 07:44:41 +0800 Subject: [PATCH 05/10] resolve vanzin's comments. --- .../network/shuffle/ExternalShuffleBlockHandler.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 6f315ebadcad..805c79506338 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -193,11 +193,11 @@ public Map getMetrics() { private class ManagedBufferIterator implements Iterator { private int index = 0; - private String appId; - private String execId; - private String shuffleId; + private final String appId; + private final String execId; + private final String shuffleId; // An array containing mapId and reduceId pairs. - private int[] mapIdAndReduceIds; + private final int[] mapIdAndReduceIds; ManagedBufferIterator(String appId, String execId, String[] blockIds) { this.appId = appId; @@ -205,7 +205,8 @@ private class ManagedBufferIterator implements Iterator { String[] blockId0Parts = blockIds[0].split("_"); if (blockId0Parts.length < 4) { throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]); - } else if (!blockId0Parts[0].equals("shuffle")) { + } + if (!blockId0Parts[0].equals("shuffle")) { throw new IllegalArgumentException("Expected shuffle block id, got: " + blockIds[0]); } this.shuffleId = blockId0Parts[1]; From 1e72eab5a7bc3ee5a48f8380dc4398874f1c36ac Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 10 Jun 2017 07:55:46 +0800 Subject: [PATCH 06/10] add a new method to obtain a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). --- .../network/shuffle/ExternalShuffleBlockHandler.java | 9 ++++----- .../network/shuffle/ExternalShuffleBlockResolver.java | 11 +++++++++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 805c79506338..58e90dd18f33 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -195,7 +195,7 @@ private class ManagedBufferIterator implements Iterator { private int index = 0; private final String appId; private final String execId; - private final String shuffleId; + private final int shuffleId; // An array containing mapId and reduceId pairs. private final int[] mapIdAndReduceIds; @@ -209,7 +209,7 @@ private class ManagedBufferIterator implements Iterator { if (!blockId0Parts[0].equals("shuffle")) { throw new IllegalArgumentException("Expected shuffle block id, got: " + blockIds[0]); } - this.shuffleId = blockId0Parts[1]; + this.shuffleId = Integer.parseInt(blockId0Parts[1]); mapIdAndReduceIds = new int[2 * blockIds.length]; for (int i = 0; i< blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); @@ -229,9 +229,8 @@ public boolean hasNext() { @Override public ManagedBuffer next() { - String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[2 * index] + "_" + - mapIdAndReduceIds[2 * index + 1]; - final ManagedBuffer block = blockManager.getBlockData(appId, execId, blockId); + final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId, + mapIdAndReduceIds[2 * index], mapIdAndReduceIds[2 * index + 1]); index++; metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); return block; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 62d58aba4c1e..bd1ad0557e04 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -151,8 +151,7 @@ public void registerExecutor( /** * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the - * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make - * assumptions about how the hash and sort based shuffles store their data. + * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId). */ public ManagedBuffer getBlockData(String appId, String execId, String blockId) { String[] blockIdParts = blockId.split("_"); @@ -164,7 +163,15 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) { int shuffleId = Integer.parseInt(blockIdParts[1]); int mapId = Integer.parseInt(blockIdParts[2]); int reduceId = Integer.parseInt(blockIdParts[3]); + return getBlockData(appId, execId, shuffleId, mapId, reduceId); + } + /** + * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions + * about how the hash and sort based shuffles store their data. + */ + public ManagedBuffer getBlockData(String appId, String execId, int shuffleId, int mapId, + int reduceId) { ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); if (executor == null) { throw new RuntimeException( From a2af617e20a52fba8e8c65290fae7857eac34375 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 14 Jun 2017 08:23:15 +0800 Subject: [PATCH 07/10] remove getBlockData(String appId, String execId, String blockId) and fix bug. --- .../shuffle/ExternalShuffleBlockHandler.java | 2 +- .../shuffle/ExternalShuffleBlockResolver.java | 26 +++++-------------- .../ExternalShuffleBlockHandlerSuite.java | 8 +++--- .../ExternalShuffleBlockResolverSuite.java | 10 +++---- 4 files changed, 16 insertions(+), 30 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 58e90dd18f33..d5583031f56e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -213,7 +213,7 @@ private class ManagedBufferIterator implements Iterator { mapIdAndReduceIds = new int[2 * blockIds.length]; for (int i = 0; i< blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); - if (!blockIdParts[1].equals(shuffleId)) { + if (Integer.parseInt(blockIdParts[1]) != shuffleId) { throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockIds[i]); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index bd1ad0557e04..d7ec0e299dea 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -149,35 +149,21 @@ public void registerExecutor( executors.put(fullId, executorInfo); } - /** - * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the - * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId). - */ - public ManagedBuffer getBlockData(String appId, String execId, String blockId) { - String[] blockIdParts = blockId.split("_"); - if (blockIdParts.length < 4) { - throw new IllegalArgumentException("Unexpected block id format: " + blockId); - } else if (!blockIdParts[0].equals("shuffle")) { - throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); - } - int shuffleId = Integer.parseInt(blockIdParts[1]); - int mapId = Integer.parseInt(blockIdParts[2]); - int reduceId = Integer.parseInt(blockIdParts[3]); - return getBlockData(appId, execId, shuffleId, mapId, reduceId); - } - /** * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions * about how the hash and sort based shuffles store their data. */ - public ManagedBuffer getBlockData(String appId, String execId, int shuffleId, int mapId, - int reduceId) { + public ManagedBuffer getBlockData( + String appId, + String execId, + int shuffleId, + int mapId, + int reduceId) { ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); if (executor == null) { throw new RuntimeException( String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId)); } - return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index d258dc95c121..7846b71d5a8b 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() { ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3])); ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); - when(blockResolver.getBlockData("app0", "exec1", "shuffle_0_0_0")).thenReturn(block0Marker); - when(blockResolver.getBlockData("app0", "exec1", "shuffle_0_0_1")).thenReturn(block1Marker); + when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker); + when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker); ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "shuffle_0_0_0", "shuffle_0_0_1" }) .toByteBuffer(); @@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() { assertEquals(block0Marker, buffers.next()); assertEquals(block1Marker, buffers.next()); assertFalse(buffers.hasNext()); - verify(blockResolver, times(1)).getBlockData("app0", "exec1", "shuffle_0_0_0"); - verify(blockResolver, times(1)).getBlockData("app0", "exec1", "shuffle_0_0_1"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1); // Verify open block request latency metrics Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index bc97594903be..23438a08fa09 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -65,7 +65,7 @@ public void testBadRequests() throws IOException { ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); // Unregistered executor try { - resolver.getBlockData("app0", "exec1", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec1", 1, 1, 0); fail("Should have failed"); } catch (RuntimeException e) { assertTrue("Bad error message: " + e, e.getMessage().contains("not registered")); @@ -74,7 +74,7 @@ public void testBadRequests() throws IOException { // Invalid shuffle manager try { resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); - resolver.getBlockData("app0", "exec2", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec2", 1, 1, 0); fail("Should have failed"); } catch (UnsupportedOperationException e) { // pass @@ -84,7 +84,7 @@ public void testBadRequests() throws IOException { resolver.registerExecutor("app0", "exec3", dataContext.createExecutorInfo(SORT_MANAGER)); try { - resolver.getBlockData("app0", "exec3", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec3", 1, 1, 0); fail("Should have failed"); } catch (Exception e) { // pass @@ -98,14 +98,14 @@ public void testSortShuffleBlocks() throws IOException { dataContext.createExecutorInfo(SORT_MANAGER)); InputStream block0Stream = - resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); + resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream(); String block0 = CharStreams.toString( new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); block0Stream.close(); assertEquals(sortBlock0, block0); InputStream block1Stream = - resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); + resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream(); String block1 = CharStreams.toString( new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); block1Stream.close(); From 6677bc9164ca3c04988fab943e0ce0f0bbed5b10 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 15 Jun 2017 10:27:17 +0800 Subject: [PATCH 08/10] fix --- .../spark/network/shuffle/ExternalShuffleBlockHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index d5583031f56e..3678c390de6f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -211,7 +211,7 @@ private class ManagedBufferIterator implements Iterator { } this.shuffleId = Integer.parseInt(blockId0Parts[1]); mapIdAndReduceIds = new int[2 * blockIds.length]; - for (int i = 0; i< blockIds.length; i++) { + for (int i = 0; i < blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); if (Integer.parseInt(blockIdParts[1]) != shuffleId) { throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + From 2592ef40e16382e80072b4d51273120443aef3fa Mon Sep 17 00:00:00 2001 From: jinxing Date: Fri, 16 Jun 2017 13:08:08 +0800 Subject: [PATCH 09/10] resolve cloud-fan's comments --- .../shuffle/ExternalShuffleBlockHandler.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 3678c390de6f..c7bd18b4d492 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -203,16 +203,16 @@ private class ManagedBufferIterator implements Iterator { this.appId = appId; this.execId = execId; String[] blockId0Parts = blockIds[0].split("_"); - if (blockId0Parts.length < 4) { - throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]); - } - if (!blockId0Parts[0].equals("shuffle")) { - throw new IllegalArgumentException("Expected shuffle block id, got: " + blockIds[0]); + if (blockId0Parts.length < 4 || !blockId0Parts[0].equals("shuffle")) { + throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]); } this.shuffleId = Integer.parseInt(blockId0Parts[1]); mapIdAndReduceIds = new int[2 * blockIds.length]; for (int i = 0; i < blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); + if (blockIdParts.length < 4 || !blockIdParts[0].equals("shuffle")) { + throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]); + } if (Integer.parseInt(blockIdParts[1]) != shuffleId) { throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockIds[i]); @@ -224,14 +224,14 @@ private class ManagedBufferIterator implements Iterator { @Override public boolean hasNext() { - return index < mapIdAndReduceIds.length / 2; + return index < mapIdAndReduceIds.length; } @Override public ManagedBuffer next() { final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId, - mapIdAndReduceIds[2 * index], mapIdAndReduceIds[2 * index + 1]); - index++; + mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]); + index += 2; metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); return block; } From 5b0ce674fb3070c6749f9caf8cbbbeabb702ce01 Mon Sep 17 00:00:00 2001 From: jinxing Date: Fri, 16 Jun 2017 13:38:14 +0800 Subject: [PATCH 10/10] blockId0Parts.length != 4 --- .../spark/network/shuffle/ExternalShuffleBlockHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index c7bd18b4d492..fc7bba41185f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -203,14 +203,14 @@ private class ManagedBufferIterator implements Iterator { this.appId = appId; this.execId = execId; String[] blockId0Parts = blockIds[0].split("_"); - if (blockId0Parts.length < 4 || !blockId0Parts[0].equals("shuffle")) { + if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) { throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]); } this.shuffleId = Integer.parseInt(blockId0Parts[1]); mapIdAndReduceIds = new int[2 * blockIds.length]; for (int i = 0; i < blockIds.length; i++) { String[] blockIdParts = blockIds[i].split("_"); - if (blockIdParts.length < 4 || !blockIdParts[0].equals("shuffle")) { + if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) { throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]); } if (Integer.parseInt(blockIdParts[1]) != shuffleId) {