From a3dcbd611b0d06f06431e4128c96bcfacf9c3866 Mon Sep 17 00:00:00 2001 From: atsaonerk Date: Wed, 15 Feb 2023 19:21:14 +0530 Subject: [PATCH 1/4] Expose distcp counters to user via new DistCpConstants "CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED". The constant indicate number of bytes copied by distcp operation. It is exposed via configuration parameter through which user can obtain the value. --- .../src/main/java/org/apache/hadoop/tools/DistCp.java | 8 ++++++++ .../java/org/apache/hadoop/tools/DistCpConstants.java | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 141f45d61f372..58bf1d1639c45 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -171,6 +171,14 @@ public int run(String[] argv) { } finally { //Blocking distcp so close the job after its done if (job != null && context.shouldBlock()) { + try { + Long bytesCopied = job.getCounters().findCounter(CopyMapper.Counter.BYTESCOPIED).getValue(); + LOG.info("distcp copied {} number of bytes", bytesCopied); + //Set the config label so that consumer such as hive can see this distcp counter + getConf().set(DistCpConstants.CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED, String.valueOf(bytesCopied)); + } catch (IOException e) { + LOG.error("Exception encountered while retrieving distcp counter from job", e); + } try { job.close(); } catch (IOException e) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 6838d4f775753..5ea75747c6ae5 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -116,6 +116,11 @@ private DistCpConstants() { */ public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id"; + /** + * DistCp Counter for consumers of the Distcp + */ + public static final String CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED = "distcp.total.bytes.copied"; + /* Meta folder where the job's intermediate data is kept */ public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder"; From 2bce2f6885de6486f50e21fab63479fb77308225 Mon Sep 17 00:00:00 2001 From: atsaonerk Date: Thu, 16 Feb 2023 10:49:28 +0530 Subject: [PATCH 2/4] Add testcase to test bytes copied config is actually returned to user after distcp is run. --- .../java/org/apache/hadoop/tools/TestDistCpSystem.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index 64c6800f9446a..860316769c4fa 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -19,6 +19,7 @@ package org.apache.hadoop.tools; import static org.apache.hadoop.test.GenericTestUtils.getMethodName; +import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Random; +import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -452,7 +454,10 @@ public void testDistcpLargeFile() throws Exception { }; LOG.info("_____ running distcp: " + args[0] + " " + args[1]); - ToolRunner.run(conf, new DistCp(), args); + DistCp distcpTool = new DistCp(); + ToolRunner.run(conf, distcpTool, args); + final long bytesCopied = NumberUtils.toLong(distcpTool.getConf().get(CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED), 0); + assertEquals("Bytes copied by distcp tool should match source file length", bytesCopied, srcLen); String realTgtPath = testDst; FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles); From 3b4574246a6cbf9d3f04c7a15aa1fd5f7f7fe61c Mon Sep 17 00:00:00 2001 From: atsaonerk Date: Fri, 17 Feb 2023 09:59:41 +0530 Subject: [PATCH 3/4] Corrected code validation failures. --- .../src/main/java/org/apache/hadoop/tools/DistCp.java | 6 ++++-- .../main/java/org/apache/hadoop/tools/DistCpConstants.java | 2 +- .../test/java/org/apache/hadoop/tools/TestDistCpSystem.java | 6 ++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 58bf1d1639c45..0a5ddfbf8354a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -172,10 +172,12 @@ public int run(String[] argv) { //Blocking distcp so close the job after its done if (job != null && context.shouldBlock()) { try { - Long bytesCopied = job.getCounters().findCounter(CopyMapper.Counter.BYTESCOPIED).getValue(); + Long bytesCopied = job.getCounters(). + findCounter(CopyMapper.Counter.BYTESCOPIED).getValue(); LOG.info("distcp copied {} number of bytes", bytesCopied); //Set the config label so that consumer such as hive can see this distcp counter - getConf().set(DistCpConstants.CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED, String.valueOf(bytesCopied)); + getConf().set(DistCpConstants.CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED, + String.valueOf(bytesCopied)); } catch (IOException e) { LOG.error("Exception encountered while retrieving distcp counter from job", e); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 5ea75747c6ae5..ebcb31e26d311 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -117,7 +117,7 @@ private DistCpConstants() { public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id"; /** - * DistCp Counter for consumers of the Distcp + * DistCp Counter for consumers of the Distcp. */ public static final String CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED = "distcp.total.bytes.copied"; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index 860316769c4fa..b699cc61ed536 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -456,8 +456,10 @@ public void testDistcpLargeFile() throws Exception { LOG.info("_____ running distcp: " + args[0] + " " + args[1]); DistCp distcpTool = new DistCp(); ToolRunner.run(conf, distcpTool, args); - final long bytesCopied = NumberUtils.toLong(distcpTool.getConf().get(CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED), 0); - assertEquals("Bytes copied by distcp tool should match source file length", bytesCopied, srcLen); + final long bytesCopied = NumberUtils.toLong(distcpTool.getConf(). + get(CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED), 0); + assertEquals("Bytes copied by distcp tool should match source file length", + bytesCopied, srcLen); String realTgtPath = testDst; FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles); From 15669c18810037abf5204f0fd868e23d253532c8 Mon Sep 17 00:00:00 2001 From: atsaonerk Date: Thu, 23 Feb 2023 11:52:27 +0530 Subject: [PATCH 4/4] Fixed TestExternalCall.testcleanupOfJob failure. Corrected order of assert args in TestDistCpSystem.testDistcpLargeFile test. --- .../java/org/apache/hadoop/tools/DistCp.java | 20 +++++++++---------- .../apache/hadoop/tools/TestDistCpSystem.java | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 0a5ddfbf8354a..d86160e89bb23 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -153,6 +153,14 @@ public int run(String[] argv) { Job job = null; try { job = execute(); + if (job != null) { + Long bytesCopied = job.getCounters(). + findCounter(CopyMapper.Counter.BYTESCOPIED).getValue(); + LOG.info("distcp copied {} number of bytes", bytesCopied); + //Set the config label so that consumer such as hive can see this distcp counter + getConf().set(DistCpConstants.CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED, + String.valueOf(bytesCopied)); + } } catch (InvalidInputException e) { LOG.error("Invalid input: ", e); return DistCpConstants.INVALID_ARGUMENT; @@ -165,22 +173,14 @@ public int run(String[] argv) { } catch (XAttrsNotSupportedException e) { LOG.error("XAttrs not supported on at least one file system: ", e); return DistCpConstants.XATTRS_NOT_SUPPORTED; + } catch (IOException e) { + LOG.error("Exception encountered while retrieving distcp counter from job", e); } catch (Exception e) { LOG.error("Exception encountered ", e); return DistCpConstants.UNKNOWN_ERROR; } finally { //Blocking distcp so close the job after its done if (job != null && context.shouldBlock()) { - try { - Long bytesCopied = job.getCounters(). - findCounter(CopyMapper.Counter.BYTESCOPIED).getValue(); - LOG.info("distcp copied {} number of bytes", bytesCopied); - //Set the config label so that consumer such as hive can see this distcp counter - getConf().set(DistCpConstants.CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED, - String.valueOf(bytesCopied)); - } catch (IOException e) { - LOG.error("Exception encountered while retrieving distcp counter from job", e); - } try { job.close(); } catch (IOException e) { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index b699cc61ed536..caabf295301de 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -458,8 +458,8 @@ public void testDistcpLargeFile() throws Exception { ToolRunner.run(conf, distcpTool, args); final long bytesCopied = NumberUtils.toLong(distcpTool.getConf(). get(CONF_LABEL_DISTCP_TOTAL_BYTES_COPIED), 0); - assertEquals("Bytes copied by distcp tool should match source file length", - bytesCopied, srcLen); + assertEquals("Bytes copied by distcp tool does not match source file length", + srcLen, bytesCopied); String realTgtPath = testDst; FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles);