From 1230a6c7c655bd46c891dd92a030c45f9a82fa1b Mon Sep 17 00:00:00 2001
From: zhangminglei
Date: Wed, 18 Jul 2018 21:49:12 +0800
Subject: [PATCH] [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

---
 flink-connectors/flink-hbase/pom.xml                          | 8 +++++++-
 .../flink/addons/hbase/AbstractTableInputFormat.java          | 2 +-
 .../org/apache/flink/addons/hbase/TableInputFormat.java       | 8 +++++++-
 .../addons/hbase/HBaseTestingClusterAutostarter.java          | 5 +++--
 .../flink/addons/hbase/example/HBaseWriteExample.java         | 2 +-
 .../addons/hbase/example/HBaseWriteStreamExample.java         | 9 ++++++---
 6 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index 15774c86a7be7..340098e702a8f 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -34,7 +34,7 @@ under the License.
 	<packaging>jar</packaging>
 
 	<properties>
-		<hbase.version>1.4.3</hbase.version>
+		<hbase.version>2.0.1</hbase.version>
 	</properties>
@@ -222,6 +222,12 @@
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-mapreduce</artifactId>
+			<version>${hbase.version}</version>
+		</dependency>
+
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
index 73a21b38c3ca1..084c5b72055f7 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
@@ -232,7 +232,7 @@ public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOExce
 				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
 					&& !isLastRegion ? endKey : stopRow;
 				int id = splits.size();
-				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
+				final TableInputSplit split = new TableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
 				splits.add(split);
 			}
 		}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index ebe25c819aece..5a3ffe9601a33 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -23,7 +23,11 @@
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -81,7 +85,9 @@ private HTable createTable() {
 		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
 
 		try {
-			return new HTable(hConf, getTableName());
+			Connection connection = ConnectionFactory.createConnection(hConf);
+			Table table = connection.getTable(TableName.valueOf(getTableName()));
+			return (HTable) table;
 		} catch (Exception e) {
 			LOG.error("Error instantiating a new HTable instance", e);
 		}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
index e4b2bd2fa3a77..45a4af4f672e1 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ScannerCallable;
@@ -159,7 +160,7 @@ public static void setUp() throws Exception {
 		TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1);
 
 		// Make sure the zookeeper quorum value contains the right port number (varies per run).
-		TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort());
+		TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + new MiniZooKeeperCluster().getClientPort());
 
 		conf = initialize(TEST_UTIL.getConfiguration());
 		LOG.info("HBase minicluster: Running");
@@ -184,7 +185,7 @@ public static void registerHBaseMiniClusterInClasspath() {
 			fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster");
 		}
 
-		assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster());
+		assertNotNull("The ZooKeeper for the HBase minicluster is missing", new MiniZooKeeperCluster());
 
 		createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
 		addDirectoryToClassPath(hbaseSiteXmlDirectory);
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index ca823922f98b6..2adbc585426af 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -90,7 +90,7 @@ public void open(Configuration parameters) throws Exception {
 			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
 				reuse.f0 = new Text(t.f0);
 				Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
-				put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+				put.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
 				reuse.f1 = put;
 				return reuse;
 			}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index 1ed471d5070b4..f6e2a53026a3d 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -24,6 +24,9 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -87,14 +90,15 @@ public void configure(Configuration parameters) {
 
 		@Override
 		public void open(int taskNumber, int numTasks) throws IOException {
-			table = new HTable(conf, "flinkExample");
+			Connection connection = ConnectionFactory.createConnection(conf);
+			table = (HTable) connection.getTable(TableName.valueOf("flinkExample"));
 			this.taskNumber = String.valueOf(taskNumber);
 		}
 
 		@Override
 		public void writeRecord(String record) throws IOException {
 			Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
-			put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
+			put.addColumn(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
 				Bytes.toBytes(rowNumber));
 			rowNumber++;
 			table.put(put);
@@ -102,7 +106,6 @@ public void writeRecord(String record) throws IOException {
 
 		@Override
 		public void close() throws IOException {
-			table.flushCommits();
 			table.close();
 		}
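
Note (not part of the patch): a minimal, self-contained sketch of the HBase 2.x client pattern the hunks above move to -- Connection/Table obtained from ConnectionFactory instead of the removed public new HTable(conf, name) constructor, Put.addColumn(...) instead of Put.add(...), and no HTable.flushCommits(). The table name "flinkExample" and the "entry" family/qualifier mirror HBaseWriteStreamExample; the row key and value are made-up placeholders for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBase2WriteSketch {

	public static void main(String[] args) throws IOException {
		org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();

		// Connections are heavyweight in HBase 2.x: create one, reuse it, close it when done.
		try (Connection connection = ConnectionFactory.createConnection(conf);
				Table table = connection.getTable(TableName.valueOf("flinkExample"))) {

			Put put = new Put(Bytes.toBytes("row-1"));
			// addColumn(family, qualifier, value) is the 2.x replacement for the old Put.add(...).
			put.addColumn(Bytes.toBytes("entry"), Bytes.toBytes("entry"), Bytes.toBytes("some value"));

			// Table.put sends the mutation directly; there is no flushCommits() on Table.
			table.put(put);
		}
	}
}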