Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8862] [HBase] Support HBase snapshot read #5639

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,30 +18,30 @@

package org.apache.flink.addons.hbase;

import org.apache.flink.addons.hbase.strategy.TableInputSplitStrategy;
import org.apache.flink.addons.hbase.util.HBaseConnectorUtil;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Abstract {@link InputFormat} to read data from HBase tables.
*/
public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit>
implements HBaseTableScannerAware {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);

Expand All @@ -58,20 +58,10 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
protected long scannedRows;

/**
* Returns an instance of Scan that retrieves the required subset of records from the HBase table.
*
* @return The appropriate instance of Scan for this use case.
* By providing {@link #tableInputSplitStrategy} as a specific implementation, the subclass
* can perform separately for HTable scan and HTable snapshot scan.
*/
protected abstract Scan getScanner();

/**
* What table is to be read.
*
* <p>Per instance of a TableInputFormat derivative only a single table name is possible.
*
* @return The name of the table
*/
protected abstract String getTableName();
protected TableInputSplitStrategy tableInputSplitStrategy;

/**
* HBase returns an instance of {@link Result}.
Expand All @@ -84,17 +74,26 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
protected abstract T mapResultToOutType(Result r);

/**
* Creates a {@link Scan} object and opens the {@link HTable} connection.
*
* <p>These are opened here because they are needed in the createInputSplits
* which is called before the openInputFormat method.
* Configuration is left to subclass to perform, such as creating
* a {@link Scan} object and opens the {@link HTable} connection.
*
* <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
* <p>Table InputSplit Strategy is configured.
*
* @param parameters The configuration that is to be used
* @see Configuration
*/
public abstract void configure(Configuration parameters);
@Override
public void configure(Configuration parameters) {
try {
Preconditions.checkNotNull(table, "Table should be initiated first by " +
"overriding configure() method.");
scan = getScanner();
tableInputSplitStrategy.configure(table, scan);
} catch (IOException e) {
throw new RuntimeException("Configure table input split strategy failed due " +
"to " + e.getMessage(), e);
}
}

@Override
public void open(TableInputSplit split) throws IOException {
Expand All @@ -110,18 +109,19 @@ public void open(TableInputSplit split) throws IOException {
throw new IOException("Input split is null!");
}

logSplitInfo("opening", split);
HBaseConnectorUtil.logSplitInfo("opening", split, this.getClass().getName());

// set scan range
currentRow = split.getStartRow();
scan.setStartRow(currentRow);
scan.setStopRow(split.getEndRow());

resultScanner = table.getScanner(scan);
resultScanner = tableInputSplitStrategy.createResultScanner(table.getConfiguration(), split);
endReached = false;
scannedRows = 0;
}

@Override
public T nextRecord(T reuse) throws IOException {
if (resultScanner == null) {
throw new IOException("No table result scanner provided!");
Expand Down Expand Up @@ -151,16 +151,6 @@ public T nextRecord(T reuse) throws IOException {
return null;
}

private void logSplitInfo(String action, TableInputSplit split) {
int splitId = split.getSplitNumber();
String splitStart = Bytes.toString(split.getStartRow());
String splitEnd = Bytes.toString(split.getEndRow());
String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
String[] hostnames = split.getHostnames();
LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
}

@Override
public boolean reachedEnd() throws IOException {
return endReached;
Expand Down Expand Up @@ -191,67 +181,8 @@ public void closeInputFormat() throws IOException {
}

@Override
public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
if (table == null) {
throw new IOException("The HBase table has not been opened! " +
"This needs to be done in configure().");
}
if (scan == null) {
throw new IOException("Scan has not been initialized! " +
"This needs to be done in configure().");
}

// Get the starting and ending row keys for every region in the currently open table
final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region.");
}
final byte[] startRow = scan.getStartRow();
final byte[] stopRow = scan.getStopRow();
final boolean scanWithNoLowerBound = startRow.length == 0;
final boolean scanWithNoUpperBound = stopRow.length == 0;

final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
for (int i = 0; i < keys.getFirst().length; i++) {
final byte[] startKey = keys.getFirst()[i];
final byte[] endKey = keys.getSecond()[i];
final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
// Test if the given region is to be included in the InputSplit while splitting the regions of a table
if (!includeRegionInScan(startKey, endKey)) {
continue;
}
// Find the region on which the given row is being served
final String[] hosts = new String[]{regionLocation};

// Determine if regions contains keys used by the scan
boolean isLastRegion = endKey.length == 0;
if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {

final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
&& !isLastRegion ? endKey : stopRow;
int id = splits.size();
final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
splits.add(split);
}
}
LOG.info("Created " + splits.size() + " splits");
for (TableInputSplit split : splits) {
logSplitInfo("created", split);
}
return splits.toArray(new TableInputSplit[splits.size()]);
}

/**
* Test if the given region is to be included in the scan while splitting the regions of a table.
*
* @param startKey Start key of the region
* @param endKey End key of the region
* @return true, if this region needs to be included as part of the input (default).
*/
protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
return true;
public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
return tableInputSplitStrategy.createInputSplits(table.getConfiguration(), minNumSplits);
}

@Override
Expand Down