From 688aaa58d14ad61b2a2a54d90f35f508686c9324 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Tue, 27 Oct 2020 17:03:39 -0700 Subject: [PATCH 1/4] HBASE-24859: Improve the storage cost for HBase map reduce table splits --- .../hbase/mapreduce/TableInputFormatBase.java | 9 ++++++--- .../hadoop/hbase/mapreduce/TableSplit.java | 17 ++++++++++++++++- .../mapreduce/TestTableInputFormatScanBase.java | 6 ++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index c5c00cecc816..c06e0e37fc7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -323,7 +323,7 @@ private List oneInputSplitPerRegion() throws IOException, NamingExce } List splits = new ArrayList<>(1); long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - TableSplit split = new TableSplit(tableName, scan, + TableSplit split = new TableSplit(tableName, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); splits.add(split); @@ -363,8 +363,11 @@ private List oneInputSplitPerRegion() throws IOException, NamingExce byte[] regionName = location.getRegionInfo().getRegionName(); String encodedRegionName = location.getRegionInfo().getEncodedName(); long regionSize = sizeCalculator.getRegionSize(regionName); - TableSplit split = new TableSplit(tableName, scan, - splitStart, splitStop, regionLocation, encodedRegionName, regionSize); + // In the table format input for single table we do not need to + // store the scan object because it is a duplicate information to + // what is already store in conf SCAN + TableSplit split = new TableSplit(tableName, splitStart, splitStop, + regionLocation, encodedRegionName, regionSize); splits.add(split); if (LOG.isDebugEnabled()) { LOG.debug("getSplits: split -> " + i + " -> " + split); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java index 4cbe217d6158..9627be27abc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java @@ -184,9 +184,24 @@ public TableSplit(final byte [] tableName, byte[] startRow, byte[] endRow, * @param startRow The start row of the split. * @param endRow The end row of the split. * @param location The location of the region. + * @param encodedRegionName The region ID. + * @param length Size of region in bytes */ public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, - final String location) { + final String location, final String encodedRegionName, long length) { + this(tableName, null, startRow, endRow, location, encodedRegionName, length); + } + + /** + * Creates a new instance without a scanner. + * + * @param tableName The name of the current table. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param location The location of the region. + */ + public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, + final String location) { this(tableName, null, startRow, endRow, location); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index b0ae937749c0..965ba11d70bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -271,6 +272,11 @@ public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws TableInputFormat tif = new TableInputFormat(); tif.setConf(job.getConfiguration()); List splits = tif.getSplits(job); + for (InputSplit split : splits) { + TableSplit tableSplit = (TableSplit) split; + Assert.assertEquals(tableSplit.getScan().getStartRow(), HConstants.EMPTY_START_ROW); + Assert.assertEquals(tableSplit.getScan().getStopRow(), HConstants.EMPTY_END_ROW); + } Assert.assertEquals(expectedNumOfSplits, splits.size()); } From 7fef633035b3105a7ffe99f70577cbd45247234c Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Wed, 28 Oct 2020 16:57:03 -0700 Subject: [PATCH 2/4] Adding javadoc for scan object and comment on asserts a --- .../java/org/apache/hadoop/hbase/mapreduce/TableSplit.java | 6 +++++- .../hbase/mapreduce/TestTableInputFormatScanBase.java | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java index 9627be27abc1..1b1eac658b32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java @@ -193,7 +193,11 @@ public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, } /** - * Creates a new instance without a scanner. + * Creates a new instance without a scanner. Having no scanner in TableSplit doesn't necessarily + * mean there is not scanner for map reduce job, it just means that we do not need to set + * it for each split. For example, it is not required to have a scan object for + * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use + * the scan from the job conf and scanner is supposed to be same for all the splits of table. * * @param tableName The name of the current table. * @param startRow The start row of the split. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index 965ba11d70bd..2618a10eddfb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -274,6 +274,9 @@ public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws List splits = tif.getSplits(job); for (InputSplit split : splits) { TableSplit tableSplit = (TableSplit) split; + // We should have the null scan object in the TableSplit, but when we serialize + // null scan in the TableSplit, we put the default values instead. + // So, asserting here for default values. Assert.assertEquals(tableSplit.getScan().getStartRow(), HConstants.EMPTY_START_ROW); Assert.assertEquals(tableSplit.getScan().getStopRow(), HConstants.EMPTY_END_ROW); } From 732e0dddf789099bca9abe7188f812090ff0217e Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Thu, 29 Oct 2020 12:59:38 -0700 Subject: [PATCH 3/4] Removing the constructor and testing on the scan string rather than scan object --- .../hbase/mapreduce/TableInputFormatBase.java | 6 +-- .../hadoop/hbase/mapreduce/TableSplit.java | 38 +++++++++---------- .../TestTableInputFormatScanBase.java | 1 + 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index c06e0e37fc7a..0f49af571f99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -323,7 +323,7 @@ private List oneInputSplitPerRegion() throws IOException, NamingExce } List splits = new ArrayList<>(1); long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - TableSplit split = new TableSplit(tableName, + TableSplit split = new TableSplit(tableName, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); splits.add(split); @@ -366,7 +366,7 @@ private List oneInputSplitPerRegion() throws IOException, NamingExce // In the table format input for single table we do not need to // store the scan object because it is a duplicate information to // what is already store in conf SCAN - TableSplit split = new TableSplit(tableName, splitStart, splitStop, + TableSplit split = new TableSplit(tableName, null, splitStart, splitStop, regionLocation, encodedRegionName, regionSize); splits.add(split); if (LOG.isDebugEnabled()) { @@ -432,7 +432,7 @@ protected List createNInputSplitsUniform(InputSplit split, int n) for (int i = 0; i < splitKeys.length - 1; i++) { //notice that the regionSize parameter may be not very accurate TableSplit tsplit = - new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation, + new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation, encodedRegionName, regionSize / n); res.add(tsplit); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java index 1b1eac658b32..1fb0c094e75c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java @@ -86,7 +86,16 @@ static Version fromCode(int code) { private byte [] endRow; private String regionLocation; private String encodedRegionName = ""; + + /** The scan object may be null but the serialized form of scan is never null + * or empty since we serialize the scan object with default values then. + * Having no scanner in TableSplit doesn't necessarily mean there is not scanner + * for mapreduce job, it just means that we do not need to set it for each split. + * For example, it is not required to have a scan object for + * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the + * job conf and scanner is supposed to be same for all the splits of table.*/ private String scan = ""; // stores the serialized form of the Scan + private long length; // Contains estimation of region size in bytes /** Default constructor. */ @@ -184,28 +193,9 @@ public TableSplit(final byte [] tableName, byte[] startRow, byte[] endRow, * @param startRow The start row of the split. * @param endRow The end row of the split. * @param location The location of the region. - * @param encodedRegionName The region ID. - * @param length Size of region in bytes */ public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, - final String location, final String encodedRegionName, long length) { - this(tableName, null, startRow, endRow, location, encodedRegionName, length); - } - - /** - * Creates a new instance without a scanner. Having no scanner in TableSplit doesn't necessarily - * mean there is not scanner for map reduce job, it just means that we do not need to set - * it for each split. For example, it is not required to have a scan object for - * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use - * the scan from the job conf and scanner is supposed to be same for all the splits of table. - * - * @param tableName The name of the current table. - * @param startRow The start row of the split. - * @param endRow The end row of the split. - * @param location The location of the region. - */ - public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, - final String location) { + final String location) { this(tableName, null, startRow, endRow, location); } @@ -233,6 +223,14 @@ public Scan getScan() throws IOException { return TableMapReduceUtil.convertStringToScan(this.scan); } + /** + * Returns a scan object in the serialized form + * @return a serialized scan object + */ + public String getScanAsString() { + return this.scan; + } + /** * Returns the table name converted to a byte array. * @see #getTable() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index 2618a10eddfb..fbc8fc5aebd3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -279,6 +279,7 @@ public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws // So, asserting here for default values. Assert.assertEquals(tableSplit.getScan().getStartRow(), HConstants.EMPTY_START_ROW); Assert.assertEquals(tableSplit.getScan().getStopRow(), HConstants.EMPTY_END_ROW); + Assert.assertTrue(tableSplit.getScanAsString().isEmpty()); } Assert.assertEquals(expectedNumOfSplits, splits.size()); } From 244e482a5dd663258cec4cfe515ec5dcd97902c3 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Thu, 29 Oct 2020 16:04:32 -0700 Subject: [PATCH 4/4] Addressing comments and fixing come checkstyles --- .../hbase/mapreduce/TableInputFormatBase.java | 17 ++++++++++---- .../hadoop/hbase/mapreduce/TableSplit.java | 23 +++++++++++-------- .../TestTableInputFormatScanBase.java | 15 ++++++------ 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 0f49af571f99..5151840dad52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -323,6 +323,9 @@ private List oneInputSplitPerRegion() throws IOException, NamingExce } List splits = new ArrayList<>(1); long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); + // In the table input format for single table we do not need to + // store the scan object in table split because it can be memory intensive and redundant + // information to what is already stored in conf SCAN. See HBASE-25212 TableSplit split = new TableSplit(tableName, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); @@ -363,9 +366,9 @@ private List oneInputSplitPerRegion() throws IOException, NamingExce byte[] regionName = location.getRegionInfo().getRegionName(); String encodedRegionName = location.getRegionInfo().getEncodedName(); long regionSize = sizeCalculator.getRegionSize(regionName); - // In the table format input for single table we do not need to - // store the scan object because it is a duplicate information to - // what is already store in conf SCAN + // In the table input format for single table we do not need to + // store the scan object in table split because it can be memory intensive and redundant + // information to what is already stored in conf SCAN. See HBASE-25212 TableSplit split = new TableSplit(tableName, null, splitStart, splitStop, regionLocation, encodedRegionName, regionSize); splits.add(split); @@ -430,6 +433,9 @@ protected List createNInputSplitsUniform(InputSplit split, int n) // Split Region into n chunks evenly byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1); for (int i = 0; i < splitKeys.length - 1; i++) { + // In the table input format for single table we do not need to + // store the scan object in table split because it can be memory intensive and redundant + // information to what is already stored in conf SCAN. See HBASE-25212 //notice that the regionSize parameter may be not very accurate TableSplit tsplit = new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation, @@ -530,7 +536,10 @@ public List calculateAutoBalancedSplits(List splits, lon } } i = j - 1; - TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation, + // In the table input format for single table, we do not need to + // store the scan object in table split because it can be memory intensive and redundant + // information to what is already stored in conf SCAN. See HBASE-25212 + TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation, encodedRegionName, totalSize); resultList.add(t); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java index 1fb0c094e75c..3d7e72e74bbc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java @@ -25,10 +25,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -42,7 +42,7 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public class TableSplit extends InputSplit -implements Writable, Comparable { + implements Writable, Comparable { /** @deprecated LOG variable would be made private. */ @Deprecated public static final Log LOG = LogFactory.getLog(TableSplit.class); @@ -87,13 +87,15 @@ static Version fromCode(int code) { private String regionLocation; private String encodedRegionName = ""; - /** The scan object may be null but the serialized form of scan is never null + /** + * The scan object may be null but the serialized form of scan is never null * or empty since we serialize the scan object with default values then. - * Having no scanner in TableSplit doesn't necessarily mean there is not scanner + * Having no scanner in TableSplit doesn't necessarily mean there is no scanner * for mapreduce job, it just means that we do not need to set it for each split. * For example, it is not required to have a scan object for * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the - * job conf and scanner is supposed to be same for all the splits of table.*/ + * job conf and scanner is supposed to be same for all the splits of table. + */ private String scan = ""; // stores the serialized form of the Scan private long length; // Contains estimation of region size in bytes @@ -217,16 +219,19 @@ public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, * Returns a Scan object from the stored string representation. * * @return Returns a Scan object based on the stored scanner. - * @throws IOException + * @throws IOException throws IOException if deserialization fails */ public Scan getScan() throws IOException { return TableMapReduceUtil.convertStringToScan(this.scan); } /** - * Returns a scan object in the serialized form - * @return a serialized scan object + * Returns a scan string + * @return scan as string. Should be noted that this is not same as getScan().toString() + * because Scan object will have the default values when empty scan string is + * deserialized. Thus, getScan().toString() can never be empty */ + @InterfaceAudience.Private public String getScanAsString() { return this.scan; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index fbc8fc5aebd3..dd06af098824 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -23,14 +23,17 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -38,7 +41,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; @@ -274,11 +276,8 @@ public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws List splits = tif.getSplits(job); for (InputSplit split : splits) { TableSplit tableSplit = (TableSplit) split; - // We should have the null scan object in the TableSplit, but when we serialize - // null scan in the TableSplit, we put the default values instead. - // So, asserting here for default values. - Assert.assertEquals(tableSplit.getScan().getStartRow(), HConstants.EMPTY_START_ROW); - Assert.assertEquals(tableSplit.getScan().getStopRow(), HConstants.EMPTY_END_ROW); + // In table input format, we do no store the scanner at the split level + // because we use the scan object from the map-reduce job conf itself. Assert.assertTrue(tableSplit.getScanAsString().isEmpty()); } Assert.assertEquals(expectedNumOfSplits, splits.size());