Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,10 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException, NamingExce
}
List<InputSplit> splits = new ArrayList<>(1);
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
TableSplit split = new TableSplit(tableName, scan,
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
TableSplit split = new TableSplit(tableName, null,
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
splits.add(split);
Expand Down Expand Up @@ -363,8 +366,11 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException, NamingExce
byte[] regionName = location.getRegionInfo().getRegionName();
String encodedRegionName = location.getRegionInfo().getEncodedName();
long regionSize = sizeCalculator.getRegionSize(regionName);
TableSplit split = new TableSplit(tableName, scan,
splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
TableSplit split = new TableSplit(tableName, null, splitStart, splitStop,
regionLocation, encodedRegionName, regionSize);
splits.add(split);
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits: split -> " + i + " -> " + split);
Expand Down Expand Up @@ -427,9 +433,12 @@ protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
// Split Region into n chunks evenly
byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
for (int i = 0; i < splitKeys.length - 1; i++) {
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
//notice that the regionSize parameter may be not very accurate
TableSplit tsplit =
new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation,
encodedRegionName, regionSize / n);
res.add(tsplit);
}
Expand Down Expand Up @@ -527,7 +536,10 @@ public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, lon
}
}
i = j - 1;
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
// In the table input format for single table, we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
encodedRegionName, totalSize);
resultList.add(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
Expand All @@ -42,7 +42,7 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSplit extends InputSplit
implements Writable, Comparable<TableSplit> {
implements Writable, Comparable<TableSplit> {
/** @deprecated LOG variable would be made private. */
@Deprecated
public static final Log LOG = LogFactory.getLog(TableSplit.class);
Expand Down Expand Up @@ -86,7 +86,18 @@ static Version fromCode(int code) {
private byte [] endRow;
private String regionLocation;
private String encodedRegionName = "";

/**
* The scan object may be null but the serialized form of scan is never null
* or empty since we serialize the scan object with default values then.
* Having no scanner in TableSplit doesn't necessarily mean there is no scanner
* for mapreduce job, it just means that we do not need to set it for each split.
* For example, it is not required to have a scan object for
* {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the
* job conf and scanner is supposed to be same for all the splits of table.
*/
private String scan = ""; // stores the serialized form of the Scan

private long length; // Contains estimation of region size in bytes

/** Default constructor. */
Expand Down Expand Up @@ -208,12 +219,23 @@ public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
* Returns a Scan object from the stored string representation.
*
* @return Returns a Scan object based on the stored scanner.
* @throws IOException
* @throws IOException throws IOException if deserialization fails
*/
public Scan getScan() throws IOException {
return TableMapReduceUtil.convertStringToScan(this.scan);
}

/**
* Returns a scan string
* @return scan as string. Should be noted that this is not same as getScan().toString()
* because Scan object will have the default values when empty scan string is
* deserialized. Thus, getScan().toString() can never be empty
*/
@InterfaceAudience.Private
public String getScanAsString() {
Comment thread
sandeepvinayak marked this conversation as resolved.
return this.scan;
}

/**
* Returns the table name converted to a byte array.
* @see #getTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -37,7 +41,6 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
Expand Down Expand Up @@ -271,6 +274,12 @@ public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws
TableInputFormat tif = new TableInputFormat();
tif.setConf(job.getConfiguration());
List<InputSplit> splits = tif.getSplits(job);
for (InputSplit split : splits) {
TableSplit tableSplit = (TableSplit) split;
// In table input format, we do no store the scanner at the split level
// because we use the scan object from the map-reduce job conf itself.
Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
}
Assert.assertEquals(expectedNumOfSplits, splits.size());
}

Expand Down