From dca62ebc1f37f55001b95e231ca5e57f0cb25899 Mon Sep 17 00:00:00 2001 From: fpompermaier Date: Thu, 20 Nov 2014 00:08:02 +0100 Subject: [PATCH 1/7] Upgraded HBase addon to HBase 0.98.x and new Tuple APIs + fix of ExecutionEnvironment --- flink-addons/flink-hbase/pom.xml | 143 +++++-- .../hbase/GenericTableOutputFormat.java | 116 ------ .../flink/addons/hbase/HBaseDataSink.java | 47 --- .../flink/addons/hbase/TableInputFormat.java | 371 +++++------------- .../flink/addons/hbase/common/HBaseKey.java | 87 ---- .../addons/hbase/common/HBaseResult.java | 69 ---- .../flink/addons/hbase/common/HBaseUtil.java | 68 ---- .../hbase/example/HBaseReadExample.java | 129 ------ .../hbase/example/HBaseReadExample.java | 93 +++++ .../src/test/resources/hbase-site.xml | 43 ++ .../src/test/resources/log4j.properties | 6 + flink-addons/pom.xml | 3 +- .../runtime/execution/RuntimeEnvironment.java | 12 +- 13 files changed, 357 insertions(+), 830 deletions(-) delete mode 100644 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java delete mode 100644 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java mode change 100644 => 100755 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java delete mode 100644 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java delete mode 100644 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java delete mode 100644 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java delete mode 100644 flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java create mode 100755 flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java create mode 100644 flink-addons/flink-hbase/src/test/resources/hbase-site.xml create mode 100755 flink-addons/flink-hbase/src/test/resources/log4j.properties diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml index 9a400a7342f33..662b505eeb362 100644 --- a/flink-addons/flink-hbase/pom.xml +++ b/flink-addons/flink-hbase/pom.xml @@ -28,59 +28,60 @@ under the License. 0.8-incubating-SNAPSHOT .. - - - - cloudera-releases - https://repository.cloudera.com/artifactory/cloudera-repos - - true - - - false - - - - - - 0.96.0-hadoop2 - flink-hbase flink-hbase jar + + 0.98.6.1-hadoop1 + 0.98.6.1-hadoop2 + + org.apache.flink flink-core ${project.version} - org.apache.flink flink-java ${project.version} - - org.apache.hbase - hbase - 0.94.2-cdh4.2.1 + org.apache.flink + flink-clients + ${project.version} - - org.jruby - jruby-complete + org.apache.hadoop + hadoop-core + test + + + org.apache.flink + flink-hadoop-compatibility + ${project.version} + test + + + org.apache.hbase + hbase-client + ${hbase.version} - - org.apache.hadoop - hadoop-client - ${hadoop.version} + org.apache.hbase + hbase-server + ${hbase.version} + + + org.jruby + jruby-complete + asm asm @@ -116,20 +117,76 @@ under the License. 
- - + + + hadoop-1 + + + + + !hadoop.profile + + + + ${hbase.hadoop1.version} + + + + hadoop-2 + + + + + hadoop.profile + 2 + + + + ${hbase.hadoop2.version} + + + + + org.apache.hadoop + hadoop-common + + + + + cdh5.1.3 + + 2 + 0.98.1-cdh5.1.3 + 2.3.0-cdh5.1.3 + + + 2.3.0-mr1-cdh5.1.3 + + + + + org.apache.hadoop + hadoop-core + ${hadoop.core.version} + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java deleted file mode 100644 index 49fcae390b723..0000000000000 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.addons.hbase; - -import java.io.IOException; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.Record; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskType; - -public abstract class GenericTableOutputFormat implements OutputFormat { - - private static final long serialVersionUID = 1L; - - public static final String JT_ID_KEY = "pact.hbase.jtkey"; - - public static final String JOB_ID_KEY = "pact.job.id"; - - private RecordWriter writer; - - private Configuration config; - - private org.apache.hadoop.conf.Configuration hadoopConfig; - - private TaskAttemptContext context; - - private String jtID; - - private int jobId; - - - @Override - public void configure(Configuration parameters) { - this.config = parameters; - - // get the ID parameters - this.jtID = parameters.getString(JT_ID_KEY, null); - if (this.jtID == null) { - throw new RuntimeException("Missing JT_ID entry in hbase config."); - } - this.jobId = parameters.getInteger(JOB_ID_KEY, -1); - if (this.jobId < 0) { - throw new RuntimeException("Missing or invalid job id in input config."); - } - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - this.hadoopConfig = getHadoopConfig(this.config); - - /** - * PLASE NOTE: - * If you are a Eclipse+Maven Integration user and you have two (or more) warnings here, please - * close the pact-hbase project OR set the maven profile to 
hadoop_yarn - * - * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore, - * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1) - */ - final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0); - - this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId); - final HFileOutputFormat outFormat = new HFileOutputFormat(); - try { - this.writer = outFormat.getRecordWriter(this.context); - } catch (InterruptedException iex) { - throw new IOException("Opening the writer was interrupted.", iex); - } - } - - @Override - public void close() throws IOException { - final RecordWriter writer = this.writer; - this.writer = null; - if (writer != null) { - try { - writer.close(this.context); - } catch (InterruptedException iex) { - throw new IOException("Closing was interrupted.", iex); - } - } - } - - public void collectKeyValue(KeyValue kv) throws IOException { - try { - this.writer.write(null, kv); - } catch (InterruptedException iex) { - throw new IOException("Write request was interrupted.", iex); - } - } - - public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config); -} diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java deleted file mode 100644 index fc0f22613f36d..0000000000000 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.addons.hbase; - -import java.util.Random; - -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.java.record.operators.GenericDataSink; - -/** - * A sink for writing to HBase - */ -public class HBaseDataSink extends GenericDataSink { - - private static final int IDENTIFYIER_LEN = 16; - - public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) { - super(f, input, name); - - // generate a random unique identifier string - final Random rnd = new Random(); - final StringBuilder bld = new StringBuilder(); - for (int i = 0; i < IDENTIFYIER_LEN; i++) { - bld.append((char) (rnd.nextInt(26) + 'a')); - } - - setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString()); - setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt()); - } -} diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java old mode 100644 new mode 100755 index d2b5d049a78d6..924c8e327ebc4 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -23,344 +23,186 @@ import java.util.ArrayList; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.addons.hbase.common.HBaseKey; -import org.apache.flink.addons.hbase.common.HBaseResult; -import org.apache.flink.addons.hbase.common.HBaseUtil; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.LocatableInputSplitAssigner; import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.types.Record; -import org.apache.flink.util.OperatingSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableRecordReader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link InputFormat} subclass that wraps the access for HTables. + * + * @author Flavio Pompermaier */ -public class TableInputFormat implements InputFormat { +public abstract class TableInputFormat implements InputFormat{ private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class); - /** A handle on an HBase table */ - private HTable table; - - /** The scanner that performs the actual access on the table. HBase object */ - private Scan scan; - - /** Hbase' iterator wrapper */ - private TableRecordReader tableRecordReader; - /** helper variable to decide whether the input is exhausted or not */ private boolean endReached = false; + + // TODO table and scan could be serialized when kryo serializer will be the default + private transient HTable table; + private transient Scan scan; + + /** HBase iterator wrapper */ + private TableRecordReader tableRecordReader; - /** Job parameter that specifies the input table. 
*/ - public static final String INPUT_TABLE = "hbase.inputtable"; - - /** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */ - public static final String CONFIG_LOCATION = "hbase.config.location"; - - /** - * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. - * See TableMapReduceUtil.convertScanToString(Scan) for more details. - */ - public static final String SCAN = "hbase.scan"; - - /** Column Family to Scan */ - public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family"; - - /** Space delimited list of columns to scan. */ - public static final String SCAN_COLUMNS = "hbase.scan.columns"; - - /** The timestamp used to filter columns with a specific timestamp. */ - public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp"; - - /** The starting timestamp used to filter columns with a specific range of versions. */ - public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start"; - - /** The ending timestamp used to filter columns with a specific range of versions. */ - public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end"; - - /** The maximum number of version to return. */ - public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions"; - - /** Set to false to disable server-side caching of blocks for this scan. */ - public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks"; - - /** The number of rows for caching that will be passed to scanners. */ - public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows"; - - /** mutable objects that are used to avoid recreation of wrapper objects */ - protected HBaseKey hbaseKey; - - protected HBaseResult hbaseResult; - - private org.apache.hadoop.conf.Configuration hConf; - - @Override - public void configure(Configuration parameters) { - HTable table = createTable(parameters); - setTable(table); - Scan scan = createScanner(parameters); - setScan(scan); - } - + // abstract methods allow for multiple table and scanners in the same job + protected abstract Scan getScanner(); + protected abstract String getTableName(); + protected abstract T mapResultToTuple(Result r); + /** - * Read the configuration and creates a {@link Scan} object. 
+ * creates a {@link Scan} object and a {@link HTable} connection * * @param parameters - * @return The scanner + * @see {@link Configuration} */ - protected Scan createScanner(Configuration parameters) { - Scan scan = null; - if (parameters.getString(SCAN, null) != null) { - try { - scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null)); - } catch (IOException e) { - LOG.error("An error occurred.", e); - } - } else { - try { - scan = new Scan(); - - // if (parameters.getString(SCAN_COLUMNS, null) != null) { - // scan.addColumns(parameters.getString(SCAN_COLUMNS, null)); - // } - - if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) { - scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null))); - } - - if (parameters.getString(SCAN_TIMESTAMP, null) != null) { - scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null))); - } - - if (parameters.getString(SCAN_TIMERANGE_START, null) != null - && parameters.getString(SCAN_TIMERANGE_END, null) != null) { - scan.setTimeRange( - Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)), - Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null))); - } - - if (parameters.getString(SCAN_MAXVERSIONS, null) != null) { - scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null))); - } - - if (parameters.getString(SCAN_CACHEDROWS, null) != null) { - scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null))); - } - - // false by default, full table scans generate too much BC churn - scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false))); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - } - return scan; + @Override + public void configure(Configuration parameters) { + this.table = createTable(); + this.scan = getScanner(); } - - /** - * Create an {@link HTable} instance and set it into this format. - * - * @param parameters - * a {@link Configuration} that holds at least the table name. 
- */ - protected HTable createTable(Configuration parameters) { - String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null); - LOG.info("Got config location: " + configLocation); - if (configLocation != null) - { - org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration(); - if(OperatingSystem.isWindows()) { - dummyConf.addResource(new Path("file:/" + configLocation)); - } else { - dummyConf.addResource(new Path("file://" + configLocation)); - } - hConf = HBaseConfiguration.create(dummyConf); - ; - // hConf.set("hbase.master", "im1a5.internetmemory.org"); - LOG.info("hbase master: " + hConf.get("hbase.master")); - LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum")); - - } - String tableName = parameters.getString(INPUT_TABLE, ""); + + /** Create an {@link HTable} instance and set it into this format */ + private HTable createTable() { + LOG.info("Initializing HBaseConfiguration"); + //use files found in the classpath + org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create(); + try { - return new HTable(this.hConf, tableName); + return new HTable(hConf, getTableName()); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } return null; } - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { - // TODO Auto-generated method stub - return null; - } - @Override public boolean reachedEnd() throws IOException { return this.endReached; } - protected boolean nextResult() throws IOException { - if (this.tableRecordReader == null) - { + @Override + public T nextRecord( + T reuse) throws IOException { + if (this.tableRecordReader == null){ throw new IOException("No table record reader provided!"); } try { - if (this.tableRecordReader.nextKeyValue()) - { - ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey(); - Result currentValue = this.tableRecordReader.getCurrentValue(); - - hbaseKey.setWritable(currentKey); - hbaseResult.setResult(currentValue); - } else - { - this.endReached = true; - return false; - } + if (this.tableRecordReader.nextKeyValue()){ + Result res = tableRecordReader.getCurrentValue(); + return mapResultToTuple(res); + } + this.endReached = true; } catch (InterruptedException e) { LOG.error("Table reader has been interrupted", e); throw new IOException(e); } - - return true; - } - - @Override - public Record nextRecord(Record record) throws IOException { - if (nextResult()) { - mapResultToRecord(record, hbaseKey, hbaseResult); - return record; - } else { - return null; - } - } - - /** - * Maps the current HBase Result into a Record. - * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1. 
- * - * @param record - * @param key - * @param result - */ - public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) { - record.setField(0, key); - record.setField(1, result); - } - - @Override - public void close() throws IOException { - this.tableRecordReader.close(); + return null; } @Override public void open(TableInputSplit split) throws IOException { - if (split == null) - { + if (split == null){ throw new IOException("Input split is null!"); } - - if (this.table == null) - { + if (table == null){ throw new IOException("No HTable provided!"); } - - if (this.scan == null) - { + if (scan == null){ throw new IOException("No Scan instance provided"); } + logSplitInfo("opening", split); + scan.setStartRow(split.getStartRow()); + scan.setStopRow(split.getEndRow()); + this.tableRecordReader = new TableRecordReader(); - - this.tableRecordReader.setHTable(this.table); - - Scan sc = new Scan(this.scan); - sc.setStartRow(split.getStartRow()); - LOG.info("split start row: " + new String(split.getStartRow())); - sc.setStopRow(split.getEndRow()); - LOG.info("split end row: " + new String(split.getEndRow())); - - this.tableRecordReader.setScan(sc); + this.tableRecordReader.setHTable(table); + this.tableRecordReader.setScan(scan); this.tableRecordReader.restart(split.getStartRow()); - this.hbaseKey = new HBaseKey(); - this.hbaseResult = new HBaseResult(); - endReached = false; } - + + @Override + public void close() throws IOException { + this.tableRecordReader.close(); + this.table.close(); + } @Override public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException { - - if (this.table == null) { - throw new IOException("No table was provided."); - } - - final Pair keys = this.table.getStartEndKeys(); - + //Gets the starting and ending row keys for every region in the currently open table + final Pair keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - throw new IOException("Expecting at least one region."); } - int count = 0; - final List splits = new ArrayList(keys.getFirst().length); + final byte[] startRow = scan.getStartRow(); + final byte[] stopRow = scan.getStopRow(); + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean scanWithNoUpperBound = stopRow.length == 0; + + final List splits = new ArrayList(minNumSplits); for (int i = 0; i < keys.getFirst().length; i++) { - - if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + final byte[] startKey = keys.getFirst()[i]; + final byte[] endKey = keys.getSecond()[i]; + final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort(); + //Test if the given region is to be included in the InputSplit while splitting the regions of a table + if (!includeRegionInSplit(startKey, endKey)) { continue; } - - final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort(); - final byte[] startRow = this.scan.getStartRow(); - final byte[] stopRow = this.scan.getStopRow(); - - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { - - final byte[] splitStart = startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 
- keys.getFirst()[i] : startRow; - final byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? - keys.getSecond()[i] : stopRow; - final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation }, - this.table.getTableName(), splitStart, splitStop); + //Finds the region on which the given row is being served + final String[] hosts = new String[] { regionLocation }; + + // determine if regions contains keys used by the scan + boolean isLastRegion = endKey.length == 0; + if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) && + (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { + + final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow; + final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) + && !isLastRegion ? endKey : stopRow; + int id = splits.size(); + final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop); splits.add(split); - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits: split -> " + (count++) + " -> " + split); - } } } - + LOG.info("Created " + splits.size() + " splits"); + for (TableInputSplit split : splits) { + logSplitInfo("created", split); + } return splits.toArray(new TableInputSplit[0]); } + + private void logSplitInfo(String action, TableInputSplit split) { + int splitId = split.getSplitNumber(); + String splitStart = Bytes.toString(split.getStartRow()); + String splitEnd = Bytes.toString(split.getEndRow()); + String splitStartKey = splitStart.isEmpty() ? "-" : splitStart; + String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd; + String[] hostnames = split.getHostnames(); + LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, splitStartKey, splitStopKey); + } /** - * Test if the given region is to be included in the InputSplit while splitting - * the regions of a table. + * Test if the given region is to be included in the InputSplit while splitting the regions of a table. *

	 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
	 * (and hence, not contributing to the InputSplit), given the start and end keys of the same.
@@ -386,20 +228,11 @@ private static boolean includeRegionInSplit(final byte[] startKey, final byte[] public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) { return new LocatableInputSplitAssigner(inputSplits); } - - public void setTable(HTable table) { - this.table = table; - } - public HTable getTable() { - return table; - } - - public void setScan(Scan scan) { - this.scan = scan; + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + // TODO Auto-generated method stub + return null; } - public Scan getScan() { - return scan; - } } diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java deleted file mode 100644 index 4c084939f3d61..0000000000000 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.addons.hbase.common; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Key; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - -/** - * Simple wrapper to encapsulate an HBase h{@link ImmutableBytesWritable} as a Key - */ -public class HBaseKey implements Key { - - private static final long serialVersionUID = 1L; - - private ImmutableBytesWritable writable; - - - public HBaseKey() { - this.writable = new ImmutableBytesWritable(); - } - - - public HBaseKey(ImmutableBytesWritable writable) { - this.writable = writable; - } - - - public ImmutableBytesWritable getWritable() { - return writable; - } - - public void setWritable(ImmutableBytesWritable writable) { - this.writable = writable; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void write(DataOutputView out) throws IOException { - this.writable.write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - this.writable.readFields(in); - } - - @Override - public int hashCode() { - return this.writable.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == HBaseKey.class) { - return this.writable.equals(((HBaseKey) obj).writable); - } else { - return false; - } - } - - @Override - public int compareTo(HBaseKey other) { - return this.writable.compareTo(other.writable); - } -} diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java deleted file mode 100644 index dfae1041e51ed..0000000000000 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.addons.hbase.common; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Value; -import org.apache.hadoop.hbase.client.Result; - -public class HBaseResult implements Value { - - private static final long serialVersionUID = 1L; - - private Result result; - - - public HBaseResult() { - this.result = new Result(); - } - - public HBaseResult(Result result) { - this.result = result; - } - - - public Result getResult() { - return this.result; - } - - public void setResult(Result result) { - this.result = result; - } - - public String getStringData() { - if(this.result != null) { - return this.result.toString(); - } - return null; - } - - @Override - public void read(DataInputView in) throws IOException { - this.result.readFields(in); - } - - @Override - public void write(DataOutputView out) throws IOException { - this.result.write(out); - } -} diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java deleted file mode 100644 index 607dd7864a552..0000000000000 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.addons.hbase.common; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Base64; - -/** - * Utility for {@link TableInputFormat} - */ -public class HBaseUtil { - - /** - * Writes the given scan into a Base64 encoded string. - * - * @param scan - * The scan to write out. - * @return The scan saved in a Base64 encoded string. - * @throws IOException - * When writing the scan fails. - */ - static String convertScanToString(Scan scan) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(out); - scan.write(dos); - return Base64.encodeBytes(out.toByteArray()); - } - - /** - * Converts the given Base64 string back into a Scan instance. - * - * @param base64 - * The scan details. - * @return The newly created Scan instance. - * @throws IOException - * When reading the scan instance fails. 
- */ - public static Scan convertStringToScan(String base64) throws IOException { - ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64)); - DataInputStream dis = new DataInputStream(bis); - Scan scan = new Scan(); - scan.readFields(dis); - return scan; - } -} diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java deleted file mode 100644 index 881d06a7ded75..0000000000000 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.addons.hbase.example; - -import org.apache.flink.addons.hbase.TableInputFormat; -import org.apache.flink.addons.hbase.common.HBaseKey; -import org.apache.flink.addons.hbase.common.HBaseResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.GenericDataSource; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; - -/** - * Implements a word count which takes the input file and counts the number of - * the occurrences of each word in the file. 
- */ -public class HBaseReadExample implements Program, ProgramDescription { - - public static class MyTableInputFormat extends TableInputFormat { - - private static final long serialVersionUID = 1L; - - private final byte[] META_FAMILY = "meta".getBytes(); - - private final byte[] USER_COLUMN = "user".getBytes(); - - private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes(); - - private final byte[] TEXT_FAMILY = "text".getBytes(); - - private final byte[] TWEET_COLUMN = "tweet".getBytes(); - - public MyTableInputFormat() { - super(); - - } - - @Override - protected HTable createTable(Configuration parameters) { - return super.createTable(parameters); - } - - @Override - protected Scan createScanner(Configuration parameters) { - Scan scan = new Scan (); - scan.addColumn (META_FAMILY, USER_COLUMN); - scan.addColumn (META_FAMILY, TIMESTAMP_COLUMN); - scan.addColumn (TEXT_FAMILY, TWEET_COLUMN); - return scan; - } - - StringValue row_string = new StringValue(); - StringValue user_string = new StringValue(); - StringValue timestamp_string = new StringValue(); - StringValue tweet_string = new StringValue(); - - @Override - public void mapResultToRecord(Record record, HBaseKey key, - HBaseResult result) { - Result res = result.getResult(); - res.getRow(); - record.setField(0, toString(row_string, res.getRow())); - record.setField(1, toString (user_string, res.getValue(META_FAMILY, USER_COLUMN))); - record.setField(2, toString (timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN))); - record.setField(3, toString (tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN))); - } - - private final StringValue toString (StringValue string, byte[] bytes) { - string.setValueAscii(bytes, 0, bytes.length); - return string; - } - - } - - - @Override - public Plan getPlan(String... args) { - // parse job parameters - int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - String output = (args.length > 1 ? args[1] : ""); - - GenericDataSource source = new GenericDataSource(new MyTableInputFormat(), "HBase Input"); - source.setParameter(TableInputFormat.INPUT_TABLE, "twitter"); - source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml"); - FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump"); - CsvOutputFormat.configureRecordFormat(out) - .recordDelimiter('\n') - .fieldDelimiter(' ') - .field(StringValue.class, 0) - .field(StringValue.class, 1) - .field(StringValue.class, 2) - .field(StringValue.class, 3); - - Plan plan = new Plan(out, "HBase access Example"); - plan.setDefaultParallelism(numSubTasks); - return plan; - } - - - @Override - public String getDescription() { - return "Parameters: [numSubStasks] [input] [output]"; - } -} diff --git a/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java new file mode 100755 index 0000000000000..b6f345a42e5f0 --- /dev/null +++ b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.TableInputFormat; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Simple stub for HBase DataSet + * + * To run the test first create the test table with hbase shell. + * + * Use the following commands: + *

    + *
+ *   - create 'test-table', 'someCf'
+ *   - put 'test-table', '1', 'someCf:someQual', 'someString'
+ *   - put 'test-table', '2', 'someCf:someQual', 'anotherString'
+ * + * The test should return just the first entry. + * + */ +public class HBaseReadExample { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + @SuppressWarnings("serial") + DataSet> hbaseDs = env.createInput(new TableInputFormat>() { + private final byte[] CF_SOME = "someCf".getBytes(); + private final byte[] Q_SOME = "someQual".getBytes(); + @Override + public String getTableName() { + return "test-table"; + } + + @Override + protected Scan getScanner() { + Scan scan = new Scan(); + scan.addColumn(CF_SOME, Q_SOME); + return scan; + } + + private Tuple2 reuse = new Tuple2(); + + @Override + protected Tuple2 mapResultToTuple(Result r) { + String key = Bytes.toString(r.getRow()); + String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME)); + reuse.setField(key, 0); + reuse.setField(val, 1); + return reuse; + } + }) + .filter(new FilterFunction>() { + + @Override + public boolean filter(Tuple2 t) throws Exception { + String val = t.getField(1); + if(val.startsWith("someStr")) + return true; + return false; + } + }); + + hbaseDs.print(); + + // kick off execution. + env.execute(); + + } + +} diff --git a/flink-addons/flink-hbase/src/test/resources/hbase-site.xml b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml new file mode 100644 index 0000000000000..2984063ada87b --- /dev/null +++ b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml @@ -0,0 +1,43 @@ + + + + + + + hbase.tmp.dir + + /opt/hbase-0.98.6.1-hadoop2/data + + + + hbase.zookeeper.quorum + localhost + + + diff --git a/flink-addons/flink-hbase/src/test/resources/log4j.properties b/flink-addons/flink-hbase/src/test/resources/log4j.properties new file mode 100755 index 0000000000000..c83ec70665ef1 --- /dev/null +++ b/flink-addons/flink-hbase/src/test/resources/log4j.properties @@ -0,0 +1,6 @@ +log4j.rootLogger=${hadoop.root.logger} +hadoop.root.logger=INFO,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml index c76247d6aab7c..1100cc8a8b061 100644 --- a/flink-addons/pom.xml +++ b/flink-addons/pom.xml @@ -34,6 +34,7 @@ under the License. pom + flink-hbase flink-avro flink-jdbc flink-spargel @@ -64,7 +65,7 @@ under the License. - flink-hbase + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java index 715bbd7b07863..f5531fe100f4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java @@ -177,8 +177,18 @@ public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd, this.taskConfiguration = tdd.getTaskConfiguration(); this.invokable.setEnvironment(this); - this.invokable.registerInputOutput(); + + { + //TODO Check if this fix necessary elsewhere.. 
+ Thread currentThread = Thread.currentThread(); + ClassLoader context = currentThread.getContextClassLoader(); + currentThread.setContextClassLoader(userCodeClassLoader); + + this.invokable.registerInputOutput(); + currentThread.setContextClassLoader(context); + } + List inGates = tdd.getInputGates(); List outGates = tdd.getOutputGates(); From da04ae1e28b4f5f8061ddc85a99ce1ee2bdfa409 Mon Sep 17 00:00:00 2001 From: fpompermaier Date: Mon, 10 Nov 2014 13:08:34 +0100 Subject: [PATCH 2/7] Fixed autoformatting of comments --- flink-addons/flink-hbase/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml index 662b505eeb362..0d9ae6ec5a7f6 100644 --- a/flink-addons/flink-hbase/pom.xml +++ b/flink-addons/flink-hbase/pom.xml @@ -34,7 +34,8 @@ under the License. jar - 0.98.6.1-hadoop1 + + 0.98.6.1-hadoop2 0.98.6.1-hadoop2 From bbfa366d9ea2907d1c0b5d8d7e4c9ab316d6b9b8 Mon Sep 17 00:00:00 2001 From: fpompermaier Date: Thu, 20 Nov 2014 15:37:26 +0100 Subject: [PATCH 3/7] Removed HBase server dependency --- flink-addons/flink-hbase/pom.xml | 47 +------------------ .../flink/addons/hbase/TableInputFormat.java | 34 +++++--------- 2 files changed, 13 insertions(+), 68 deletions(-) diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml index 0d9ae6ec5a7f6..88378c4fdeb4e 100644 --- a/flink-addons/flink-hbase/pom.xml +++ b/flink-addons/flink-hbase/pom.xml @@ -34,8 +34,7 @@ under the License. jar - - 0.98.6.1-hadoop2 + 0.98.6.1-hadoop1 0.98.6.1-hadoop2 @@ -73,50 +72,6 @@ under the License. hbase-client ${hbase.version} - - org.apache.hbase - hbase-server - ${hbase.version} - - - - org.jruby - jruby-complete - - - asm - asm - - - tomcat - jasper-compiler - - - tomcat - jasper-runtime - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jsp-api-2.1 - - - org.mortbay.jetty - jsp-2.1 - - - org.mortbay.jetty - jetty-util - - - org.eclipse.jdt - core - - - diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java index 924c8e327ebc4..912fc7682afc5 100755 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.TableRecordReader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; @@ -59,7 +59,7 @@ public abstract class TableInputFormat implements InputFormat Date: Tue, 25 Nov 2014 12:24:18 +0100 Subject: [PATCH 4/7] Changed visibility of table and scan to protected --- .../java/org/apache/flink/addons/hbase/TableInputFormat.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java index 912fc7682afc5..3fb23c2b87d6c 100755 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ 
b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -55,8 +55,8 @@ public abstract class TableInputFormat implements InputFormat Date: Fri, 28 Nov 2014 10:06:15 +0100 Subject: [PATCH 5/7] Added retry on scan fail (due to long pauses between two consecutive nextRecord()) --- .../flink/addons/hbase/TableInputFormat.java | 46 +++++++++++++++---- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java index 3fb23c2b87d6c..a718de5058bc5 100755 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -43,7 +43,6 @@ /** * {@link InputFormat} subclass that wraps the access for HTables. * - * @author Flavio Pompermaier */ public abstract class TableInputFormat implements InputFormat{ @@ -61,6 +60,9 @@ public abstract class TableInputFormat implements InputFormat Date: Fri, 28 Nov 2014 10:06:15 +0100 Subject: [PATCH 6/7] Added retry on scan fail (due to long pauses between two consecutive nextRecord()) --- .../flink/addons/hbase/TableInputFormat.java | 47 +++++++++++++++---- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java index 3fb23c2b87d6c..0d0390f746de7 100755 --- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java +++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -43,7 +43,6 @@ /** * {@link InputFormat} subclass that wraps the access for HTables. * - * @author Flavio Pompermaier */ public abstract class TableInputFormat implements InputFormat{ @@ -61,6 +60,9 @@ public abstract class TableInputFormat implements InputFormat Date: Thu, 11 Dec 2014 13:43:47 +0100 Subject: [PATCH 7/7] Aligned default hadoop profile to 2 --- flink-addons/flink-hbase/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml index 5ad1c94c70d86..64fdd300a507d 100644 --- a/flink-addons/flink-hbase/pom.xml +++ b/flink-addons/flink-hbase/pom.xml @@ -82,6 +82,7 @@ under the License. !hadoop.profile + 1 @@ -94,8 +95,7 @@ under the License. - hadoop.profile - 2 + !hadoop.profile