From bc5c21e52fcdc46f1f4b6b350d4351bc2e1134f7 Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Fri, 2 Sep 2016 19:17:33 +0800 Subject: [PATCH 01/19] refactor example sources task to use DataSourceAPI --- .../gearpump/streaming/examples/wordcount/WordCountSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala index f7035526f..5121815cd 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala @@ -44,7 +44,7 @@ class WordCountSpec property("WordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] val optionalArgs = Array( - "-split", "1", + "-source", "1", "-sum", "1") val args = { From 5db634a5f6cfa2ff198a46019bb8a4fa6b63cfeb Mon Sep 17 00:00:00 2001 From: "736781877@qq.com" Date: Fri, 2 Sep 2016 20:26:48 +0800 Subject: [PATCH 02/19] refactor example sources task to use DataSourceAPI --- .../apache/gearpump/streaming/examples/wordcount/SplitSpec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala index 7c9de35aa..bb42b8501 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala @@ -40,7 +40,6 @@ class SplitSpec extends WordSpec with Matchers { implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val mockTaskActor = TestProbe() - when(taskContext.self).thenReturn(mockTaskActor.ref) val split = new Split From 57c7e8817ba9c8683039e050a217904d065f7a13 Mon Sep 17 00:00:00 2001 From: "736781877@qq.com" Date: Fri, 2 Sep 2016 20:31:28 +0800 Subject: [PATCH 03/19] refactor example sources task to use DataSourceAPI --- .../apache/gearpump/streaming/examples/wordcount/SplitSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala index bb42b8501..7c9de35aa 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala @@ -40,6 +40,7 @@ class SplitSpec extends WordSpec with Matchers { implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val mockTaskActor = TestProbe() + when(taskContext.self).thenReturn(mockTaskActor.ref) val split = new Split From 0097e2527783114a7d8890eca7173e5fb55a6194 Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Tue, 13 Sep 2016 21:30:57 +0800 Subject: [PATCH 04/19] [GEARPUMP-204]Add unit test for external_hbase module --- .../gearpump/external/hbase/HBaseSink.scala | 52 +++++++++++------- .../external/hbase/HBaseSinkSpec.scala | 54 ++++++++++++++----- 2 files changed, 76 insertions(+), 30 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index e4d56335c..d4f381514 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -32,18 +32,32 @@ import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.{Constants, FileUtils} -class HBaseSink( - userconfig: UserConfig, tableName: String, @transient var configuration: Configuration) - extends DataSink{ - lazy val connection = HBaseSink.getConnection(userconfig, configuration) +class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connection: Connection, + @transient var configuration: Configuration, @transient var isTest: Boolean) + extends DataSink { + + connection = + if (isTest) { + connection + } else { + HBaseSink.getConnection(userconfig, configuration) + } + // var connection = HBaseSink.getConnection(userconfig, configuration) lazy val table = connection.getTable(TableName.valueOf(tableName)) override def open(context: TaskContext): Unit = {} def this(userconfig: UserConfig, tableName: String) = { - this(userconfig, tableName, HBaseConfiguration.create()) + this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), + HBaseConfiguration.create(), false) + } + def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { + this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), + HBaseConfiguration.create(), false) } + + def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), Bytes.toBytes(columnName), Bytes.toBytes(value)) @@ -51,7 +65,7 @@ class HBaseSink( def insert( rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]) - : Unit = { + : Unit = { val put = new Put(rowKey) put.addColumn(columnGroup, columnName, value) table.put(put) @@ -87,19 +101,12 @@ class HBaseSink( connection.close() } - /** - * Overrides Java's default serialization - * Please do not remove this - */ + private def writeObject(out: ObjectOutputStream): Unit = { out.defaultWriteObject() configuration.write(out) } - /** - * Overrides Java's default deserialization - * Please do not remove this - */ private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() val clientConf = new Configuration(false) @@ -115,15 +122,24 @@ object HBaseSink { val COLUMN_NAME = "hbase.table.column.name" val HBASE_USER = "hbase.user" - def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = { - new HBaseSink(userconfig, tableName) - } def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) - : HBaseSink = { + : HBaseSink = { new HBaseSink(userconfig, tableName, configuration) } + def apply[T]( + userconfig: UserConfig, tableName: String) + : HBaseSink = { + new HBaseSink(userconfig, tableName) + } + def apply[T]( + userconfig: UserConfig, tableName: String, connection: Connection, + configuration: Configuration, isTest: Boolean ) + : HBaseSink = { + new HBaseSink(userconfig, tableName, connection, configuration, isTest) + } + private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { if (UserGroupInformation.isSecurityEnabled) { val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL) diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 24b964608..4ae5c0ad7 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -17,24 +17,54 @@ */ package org.apache.gearpump.external.hbase +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.util.Bytes +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers { +class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { property("HBaseSink should insert a row successfully") { - // import Mockito._ - // val htable = Mockito.mock(classOf[HTable]) - // val row = "row" - // val group = "group" - // val name = "name" - // val value = "1.2" - // val put = new Put(Bytes.toBytes(row)) - // put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - // val hbaseSink = HBaseSink(htable) - // hbaseSink.insert(put) - // verify(htable).put(put) + val table = mock[Table] + val config = mock[Configuration] + val connection = mock[Connection] + val taskContext = mock[TaskContext] + // val connectionFactory = mock[ConnectionFactory] + + val map = Map[String, String]("HBASESINK" -> "hbasesink", "TABLE_NAME" -> "hbase.table.name", + "COLUMN_FAMILY" -> "hbase.table.column.family", "COLUMN_NAME" -> "hbase.table.column.name", + "HBASE_USER" -> "hbase.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal", + "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file" + ) + val userconfig = new UserConfig(map) + // val tableName = new TableName() + val tablename = "hbase" + val row = "row" + val group = "group" + val name = "name" + val value = "1.2" + + when(connection.getTable(TableName.valueOf(tablename))).thenReturn(table) + + + val put = new Put(Bytes.toBytes(row)) + put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) + + val hbaseSink = HBaseSink(userconfig, tablename, connection, config, true) + hbaseSink.open(taskContext) + hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), + Bytes.toBytes(value)) + verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements Bytes.toBytes(row))) + + } } From 99257d02e54bd0fa7ad530487aa1b2282964ca85 Mon Sep 17 00:00:00 2001 From: roshanson Date: Mon, 19 Sep 2016 17:00:49 +0800 Subject: [PATCH 05/19] connection to hbase --- conf/gear.conf | 2 +- .../gearpump/streaming/hbase/HbaseConn.scala | 102 ++++++++++++++++++ .../gearpump/streaming/hbase/toHBase.scala | 57 ++++++++++ .../gearpump/external/hbase/HBaseSink.scala | 1 + project/BuildExample.scala | 15 ++- 5 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala create mode 100644 examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala diff --git a/conf/gear.conf b/conf/gear.conf index fa0a3e997..6037e2282 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -243,7 +243,7 @@ gearpump { ################### executor { vmargs = "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost" - extraClasspath = "" + extraClasspath = "/home/manuzhang/hbase-1.2.2/lib/*:/home/manuzhang/hbase-1.2.2/conf" } ### Streaming related configuration diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala new file mode 100644 index 000000000..2fdec5f1c --- /dev/null +++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.hbase + + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedCluster +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.external.hbase.HBaseSink +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.sink.DataSinkProcessor +import org.apache.gearpump.util.Graph.Node +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} +import org.slf4j.Logger + +object HbaseConn extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + val RUN_FOR_EVER = -1 + + override val options: Array[(String, CLIOption[Any])] = Array( + "tableName" -> CLIOption[String]("", required = false, defaultValue = Some("sss")), + "sinkNum" -> CLIOption[Int]("", required = false, defaultValue = Some(1)), + "debug" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), + "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, + defaultValue = Some(30)) + + ) + + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + implicit val actorSystem = system + + + + val tableName = config.getString("tableName") + // val splitNum = config.getInt("splitNum") + val sinkNum = config.getInt("sinkNum") + + // val sink = new toHBase(UserConfig.empty, tableName) + val sinkto = new HBaseSink(UserConfig.empty, tableName) + val sink = new toHBase(sinkto) + + // sink.insert(Bytes.toBytes("row"), Bytes.toBytes("group"), + // Bytes.toBytes("name"), Bytes.toBytes("value")) + val sinkProcessor = DataSinkProcessor(sink, sinkNum) + // val split = Processor[DataSource](splitNum) + + // val computation = split ~> sinkProcessor + val computation = sinkProcessor + val application = StreamApplication("HBase", Graph(computation), UserConfig.empty) + application + + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + val debugMode = config.getBoolean("debug") + val sleepSeconds = config.getInt("sleep") + + val localCluster = if (debugMode) { + val cluster = new EmbeddedCluster(akkaConf: Config) + cluster.start() + Some(cluster) + } else { + None + } + + val context: ClientContext = localCluster match { + case Some(local) => local.newClientContext + case None => ClientContext(akkaConf) + } + + val app = application(config, context.system) + context.submit(app) + + if (debugMode) { + Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. + } + + context.close() + localCluster.map(_.stop()) + } + +} diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala new file mode 100644 index 000000000..6e48c2b16 --- /dev/null +++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala @@ -0,0 +1,57 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gearpump.streaming.hbase + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.external.hbase.HBaseSink +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.hadoop.hbase.util.Bytes + +class toHBase(hBaseSink: HBaseSink) extends DataSink { + + + // val hbaseSink = new HBaseSink(userConfig, tableName) + + + var x = 1 + while (x < 100) { + hBaseSink.insert(Bytes.toBytes("row5"), Bytes.toBytes("group"), + Bytes.toBytes("group:name"), Bytes.toBytes("10000")) + x += 1 + // scalastyle:off + println("This is : " + x ) + } + + override def open(context: TaskContext): Unit = { + hBaseSink.open(context) + } + + override def write(message: Message): Unit = { + hBaseSink.write(message) + } + + override def close(): Unit = { + hBaseSink.close() + } +} + diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index d4f381514..99c185082 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -164,6 +164,7 @@ object HBaseSink { val userName = userConfig.getString(HBASE_USER) if (userName.isEmpty) { ConnectionFactory.createConnection(configuration) + } else { val user = UserProvider.instantiate(configuration) .create(UserGroupInformation.createRemoteUser(userName.get)) diff --git a/project/BuildExample.scala b/project/BuildExample.scala index 75fc9be43..e8f593320 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -27,7 +27,7 @@ object BuildExample extends sbt.Build { id = "gearpump-examples", base = file("examples"), settings = commonSettings ++ noPublish - ).aggregate(wordcount, wordcountJava, complexdag, sol, fsio, examples_kafka, + ).aggregate(wordcount, wordcountJava, complexdag, sol, fsio, examples_kafka, hbase, distributedshell, stockcrawler, transport, examples_state, pagerank, distributeservice). disablePlugins(sbtassembly.AssemblyPlugin) @@ -57,6 +57,19 @@ object BuildExample extends sbt.Build { ) ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + lazy val hbase = Project( + id = "gearpump-examples-hbase", + base = file("examples/streaming/hbase"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + mainClass in(Compile, packageBin) := + Some("org.apache.gearpump.streaming.examples.hbase.HbaseConn"), + + target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / + CrossVersion.binaryScalaVersion(scalaVersion.value) + ) + ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided", external_hbase) + lazy val sol = Project( id = "gearpump-examples-sol", base = file("examples/streaming/sol"), From a14df0d10d03219800a6b1a166ec9d773cfa76b3 Mon Sep 17 00:00:00 2001 From: roshanson Date: Wed, 21 Sep 2016 12:33:57 +0800 Subject: [PATCH 06/19] [GEARPUMP-204]add unit test for external_hbase module --- .../gearpump/streaming/hbase/toHBase.scala | 4 +- .../gearpump/external/hbase/HBaseSink.scala | 42 +++++++++++-------- .../external/hbase/HBaseSinkSpec.scala | 10 +++-- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala index 6e48c2b16..00d02e865 100644 --- a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala +++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala @@ -35,8 +35,8 @@ class toHBase(hBaseSink: HBaseSink) extends DataSink { var x = 1 while (x < 100) { - hBaseSink.insert(Bytes.toBytes("row5"), Bytes.toBytes("group"), - Bytes.toBytes("group:name"), Bytes.toBytes("10000")) + hBaseSink.insert(Bytes.toBytes("row6"), Bytes.toBytes("group"), + Bytes.toBytes("group:name"), Bytes.toBytes("100000")) x += 1 // scalastyle:off println("This is : " + x ) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index 99c185082..1434d953a 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -19,29 +19,23 @@ package org.apache.gearpump.external.hbase import java.io.{File, ObjectInputStream, ObjectOutputStream} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} -import org.apache.hadoop.hbase.security.{User, UserProvider} -import org.apache.hadoop.security.UserGroupInformation - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.{Constants, FileUtils} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} +import org.apache.hadoop.hbase.security.UserProvider +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} +import org.apache.hadoop.security.UserGroupInformation class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connection: Connection, - @transient var configuration: Configuration, @transient var isTest: Boolean) + @transient var configuration: Configuration) extends DataSink { - connection = - if (isTest) { - connection - } else { - HBaseSink.getConnection(userconfig, configuration) - } + // var connection = HBaseSink.getConnection(userconfig, configuration) lazy val table = connection.getTable(TableName.valueOf(tableName)) @@ -49,11 +43,11 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec def this(userconfig: UserConfig, tableName: String) = { this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), - HBaseConfiguration.create(), false) + HBaseConfiguration.create()) } def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), - HBaseConfiguration.create(), false) + HBaseConfiguration.create()) } @@ -135,9 +129,9 @@ object HBaseSink { } def apply[T]( userconfig: UserConfig, tableName: String, connection: Connection, - configuration: Configuration, isTest: Boolean ) + configuration: Configuration) : HBaseSink = { - new HBaseSink(userconfig, tableName, connection, configuration, isTest) + new HBaseSink(userconfig, tableName, connection, configuration ) } private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { @@ -171,5 +165,17 @@ object HBaseSink { ConnectionFactory.createConnection(configuration, user) } + + } + def getConn(connection: Connection, isTest: Boolean, userConfig: UserConfig, + configuration: Configuration): Connection = { + val conn = + if (isTest) { + connection + } else { + HBaseSink.getConnection(userConfig, configuration) + } + conn + } } diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 4ae5c0ad7..8e8361b72 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -35,7 +35,8 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val table = mock[Table] val config = mock[Configuration] - val connection = mock[Connection] + val conn = mock[Connection] + // val connection = mock[Connection] val taskContext = mock[TaskContext] // val connectionFactory = mock[ConnectionFactory] @@ -52,13 +53,14 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val name = "name" val value = "1.2" - when(connection.getTable(TableName.valueOf(tablename))).thenReturn(table) - + when(conn.getTable(TableName.valueOf(tablename))).thenReturn(table) + // when(HBaseSink.getConn(conn, true, userconfig, config)).thenReturn(connection) + val connection = HBaseSink.getConn(conn, true, userconfig, config) val put = new Put(Bytes.toBytes(row)) put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - val hbaseSink = HBaseSink(userconfig, tablename, connection, config, true) + val hbaseSink = HBaseSink(userconfig, tablename, connection, config) hbaseSink.open(taskContext) hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) From 8ef7dbb0d9b816a595441e66b0f71a041fc5b9f8 Mon Sep 17 00:00:00 2001 From: roshanson Date: Wed, 21 Sep 2016 13:57:24 +0800 Subject: [PATCH 07/19] [GEARPUMP-204]add unit test for external_hbase module --- .../gearpump/streaming/hbase/HbaseConn.scala | 102 ------------------ .../gearpump/streaming/hbase/toHBase.scala | 57 ---------- .../external/hbase/HBaseSinkSpec.scala | 8 +- project/BuildExample.scala | 14 +-- 4 files changed, 3 insertions(+), 178 deletions(-) delete mode 100644 examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala delete mode 100644 examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala deleted file mode 100644 index 2fdec5f1c..000000000 --- a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/HbaseConn.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.hbase - - -import akka.actor.ActorSystem -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.external.hbase.HBaseSink -import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.sink.DataSinkProcessor -import org.apache.gearpump.util.Graph.Node -import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.slf4j.Logger - -object HbaseConn extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - val RUN_FOR_EVER = -1 - - override val options: Array[(String, CLIOption[Any])] = Array( - "tableName" -> CLIOption[String]("", required = false, defaultValue = Some("sss")), - "sinkNum" -> CLIOption[Int]("", required = false, defaultValue = Some(1)), - "debug" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), - "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, - defaultValue = Some(30)) - - ) - - def application(config: ParseResult, system: ActorSystem): StreamApplication = { - implicit val actorSystem = system - - - - val tableName = config.getString("tableName") - // val splitNum = config.getInt("splitNum") - val sinkNum = config.getInt("sinkNum") - - // val sink = new toHBase(UserConfig.empty, tableName) - val sinkto = new HBaseSink(UserConfig.empty, tableName) - val sink = new toHBase(sinkto) - - // sink.insert(Bytes.toBytes("row"), Bytes.toBytes("group"), - // Bytes.toBytes("name"), Bytes.toBytes("value")) - val sinkProcessor = DataSinkProcessor(sink, sinkNum) - // val split = Processor[DataSource](splitNum) - - // val computation = split ~> sinkProcessor - val computation = sinkProcessor - val application = StreamApplication("HBase", Graph(computation), UserConfig.empty) - application - - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - val debugMode = config.getBoolean("debug") - val sleepSeconds = config.getInt("sleep") - - val localCluster = if (debugMode) { - val cluster = new EmbeddedCluster(akkaConf: Config) - cluster.start() - Some(cluster) - } else { - None - } - - val context: ClientContext = localCluster match { - case Some(local) => local.newClientContext - case None => ClientContext(akkaConf) - } - - val app = application(config, context.system) - context.submit(app) - - if (debugMode) { - Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. - } - - context.close() - localCluster.map(_.stop()) - } - -} diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala deleted file mode 100644 index 00d02e865..000000000 --- a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/hbase/toHBase.scala +++ /dev/null @@ -1,57 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.gearpump.streaming.hbase - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.external.hbase.HBaseSink -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.task.TaskContext -import org.apache.hadoop.hbase.util.Bytes - -class toHBase(hBaseSink: HBaseSink) extends DataSink { - - - // val hbaseSink = new HBaseSink(userConfig, tableName) - - - var x = 1 - while (x < 100) { - hBaseSink.insert(Bytes.toBytes("row6"), Bytes.toBytes("group"), - Bytes.toBytes("group:name"), Bytes.toBytes("100000")) - x += 1 - // scalastyle:off - println("This is : " + x ) - } - - override def open(context: TaskContext): Unit = { - hBaseSink.open(context) - } - - override def write(message: Message): Unit = { - hBaseSink.write(message) - } - - override def close(): Unit = { - hBaseSink.close() - } -} - diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 8e8361b72..12107a0eb 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -36,9 +36,8 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val table = mock[Table] val config = mock[Configuration] val conn = mock[Connection] - // val connection = mock[Connection] val taskContext = mock[TaskContext] - // val connectionFactory = mock[ConnectionFactory] + val map = Map[String, String]("HBASESINK" -> "hbasesink", "TABLE_NAME" -> "hbase.table.name", "COLUMN_FAMILY" -> "hbase.table.column.family", "COLUMN_NAME" -> "hbase.table.column.name", @@ -46,7 +45,6 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file" ) val userconfig = new UserConfig(map) - // val tableName = new TableName() val tablename = "hbase" val row = "row" val group = "group" @@ -54,12 +52,10 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val value = "1.2" when(conn.getTable(TableName.valueOf(tablename))).thenReturn(table) - // when(HBaseSink.getConn(conn, true, userconfig, config)).thenReturn(connection) - val connection = HBaseSink.getConn(conn, true, userconfig, config) + val connection = HBaseSink.getConn(conn, true, userconfig, config) val put = new Put(Bytes.toBytes(row)) put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - val hbaseSink = HBaseSink(userconfig, tablename, connection, config) hbaseSink.open(taskContext) hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), diff --git a/project/BuildExample.scala b/project/BuildExample.scala index e8f593320..0a89d2e2b 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -27,7 +27,7 @@ object BuildExample extends sbt.Build { id = "gearpump-examples", base = file("examples"), settings = commonSettings ++ noPublish - ).aggregate(wordcount, wordcountJava, complexdag, sol, fsio, examples_kafka, hbase, + ).aggregate(wordcount, wordcountJava, complexdag, sol, fsio, examples_kafka, distributedshell, stockcrawler, transport, examples_state, pagerank, distributeservice). disablePlugins(sbtassembly.AssemblyPlugin) @@ -57,18 +57,6 @@ object BuildExample extends sbt.Build { ) ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") - lazy val hbase = Project( - id = "gearpump-examples-hbase", - base = file("examples/streaming/hbase"), - settings = commonSettings ++ noPublish ++ myAssemblySettings ++ - Seq( - mainClass in(Compile, packageBin) := - Some("org.apache.gearpump.streaming.examples.hbase.HbaseConn"), - - target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / - CrossVersion.binaryScalaVersion(scalaVersion.value) - ) - ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided", external_hbase) lazy val sol = Project( id = "gearpump-examples-sol", From 87364c049fd283c5da6898abf61973eae031a8a7 Mon Sep 17 00:00:00 2001 From: roshanson Date: Thu, 22 Sep 2016 11:36:25 +0800 Subject: [PATCH 08/19] [GEARPUMP-204]add unit test for external_hbase module --- conf/gear.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/gear.conf b/conf/gear.conf index 6037e2282..fa0a3e997 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -243,7 +243,7 @@ gearpump { ################### executor { vmargs = "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost" - extraClasspath = "/home/manuzhang/hbase-1.2.2/lib/*:/home/manuzhang/hbase-1.2.2/conf" + extraClasspath = "" } ### Streaming related configuration From 3086e4535fbbaa1c6e652f8e3cc3fdeb7bf24ead Mon Sep 17 00:00:00 2001 From: roshanson Date: Thu, 22 Sep 2016 13:12:49 +0800 Subject: [PATCH 09/19] [GEARPUMP-204]add unit test for external_hbase module --- .../gearpump/streaming/examples/wordcount/WordCountSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala index 5121815cd..f7035526f 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala @@ -44,7 +44,7 @@ class WordCountSpec property("WordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] val optionalArgs = Array( - "-source", "1", + "-split", "1", "-sum", "1") val args = { From ff7b930a6d2cd8081bdf2be1f5c2d69c2d144fd8 Mon Sep 17 00:00:00 2001 From: roshanson Date: Thu, 22 Sep 2016 14:59:19 +0800 Subject: [PATCH 10/19] [GEARPUMP-204]add unit test for external_hbase module --- .../gearpump/external/hbase/HBaseSink.scala | 30 +++++++------------ .../external/hbase/HBaseSinkSpec.scala | 10 ++----- project/BuildExample.scala | 1 - 3 files changed, 12 insertions(+), 29 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index 1434d953a..eb9dceda9 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -35,8 +35,6 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec @transient var configuration: Configuration) extends DataSink { - - // var connection = HBaseSink.getConnection(userconfig, configuration) lazy val table = connection.getTable(TableName.valueOf(tableName)) override def open(context: TaskContext): Unit = {} @@ -47,11 +45,9 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec } def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), - HBaseConfiguration.create()) + configuration) } - - def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), Bytes.toBytes(columnName), Bytes.toBytes(value)) @@ -95,12 +91,19 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec connection.close() } - + /** + * Overrides Java's default serialization + * Please do not remove this + */ private def writeObject(out: ObjectOutputStream): Unit = { out.defaultWriteObject() configuration.write(out) } + /** + * Overrides Java's default serialization + * Please do not remove this + */ private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() val clientConf = new Configuration(false) @@ -116,7 +119,6 @@ object HBaseSink { val COLUMN_NAME = "hbase.table.column.name" val HBASE_USER = "hbase.user" - def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) : HBaseSink = { new HBaseSink(userconfig, tableName, configuration) @@ -127,6 +129,7 @@ object HBaseSink { : HBaseSink = { new HBaseSink(userconfig, tableName) } + def apply[T]( userconfig: UserConfig, tableName: String, connection: Connection, configuration: Configuration) @@ -158,24 +161,11 @@ object HBaseSink { val userName = userConfig.getString(HBASE_USER) if (userName.isEmpty) { ConnectionFactory.createConnection(configuration) - } else { val user = UserProvider.instantiate(configuration) .create(UserGroupInformation.createRemoteUser(userName.get)) ConnectionFactory.createConnection(configuration, user) } - - } - def getConn(connection: Connection, isTest: Boolean, userConfig: UserConfig, - configuration: Configuration): Connection = { - val conn = - if (isTest) { - connection - } else { - HBaseSink.getConnection(userConfig, configuration) - } - conn - } } diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 12107a0eb..5a8407cd4 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -35,10 +35,9 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val table = mock[Table] val config = mock[Configuration] - val conn = mock[Connection] + val connection = mock[Connection] val taskContext = mock[TaskContext] - val map = Map[String, String]("HBASESINK" -> "hbasesink", "TABLE_NAME" -> "hbase.table.name", "COLUMN_FAMILY" -> "hbase.table.column.family", "COLUMN_NAME" -> "hbase.table.column.name", "HBASE_USER" -> "hbase.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal", @@ -51,9 +50,8 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val name = "name" val value = "1.2" - when(conn.getTable(TableName.valueOf(tablename))).thenReturn(table) + when(connection.getTable(TableName.valueOf(tablename))).thenReturn(table) - val connection = HBaseSink.getConn(conn, true, userconfig, config) val put = new Put(Bytes.toBytes(row)) put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) val hbaseSink = HBaseSink(userconfig, tablename, connection, config) @@ -61,9 +59,5 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements Bytes.toBytes(row))) - - - } } - diff --git a/project/BuildExample.scala b/project/BuildExample.scala index 0a89d2e2b..75fc9be43 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -57,7 +57,6 @@ object BuildExample extends sbt.Build { ) ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") - lazy val sol = Project( id = "gearpump-examples-sol", base = file("examples/streaming/sol"), From f07f7feba571ba4a2ba830f8ae794b9dcd8f1e54 Mon Sep 17 00:00:00 2001 From: roshanson Date: Thu, 22 Sep 2016 15:16:31 +0800 Subject: [PATCH 11/19] [GEARPUMP-204]add unit test for external_hbase module --- .../org/apache/gearpump/external/hbase/HBaseSink.scala | 7 ------- .../org/apache/gearpump/external/hbase/HBaseSinkSpec.scala | 2 +- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index eb9dceda9..2021f1fef 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -130,13 +130,6 @@ object HBaseSink { new HBaseSink(userconfig, tableName) } - def apply[T]( - userconfig: UserConfig, tableName: String, connection: Connection, - configuration: Configuration) - : HBaseSink = { - new HBaseSink(userconfig, tableName, connection, configuration ) - } - private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { if (UserGroupInformation.isSecurityEnabled) { val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL) diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 5a8407cd4..36640f283 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -54,7 +54,7 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val put = new Put(Bytes.toBytes(row)) put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - val hbaseSink = HBaseSink(userconfig, tablename, connection, config) + val hbaseSink = new HBaseSink(userconfig, tablename, connection, config) hbaseSink.open(taskContext) hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) From 4920f2598721beede24e182205a87b72b9f13afb Mon Sep 17 00:00:00 2001 From: roshanson Date: Thu, 22 Sep 2016 16:46:08 +0800 Subject: [PATCH 12/19] [GEARPUMP-204]add unit test for external_hbase module --- .../scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 36640f283..41ebc52aa 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -58,6 +58,7 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock hbaseSink.open(taskContext) hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) + verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements Bytes.toBytes(row))) } } From 2e6ea2c33fdacddff283ea25d93493d1ef30bb4d Mon Sep 17 00:00:00 2001 From: Roshanson Date: Thu, 22 Sep 2016 16:49:29 +0800 Subject: [PATCH 13/19] [GEARPUMP-204]add unit test for external_hbase module --- .../org/apache/gearpump/external/hbase/HBaseSinkSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 41ebc52aa..7ebfa48e2 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -48,7 +48,7 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val row = "row" val group = "group" val name = "name" - val value = "1.2" + val value = "2.0" when(connection.getTable(TableName.valueOf(tablename))).thenReturn(table) From 1caf488e88253380c01e319d59957f36694784a3 Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Thu, 22 Sep 2016 19:41:31 +0800 Subject: [PATCH 14/19] [GEARPUMP-204]Add unit test for external_hbase module --- .../org/apache/gearpump/external/hbase/HBaseSink.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index 2021f1fef..00b0cff30 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -40,11 +40,11 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec override def open(context: TaskContext): Unit = {} def this(userconfig: UserConfig, tableName: String) = { - this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), - HBaseConfiguration.create()) + this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseSink.CONFIG), + HBaseSink.CONFIG) } def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { - this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseConfiguration.create()), + this(userconfig, tableName, HBaseSink.getConnection(userconfig, configuration), configuration) } @@ -101,7 +101,7 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec } /** - * Overrides Java's default serialization + * Overrides Java's default deserialization * Please do not remove this */ private def readObject(in: ObjectInputStream): Unit = { @@ -118,6 +118,7 @@ object HBaseSink { val COLUMN_FAMILY = "hbase.table.column.family" val COLUMN_NAME = "hbase.table.column.name" val HBASE_USER = "hbase.user" + private val CONFIG = HBaseConfiguration.create() def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) : HBaseSink = { From 52d4b8d6ff4e1720147b9c088412625f5312f0a0 Mon Sep 17 00:00:00 2001 From: Roshanson Date: Fri, 23 Sep 2016 10:41:37 +0800 Subject: [PATCH 15/19] [GEARPUMP-204]add unit test for external_hbase module --- .../org/apache/gearpump/external/hbase/HBaseSink.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index 00b0cff30..f787ce2b8 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -39,15 +39,15 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connec override def open(context: TaskContext): Unit = {} - def this(userconfig: UserConfig, tableName: String) = { - this(userconfig, tableName, HBaseSink.getConnection(userconfig, HBaseSink.CONFIG), - HBaseSink.CONFIG) - } def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { this(userconfig, tableName, HBaseSink.getConnection(userconfig, configuration), configuration) } + def this(userconfig: UserConfig, tableName: String) = { + this(userconfig, tableName, HBaseConfiguration.create()) + } + def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), Bytes.toBytes(columnName), Bytes.toBytes(value)) @@ -118,7 +118,6 @@ object HBaseSink { val COLUMN_FAMILY = "hbase.table.column.family" val COLUMN_NAME = "hbase.table.column.name" val HBASE_USER = "hbase.user" - private val CONFIG = HBaseConfiguration.create() def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) : HBaseSink = { From d52f51da4ccb797c7ef954b396b88735a2bf969e Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Fri, 23 Sep 2016 11:33:33 +0800 Subject: [PATCH 16/19] [GEARPUMP-204]add unit test for external_hbase module --- .../scala/org/apache/gearpump/external/hbase/HBaseSink.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index f787ce2b8..f896fe929 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -124,8 +124,7 @@ object HBaseSink { new HBaseSink(userconfig, tableName, configuration) } - def apply[T]( - userconfig: UserConfig, tableName: String) + def apply[T](userconfig: UserConfig, tableName: String) : HBaseSink = { new HBaseSink(userconfig, tableName) } From 501dca4861549e27beeb9a6ef89f4bb9902a3249 Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Tue, 27 Sep 2016 18:53:25 +0800 Subject: [PATCH 17/19] [GEARPUMP-204]Add unit test for external_hbase module --- .../org/apache/gearpump/external/hbase/HBaseSink.scala | 10 ++++++---- .../apache/gearpump/external/hbase/HBaseSinkSpec.scala | 9 +++++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index f896fe929..0822c6d88 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -31,17 +31,19 @@ import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.security.UserGroupInformation -class HBaseSink(userconfig: UserConfig, tableName: String, @transient var connection: Connection, - @transient var configuration: Configuration) +class HBaseSink(userconfig: UserConfig, tableName: String, + val conn: (UserConfig, Configuration) + => Connection, @transient var configuration: Configuration) extends DataSink { + lazy val connection = conn(userconfig, configuration) lazy val table = connection.getTable(TableName.valueOf(tableName)) override def open(context: TaskContext): Unit = {} def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { - this(userconfig, tableName, HBaseSink.getConnection(userconfig, configuration), - configuration) + this(userconfig, tableName, (userconfig: UserConfig, config: Configuration) => + {HBaseSink.getConnection(userconfig, config)}, configuration) } def this(userconfig: UserConfig, tableName: String) = { diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 7ebfa48e2..b13e7c593 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -43,18 +43,19 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock "HBASE_USER" -> "hbase.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal", "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file" ) - val userconfig = new UserConfig(map) - val tablename = "hbase" + val userConfig = new UserConfig(map) + val tableName = "hbase" val row = "row" val group = "group" val name = "name" val value = "2.0" - when(connection.getTable(TableName.valueOf(tablename))).thenReturn(table) + when(connection.getTable(TableName.valueOf(tableName))).thenReturn(table) val put = new Put(Bytes.toBytes(row)) put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - val hbaseSink = new HBaseSink(userconfig, tablename, connection, config) + val hbaseSink = new HBaseSink(userConfig, tableName, (userConfig, config) + => connection, config) hbaseSink.open(taskContext) hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) From c8120978b8609b41012823707ff04b830f1dfb50 Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Tue, 27 Sep 2016 19:54:50 +0800 Subject: [PATCH 18/19] [GEARPUMP-204]Add unit test for external_hbase module --- .../org/apache/gearpump/external/hbase/HBaseSinkSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index b13e7c593..62da2b1f4 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -48,7 +48,7 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val row = "row" val group = "group" val name = "name" - val value = "2.0" + val value = "3.0" when(connection.getTable(TableName.valueOf(tableName))).thenReturn(table) From 6ac021e4a5f7b625712fe1d71bea8e1f352cc919 Mon Sep 17 00:00:00 2001 From: Roshanson <736781877@qq.com> Date: Wed, 28 Sep 2016 10:32:57 +0800 Subject: [PATCH 19/19] [GEARPUMP-204]add unit test for external_hbase module --- .../scala/org/apache/gearpump/external/hbase/HBaseSink.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index 0822c6d88..f85c43bda 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -42,8 +42,7 @@ class HBaseSink(userconfig: UserConfig, tableName: String, override def open(context: TaskContext): Unit = {} def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { - this(userconfig, tableName, (userconfig: UserConfig, config: Configuration) => - {HBaseSink.getConnection(userconfig, config)}, configuration) + this(userconfig, tableName, HBaseSink.getConnection, configuration) } def this(userconfig: UserConfig, tableName: String) = {