Skip to content

Commit

Permalink
HBASE-24859: Optimize in-memory representation of HBase map reduce ta…
Browse files Browse the repository at this point in the history
…ble splits (#2610)

Patch fixes the single table input format case.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
  • Loading branch information
sandeepvinayak committed Nov 3, 2020
1 parent ca45b1b commit 729518a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 24 deletions.
Expand Up @@ -26,10 +26,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -52,6 +48,9 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
Expand Down Expand Up @@ -287,7 +286,7 @@ public List<InputSplit> getSplits(JobContext context) throws IOException {
* Create one InputSplit per region
*
* @return The list of InputSplit for all the regions
* @throws IOException
 * @throws IOException if region locations or sizes cannot be obtained
*/
private List<InputSplit> oneInputSplitPerRegion() throws IOException {
RegionSizeCalculator sizeCalculator =
Expand All @@ -305,7 +304,10 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException {
}
List<InputSplit> splits = new ArrayList<>(1);
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
TableSplit split = new TableSplit(tableName, scan,
// In the table input format for a single table we do not need to
// store the scan object in the table split because it can be memory intensive and is
// redundant with what is already stored in the conf SCAN. See HBASE-25212
TableSplit split = new TableSplit(tableName, null,
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
splits.add(split);
Expand Down Expand Up @@ -345,7 +347,10 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException {
byte[] regionName = location.getRegionInfo().getRegionName();
String encodedRegionName = location.getRegionInfo().getEncodedName();
long regionSize = sizeCalculator.getRegionSize(regionName);
TableSplit split = new TableSplit(tableName, scan,
// In the table input format for a single table we do not need to
// store the scan object in the table split because it can be memory intensive and is
// redundant with what is already stored in the conf SCAN. See HBASE-25212
TableSplit split = new TableSplit(tableName, null,
splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
splits.add(split);
if (LOG.isDebugEnabled()) {
Expand All @@ -362,7 +367,7 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException {
* @param n Number of ranges after splitting. Pass 1 means no split for the range
* Pass 2 if you want to split the range in two;
* @return A list of TableSplit, the size of the list is n
* @throws IllegalArgumentIOException
 * @throws IllegalArgumentIOException if {@code n} or the split boundaries are invalid
*/
protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
throws IllegalArgumentIOException {
Expand Down Expand Up @@ -409,9 +414,12 @@ protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
// Split Region into n chunks evenly
byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
for (int i = 0; i < splitKeys.length - 1; i++) {
// In the table input format for a single table we do not need to
// store the scan object in the table split because it can be memory intensive and is
// redundant with what is already stored in the conf SCAN. See HBASE-25212
//notice that the regionSize parameter may be not very accurate
TableSplit tsplit =
new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation,
encodedRegionName, regionSize / n);
res.add(tsplit);
}
Expand Down Expand Up @@ -488,7 +496,10 @@ public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, lon
}
}
i = j - 1;
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
// In the table input format for a single table we do not need to
// store the scan object in the table split because it can be memory intensive and is
// redundant with what is already stored in the conf SCAN. See HBASE-25212
TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
encodedRegionName, totalSize);
resultList.add(t);
}
Expand All @@ -508,7 +519,9 @@ String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
// reverse DNS using jndi doesn't work well with ipv6 addresses.
ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
}
if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
if (ipAddressString == null) {
throw new UnknownHostException("No host found for " + ipAddress);
}
hostName = Strings.domainNamePointerToHostName(ipAddressString);
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
Expand Down
Expand Up @@ -22,25 +22,24 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A table split corresponds to a key range (low, high) and an optional scanner.
* All references to row below refer to the key of the row.
*/
@InterfaceAudience.Public
public class TableSplit extends InputSplit
implements Writable, Comparable<TableSplit> {
implements Writable, Comparable<TableSplit> {
/** @deprecated LOG variable would be made private. fix in hbase 3.0 */
@Deprecated
public static final Logger LOG = LoggerFactory.getLogger(TableSplit.class);
Expand Down Expand Up @@ -84,6 +83,16 @@ static Version fromCode(int code) {
private byte [] endRow;
private String regionLocation;
private String encodedRegionName = "";

/**
 * The scan object may be null, but the serialized form of the scan is never null
 * or empty, since in that case we serialize a scan object with default values.
 * Having no scanner in a TableSplit doesn't necessarily mean there is no scanner
 * for the mapreduce job; it just means that we do not need to set it for each split.
 * For example, it is not required to have a scan object for
 * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the
 * job conf, and the scanner is supposed to be the same for all the splits of the table.
private String scan = ""; // stores the serialized form of the Scan
private long length; // Contains estimation of region size in bytes

Expand Down Expand Up @@ -182,12 +191,23 @@ public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
* Returns a Scan object from the stored string representation.
*
* @return Returns a Scan object based on the stored scanner.
* @throws IOException
 * @throws IOException if deserialization of the stored scan string fails
*/
public Scan getScan() throws IOException {
  // Deserialize the stored string form; an empty string yields a Scan with default values.
  final String serializedScan = this.scan;
  return TableMapReduceUtil.convertStringToScan(serializedScan);
}

/**
 * Returns the stored serialized form of the scan, possibly the empty string.
 *
 * @return the scan as a string. Note that this is not the same as
 *         {@code getScan().toString()}: deserializing an empty scan string produces a
 *         Scan populated with default values, so {@code getScan().toString()} is never
 *         empty even when this method returns an empty string.
 */
@InterfaceAudience.Private
public String getScanAsString() {
  return scan;
}

/**
* Returns the table name converted to a byte array.
* @see #getTable()
Expand Down
Expand Up @@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,7 +85,7 @@ public static void tearDownAfterClass() throws Exception {
* Pass the key and value to reduce.
*/
public static class ScanMapper
extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

/**
* Pass the key and value to reduce.
Expand All @@ -99,7 +98,7 @@ public static class ScanMapper
@Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
throws IOException, InterruptedException {
if (value.size() != 2) {
throw new IOException("There should be two input columns");
}
Expand All @@ -123,15 +122,15 @@ public void map(ImmutableBytesWritable key, Result value,
* Checks the last and first key seen against the scanner boundaries.
*/
public static class ScanReducer
extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
NullWritable, NullWritable> {

private String first = null;
private String last = null;

protected void reduce(ImmutableBytesWritable key,
Iterable<ImmutableBytesWritable> values, Context context)
throws IOException ,InterruptedException {
throws IOException ,InterruptedException {
int count = 0;
for (ImmutableBytesWritable value : values) {
String val = Bytes.toStringBinary(value.get());
Expand All @@ -144,7 +143,7 @@ protected void reduce(ImmutableBytesWritable key,
}

protected void cleanup(Context context)
throws IOException, InterruptedException {
throws IOException, InterruptedException {
Configuration c = context.getConfiguration();
String startRow = c.get(KEY_STARTROW);
String lastRow = c.get(KEY_LASTROW);
Expand Down Expand Up @@ -249,6 +248,12 @@ protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
tif.setConf(job.getConfiguration());
Assert.assertEquals(TABLE_NAME, table.getName());
List<InputSplit> splits = tif.getSplits(job);
for (InputSplit split : splits) {
TableSplit tableSplit = (TableSplit) split;
// In table input format, we do not store the scanner at the split level
// because we use the scan object from the map-reduce job conf itself.
Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
}
Assert.assertEquals(expectedNumOfSplits, splits.size());
}

Expand Down

0 comments on commit 729518a

Please sign in to comment.