Skip to content
Browse files

FLUME-467: Merge cloudera/hbase with trunk cloudera/master

- contains: FLUME-406, FLUME-366, FLUME-354, FLUME-247, FLUME-6
- updates hadoop and zk jars to newer version
- includes hadoop-test jar.
- update hbase sink to newer flume version's apis
  • Loading branch information...
1 parent a51da8d commit b68128d86085c8d8ff2e4c1fa47f214746e7d952 @jmhsieh jmhsieh committed Apr 18, 2011
View
2 flume-core/src/main/java/com/cloudera/flume/agent/FlumeNode.java
@@ -295,7 +295,7 @@ synchronized public void stop() {
try {
http.stop();
} catch (Exception e) {
- LOG.error("Exception stopping FlumeNode", e);
+ LOG.error("Stopping http server failed: " + e);
}
}
View
5 flume-core/src/main/java/com/cloudera/flume/master/FlumeMaster.java
@@ -364,10 +364,9 @@ public void shutdown() {
ZooKeeperService.get().shutdown();
}
- } catch (IOException e) {
- LOG.error("Exception when shutting down master!", e);
} catch (Exception e) {
- LOG.error("Exception when shutting down master!", e);
+ LOG.error("Exception when shutting down master! " + e.getMessage());
+ LOG.debug(e.getMessage(), e);
}
}
View
14 flume-core/src/test/java/com/cloudera/flume/agent/durability/TestConcurrentWALMan.java
@@ -28,11 +28,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.cloudera.flume.agent.DirectMasterRPC;
import com.cloudera.flume.agent.FlumeNode;
@@ -59,12 +60,7 @@
* the best solution but workable.
*/
public class TestConcurrentWALMan {
- public static Logger LOG = Logger.getLogger(TestConcurrentWALMan.class);
-
- @Before
- public void setDebug() {
- Logger.getRootLogger().setLevel(Level.DEBUG);
- }
+ public static Logger LOG = LoggerFactory.getLogger(TestConcurrentWALMan.class);
@Test
public void test1thread() throws IOException, InterruptedException {
@@ -153,7 +149,7 @@ public void run() {
snk.close();
FileUtil.rmr(f1);
} catch (Exception e) {
- LOG.error(e, e);
+ LOG.error(e.toString(), e);
} finally {
done.countDown();
}
View
2 flume-core/src/test/java/com/cloudera/util/TestStatusHttpServer.java
@@ -38,7 +38,7 @@
.getLogger(TestStatusHttpServer.class);
@Test
- public void testOpenClose() throws IOException, Exception {
+ public void testOpenClose() throws Exception {
// Set directory of webapps to build-specific dir
FlumeConfiguration.get().set(FlumeConfiguration.WEBAPPS_PATH,
"build/webapps");
View
41 ivysettings.xml
@@ -0,0 +1,41 @@
+<ivysettings>
+
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+ <property name="repo.maven.org"
+ value="http://repo1.maven.org/maven2/" override="false"/>
+ <property name="repo.cloudera.com"
+ value="https://repository.cloudera.com/content/repositories/releases/" override="false"/>
+ <property name="maven2.pattern"
+ value="[organisation]/[module]/[revision]/[module]-[revision]"/>
+ <property name="maven2.pattern.ext" value="${maven2.pattern}.[ext]"/>
+ <include url="${ivy.default.conf.dir}/ivyconf-local.xml"/>
+ <settings defaultResolver="default"/>
+ <resolvers>
+ <ibiblio name="maven2" root="${repo.maven.org}"
+ pattern="${maven2.pattern.ext}" m2compatible="true"/>
+ <ibiblio name="cloudera-maven2" root="${repo.cloudera.com}"
+ pattern="${maven2.pattern.ext}" m2compatible="true"/>
+
+ <chain name="default" dual="true">
+ <resolver ref="maven2"/>
+ <resolver ref="cloudera-maven2"/>
+ </chain>
+
+ </resolvers>
+</ivysettings>
View
173 plugins/hbasesink/build.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0"?>
+<!--
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+
+<!--
+ Before you can run these subtargets directly, you need
+ to call at top-level: ant
+-->
+
+<project name="flume-hbase" default="jar">
+ <property name="javac.debug" value="on"/>
+ <property name="javac.optimize" value="on"/>
+ <property name="javac.deprecation" value="off"/>
+ <property name="javac.version" value="1.5"/>
+ <property name="javac.args" value=""/>
+ <property name="javac.args.warnings" value="-Xlint:unchecked"/>
+
+ <property name="flume.base" value="../.."/>
+ <property name="build.dir" value="build"/>
+ <property name="build.test" value="${build.dir}/test"/>
+ <property name="build.encoding" value="ISO-8859-1"/>
+
+ <property name="test.src.dir" value="src/javatest"/>
+ <property name="test.lib.dir" value="${flume.base}/libtest"/>
+ <property name="test.build.dir" value="${build.dir}/test"/>
+ <property name="test.generated.dir" value="${test.build.dir}/src"/>
+ <property name="test.build.data" value="${test.build.dir}/data"/>
+ <property name="test.log.dir" value="${test.build.dir}/logs"/>
+ <property name="test.build.classes" value="${test.build.dir}/classes"/>
+ <property name="test.include" value="Test*"/>
+ <property name="test.classpath.id" value="test.classpath"/>
+ <property name="test.output" value="yes"/>
+ <property name="test.timeout" value="900000"/>
+ <property name="test.junit.output.format" value="plain"/>
+
+ <path id="classpath">
+ <!-- in case we are running in dev env -->
+ <pathelement location="${flume.base}/build/classes"/>
+ <fileset dir="${flume.base}">
+ <include name="flume-*.jar" />
+ </fileset>
+ <fileset dir="${flume.base}/lib">
+ <include name="**/slf4j-*.jar" />
+ <include name="**/hadoop-*.jar" />
+ <include name="**/guava-*.jar" />
+ </fileset>
+ <fileset dir="${flume.base}/plugins/hbasesink/lib">
+ <include name="**/*.jar" />
+ </fileset>
+ </path>
+
+ <!-- the unit test classpath: uses test.src.dir for configuration -->
+ <path id="test.classpath">
+ <pathelement location="${test.build.classes}" />
+ <pathelement location="${test.src.dir}"/>
+ <pathelement location="${build.dir}"/>
+ <fileset dir="${test.lib.dir}">
+ <include name="**/*.jar" />
+ <exclude name="**/excluded/" />
+ </fileset>
+ <fileset dir="${flume.base}/lib">
+ <include name="**/*.jar" />
+ <exclude name="**/excluded/" />
+ </fileset>
+ <fileset dir="${flume.base}/plugins/hbasesink">
+ <include name="**/*.jar" />
+ <exclude name="**/excluded/" />
+ </fileset>
+ <fileset dir="${flume.base}/plugins/hbasesink/lib">
+ <include name="**/*.jar" />
+ </fileset>
+ <path refid="classpath"/>
+ </path>
+
+ <target name="jar">
+ <mkdir dir="${build.dir}"/>
+ <mkdir dir="${build.dir}/classes"/>
+
+ <javac srcdir="./src/java" destdir="${build.dir}/classes" debug="${javac.debug}">
+ <classpath refid="classpath"/>
+ </javac>
+
+ <jar jarfile="hbase_sink.jar" basedir="${build.dir}/classes"/>
+ </target>
+
+ <target name="clean">
+ <echo message="Cleaning generated files and stuff"/>
+ <delete dir="build" />
+ <delete file="hbase_sink.jar" />
+ </target>
+
+ <!-- ================================================================== -->
+ <!-- Compile test code -->
+ <!-- ================================================================== -->
+ <target name="compile-plugin-test" depends="jar">
+ <mkdir dir="${build.dir}/test"/>
+ <mkdir dir="${build.dir}/test/classes"/>
+
+ <javac
+ encoding="${build.encoding}"
+ srcdir="${test.src.dir}"
+ includes="**/*.java"
+ destdir="${test.build.classes}"
+ debug="${javac.debug}"
+ optimize="${javac.optimize}"
+ target="${javac.version}"
+ source="${javac.version}"
+ deprecation="${javac.deprecation}">
+ <compilerarg line="${javac.args}" />
+ <classpath refid="test.classpath"/>
+ </javac>
+ </target>
+
+
+ <!-- ================================================================== -->
+ <!-- Run unit tests -->
+ <!-- ================================================================== -->
+ <target name="test" depends="jar,compile-plugin-test" >
+ <echo message="Unit Testing of HBase Sink"/>
+ <junit
+ printsummary="yes" showoutput="${test.output}"
+ haltonfailure="no" fork="yes" maxmemory="1024m"
+ errorProperty="tests.failed" failureProperty="tests.failed"
+ timeout="${test.timeout}"
+ dir="${test.build.dir}">
+
+ <!-- uncomment this if you want to attach a debugger -->
+ <!--
+ <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=2601" />
+ -->
+ <env key="FLUME_HOME" value="${basedir}" />
+ <sysproperty key="javax.xml.parsers.DocumentBuilderFactory" value="com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl" />
+ <sysproperty key="java.library.path" value="${lib.dir}" />
+ <sysproperty key="build.test" value="${build.test}"/>
+
+ <!-- we want more log4j output when running unit tests -->
+ <sysproperty key="hadoop.root.logger"
+ value="INFO,console" />
+
+ <!-- tools.jar from Sun JDK also required to invoke javac. -->
+ <classpath>
+ <path refid="test.classpath"/>
+ <pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
+ </classpath>
+ <formatter type="${test.junit.output.format}" />
+ <batchtest todir="${build.test}" unless="testcase">
+ <fileset dir="${test.src.dir}"
+ includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+ </batchtest>
+ <batchtest todir="${build.test}" if="testcase">
+ <fileset dir="${test.src.dir}" includes="**/${testcase}.java"/>
+ </batchtest>
+ </junit>
+ <fail if="tests.failed">Tests failed!</fail>
+ </target>
+
+</project>
View
56 plugins/hbasesink/docs/README
@@ -0,0 +1,56 @@
+Running Flume on HBase
+----------------------
+
+1) Compile top-level flume
+ $ ant
+
+2) Copy your version of hbase-*.jar into the plugins/hbasesink/lib
+ I tested against hbase-0.89.20100621+17.jar
+
+3) Compile hbase-sink from this directory
+ ../plugins/hbasesink$ ant
+
+4) Successful compilation will produce "hbase_sink.jar" in ../plugins/hbasesink/
+
+5) Add the plugin classes in flume/conf/flume-site.xml
+<configuration>
+ <property>
+ <name>flume.plugin.classes</name>
+ <value>com.cloudera.flume.hbase.Attr2HBaseEventSink</value>
+ <description>Comma separated list of plugin classes</description>
+ </property>
+</configuration>
+
+
+6) Include the jars in the "FLUME_CLASSPATH"
+From the terminal where you would start the Flume nodes:
+ $ export FLUME_CLASSPATH=/your_path/flume/plugins/hbasesink/hbase_sink.jar:/your_path/hbase-*/conf/
+ Include all the jars that your plugin refers to
+
+Here's how I've run flume against hbase in dev mode:
+
+7) From your hbase-*, "bin/start-hbase.sh"
+
+Note that starting hbase will also start ZooKeeper
+
+8) using the hbase shell create a table for the flume sink to write events
+
+$ bin/hbase shell
+> create 't1', 'f1', 'f2'
+
+9) Understand the attr2hbase sink usage and configure the source:
+   usage: attr2hbase("table" [,"sysFamily"[, "writeBody"[,"attrPrefix"[,"writeBufferSize"[,"writeToWal"]]]]])
+   Refer to parameter_mapping.html in /docs for more details.
+
+10) Start flume. I started a node with a rssatomSource and attr2hbase("t1","f1","f2:event","attr-prefix","10","false")
+
+> scan 't1'
+(This is output of my rowkey and data)
+
+hbase(main):002:0> scan 't1'
+ROW COLUMN+CELL
+ \x00\x00\xF6_\x0Fk\xF4\x80 column=f2:event, timestamp=1274227444388, value=hello
+ \x00\x00\xF6_\x0Fk\xF4\x80 column=f1:host, timestamp=1274227444388, value=valhalla
+ \x00\x00\xF6_\x0Fk\xF4\x80 column=f1:timestamp, timestamp=1274227444388, value=\x00\x00\x01\x28\xAD\xDF\xCA\
+ x7C
+1 row(s) in 0.0550 seconds
View
72 plugins/hbasesink/docs/parameter_mapping.html
@@ -0,0 +1,72 @@
+<html><head>
+<meta http-equiv="content-type" content="text/html; charset=ISO-8859-1">
+
+
+</head><body><center>
+<h1> This table shows the parameter mapping of Flume event to HBase sink </h1>
+</center>
+<p>
+  Sink has the following parameters: attr2hbase("table" [,"sysFamily"[,
+"writeBody"[,"attrPrefix"[,"writeBufferSize"[,"writeToWal"]]]]]). <br>
+ "table" - HBase table name to perform output into. <br>
+ "sysFamily" - Column family's name which is used to store "system" data (event's timestamp, host, priority). <br>
+ In case this param is absent or ="" the sink doesn't write "system" data.<br>
+ "writeBody" - Indicates whether event's body should be written among other "system" data.<br>
+ Default is "false" which means it should NOT be written.<br>
+ In case this param is absent or ="" the sink doesn't write "body" data.<br>
+ In case this param has format
+"column-family:qualifier" the sink writes the body to the specified
+column-family:qualifier <br>
+ "attrPrefix" - Attributes with this prefix in key will be placed into HBase table. Default value: "2hb_".<br>
+ Attribute key should be in the following format:
+"&lt;attrPrefix&gt;&lt;columnFamily&gt;:&lt;qualifier&gt;",<br>
+ e.g. "2hb_user:name" means that its value will be placed into "user" column family with "name" qualifier.<br>
+ Attribute with key "&lt;attrPrefix&gt;" SHOULD contain row key for Put,<br>
+ otherwise (if no row can be extracted) the event is skipped and no records are written to the HBase table.<br>
+ Next table shows what gets written into HBase
+table depending on the attribute name and other settings (in format
+columnFamily:qualifier-&gt;value, "-" means nothing is written).
+ </p>
+ <blockquote><table border="1">
+ <tbody><tr>
+ <th>Event's attr ("name"-&gt;"value") and Event's body -&gt; (body -&gt; "Val") </th>
+ <th>attrPrefix="2hb_", sysFamily=null, writeBody="bodyfam:bodycol"</th>
+ <th>attrPrefix="2hb_", sysFamily="sysfam", writeBody="bodyfam:bodycol"</th>
+ <th>attrPrefix="2hb_", sysFamily="sysfam", writeBody=""</th>
+ <th>attrPrefix="2hb_", sysFamily=null, writeBody=null</th>
+ </tr>
+ <tr>
+ <td>"any"-&gt;"foo", body-&gt;"EventVal"</td>
+ <td>bodyfam:bodycol-&gt; EventVal</td>
+ <td>bodyfam:bodycol-&gt; EventVal</td>
+ <td>-</td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td>"colfam:col"-&gt;"foo", body-&gt;""</td>
+ <td>-</td>
+ <td>-</td>
+ <td>-</td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td>"2hb_any"-&gt;"foo" body-&gt;"EventVal"</td>
+ <td>bodyfam:bodycol-&gt; EventVal</td>
+ <td>sysfam:any-&gt;foo, bodyfam:bodycol-&gt; EventVal</td>
+ <td>sysfam:any-&gt;foo</td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td>"2hb_colfam:col"-&gt;"foo", body-&gt;""</td>
+ <td>colfam:col-&gt;foo</td>
+ <td>colfam:col-&gt;foo</td>
+ <td>colfam:col-&gt;foo</td>
+ <td>colfam:col-&gt;foo</td>
+ </tr>
+ </tbody></table></blockquote>
+
+Note: <br> 1) The attr2hbase sink with attribute-prefix parameter as
+"empty/null", will use the hard-coded default attribute-prefix:"2hb_"
+<br>
+2) The Events without values for the configured attribute-prefix or default "2hb_" will NOT be written to the HBase.
+</body></html>
View
BIN plugins/hbasesink/lib/hadoop-test-0.20.2-cdh3u0.jar
Binary file not shown.
View
BIN plugins/hbasesink/lib/hbase-0.90.1-cdh3u0-tests.jar
Binary file not shown.
View
BIN plugins/hbasesink/lib/hbase-0.90.1-cdh3u0.jar
Binary file not shown.
View
352 plugins/hbasesink/src/java/com/cloudera/flume/hbase/Attr2HBaseEventSink.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.hbase;
+
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.SinkFactory;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.util.Pair;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * This generates an HBase output sink which puts event attributes into HBase
+ * record based on their names. It is similar to
+ * {@link com.cloudera.flume.handlers.hbase.HBaseEventSink}, please refer to
+ * README.txt for basic steps.
+
+ * Sink has the next parameters: attr2hbase("table" [,"family"[, "writeBody"[,"attrPrefix"[,"writeBufferSize"[,"writeToWal"]]]]]).
+ * "table" - HBase table name to perform output into.
+ * "sysFamily" - Column family's name which is used to store "system" data (event's timestamp, host, priority).
+ * In case this param is absent or ="" the sink doesn't write "system" data.
+ * "writeBody" - Indicates whether event's body should be written among other "system" data.
+ * Default is "false" which means it should NOT be written.
+ * In case this param is absent or ="" the sink doesn't write "body" data.
+ * In case this param has format "column-family:qualifier" the sink writes the body to the specified column-family:qualifier
+ * "attrPrefix" - Attributes with this prefix in key will be placed into HBase table. Default value: "2hb_".
+ * Attribute key should be in the following format: "&lt;attrPrefix&gt;&lt;columnFamily&gt;:&lt;qualifier&gt;",
+ * e.g. "2hb_user:name" means that its value will be placed into "user" column family with "name" qualifier.
+ * Attribute with key "&lt;attrPrefix&gt;" SHOULD contain row key for Put,
+ * otherwise (if no row can be extracted) the event is skipped and no records are written to the HBase table.
+ * Next table shows what gets written into HBase table depending on the attribute name and other settings (in format columnFamily:qualifier->value, "-" means nothing is written).
+ *
+ * <blockquote><table border=1>
+ * <tr>
+ * <th>Event's attr ("name"->"value") and Event's body -> (body -> "Val") </th>
+ * <th>attrPrefix="2hb_", sysFamily=null, writeBody="bodyfam:bodycol"</th>
+ * <th>attrPrefix="2hb_", sysFamily="sysfam", writeBody="bodyfam:bodycol"</th>
+ * <th>attrPrefix="2hb_", sysFamily="sysfam", writeBody=""</th>
+ * <th>attrPrefix="2hb_", sysFamily=null, writeBody=null</th>
+ * </tr>
+ * <tr>
+ * <td>"any"->"foo", body->"EventVal"</td>
+ * <td>bodyfam:bodycol-> EventVal</td>
+ * <td>bodyfam:bodycol-> EventVal</td>
+ * <td>-</td>
+ * <td>-</td>
+ * </tr>
+ * <tr>
+ * <td>"colfam:col"->"foo", body->""</td>
+ * <td>-</td>
+ * <td>-</td>
+ * <td>-</td>
+ * <td>-</td>
+ * </tr>
+ * <tr>
+ * <td>"2hb_any"->"foo" body->"EventVal"</td>
+ * <td>bodyfam:bodycol-> EventVal</td>
+ * <td>sysfam:any->foo, bodyfam:bodycol-> EventVal</td>
+ * <td>sysfam:any->foo</td>
+ * <td>-</td>
+ * </tr>
+ * <tr>
+ * <td>"2hb_colfam:col"->"foo", body->""</td>
+ * <td>colfam:col->foo</td>
+ * <td>colfam:col->foo</td>
+ * <td>colfam:col->foo</td>
+ * <td>colfam:col->foo</td>
+ * </tr>
+ * </table></blockquote>
+ *
+ * "writeBufferSize" - If provided, autoFlush for the HTable set to "false", and
+ * writeBufferSize is set to its value. If not provided, by default autoFlush is
+ * set to "true" (default HTable setting). This setting is valuable to boost
+ * HBase write speed.
+ *
+ * "writeToWal" - Determines whether WAL should be used
+ * during writing to HBase. If not provided Puts are written to WAL by default
+ * This setting is valuable to boost HBase write speed, but decreases
+ * reliability level. Use it if you know what it does.
+ *
+ * The Sink also implements method getSinkBuilders(), so it can be used as
+ * Flume's extension plugin (see flume.plugin.classes property of flume-site.xml
+ * config details)
+ */
+public class Attr2HBaseEventSink extends EventSink.Base {
+ private static final Logger LOG=LoggerFactory.getLogger(Attr2HBaseEventSink.class);
+ public static final String USAGE = "usage: attr2hbase(\"table\" [,\"sysFamily\"[, \"writeBody\"[,\"attrPrefix\"[,\"writeBufferSize\"[,\"writeToWal\"]]]]])";
+
+ private String tableName;
+
+ /**
+ * Column family name to store system data like timestamp of event, host
+ */
+ private byte[] systemFamilyName;
+ private String attrPrefix = "2hb_";
+ private long writeBufferSize = 0L;
+ private boolean writeToWal = true;
+ private boolean writeBody = true;
+ private String bodyCol;
+ private String bodyFam;
+
+ private Configuration config;
+ private HTable table;
+
+ /**
+ * Instantiates sink. See detailed explanation of parameters and their values
+ * at {@link com.cloudera.flume.handlers.hbase.Attr2HBaseEventSink}
+ *
+ * @param tableName
+ * HBase table name to output data into
+ * @param systemFamilyName
+ * name of columnFamily where to store event's system data
+ * @param writeBody
+ * Indicates whether event's body should be written to the specified column-family:qualifier
+ * @param bodyFam
+ * indicates the column-family for writing the body
+ * @param bodyCol
+ * indicates the column-qualifier for writing the body
+ * @param attrPrefix
+ * attributes with this prefix in key will be placed into HBase table
+ * @param writeBufferSize
+ * HTable's writeBufferSize
+ * @param writeToWal
+ * determines whether WAL should be used during writing to HBase
+ */
+ public Attr2HBaseEventSink(String tableName, String systemFamilyName,
+ boolean writeBody, String bodyFam, String bodyCol, String attrPrefix,
+ long writeBufferSize, boolean writeToWal) {
+ // You need a configuration object to tell the client where to connect.
+ // When you create a HBaseConfiguration, it reads in whatever you've set
+ // into your hbase-site.xml and in hbase-default.xml, as long as these can
+ // be found on the CLASSPATH
+ this(tableName, systemFamilyName, writeBody, bodyFam, bodyCol, attrPrefix,
+ writeBufferSize, writeToWal, HBaseConfiguration.create());
+ }
+
+ /**
+ * Instantiates sink. See detailed explanation of parameters and their values
+ * at {@link com.cloudera.flume.handlers.hbase.Attr2HBaseEventSink}
+ *
+ * @param tableName
+ * HBase table name to output data into
+ * @param systemFamilyName
+ * name of columnFamily where to store event's system data
+ * @param writeBody
+ * Indicates whether event's body should be written to the specified column-family:qualifier
+ * @param bodyFam
+ * indicates the column-family for writing the body
+ * @param bodyCol
+ * indicates the column-qualifier for writing the body
+ * @param attrPrefix
+ * attributes with this prefix in key will be placed into HBase table
+ * @param writeBufferSize
+ * HTable's writeBufferSize
+ * @param writeToWal
+ * determines whether WAL should be used during writing to HBase
+ * @param config
+ * HBase configuration
+ */
+ public Attr2HBaseEventSink(String tableName, String systemFamilyName,
+ boolean writeBody, String bodyFam, String bodyCol, String attrPrefix,
+ long writeBufferSize, boolean writeToWal, Configuration config) {
+ Preconditions.checkNotNull(tableName,
+ "HBase table's name MUST be provided.");
+ this.tableName = tableName;
+ // systemFamilyName can be null or empty String, which means
+ // "don't store "system" data
+ if (systemFamilyName != null && !"".equals(systemFamilyName)) {
+ this.systemFamilyName = Bytes.toBytes(systemFamilyName);
+ }
+ if (attrPrefix != null) {
+ this.attrPrefix = attrPrefix;
+ }
+
+ this.writeBody = writeBody;
+ this.bodyFam = bodyFam;
+ this.bodyCol = bodyCol;
+
+ this.writeBufferSize = writeBufferSize;
+ this.writeToWal = writeToWal;
+ this.config = config;
+ }
+
+ @Override
+ public void append(Event e) throws IOException {
+ Put p = createPut(e);
+
+ if (p != null && p.getFamilyMap().size() > 0) {
+ p.setWriteToWAL(writeToWal);
+ table.put(p);
+ }
+ }
+
+ // Made as package-private for unit-testing
+ Put createPut(Event e) {
+ Put p;
+ // Attribute with key "<attrPrefix>" contains row key for Put
+ if (e.getAttrs().containsKey(attrPrefix)) {
+ p = new Put(e.getAttrs().get(attrPrefix));
+ } else {
+ LOG.warn("Cannot extract key for HBase row, the attribute with key '"
+ + attrPrefix + "' is not present in event's data. No rows inserted.");
+ return null;
+ }
+
+ if (systemFamilyName != null) {
+ //TODO (dani) check if systemFamilyName exists in table-name
+ p.add(systemFamilyName, Bytes.toBytes("timestamp"), Bytes.toBytes(e.getTimestamp()));
+ p.add(systemFamilyName, Bytes.toBytes("host"), Bytes.toBytes(e.getHost()));
+ if (e.getPriority() != null) {
+ p.add(systemFamilyName, Bytes.toBytes("priority"), Bytes.toBytes(e.getPriority().toString()));
+ }
+
+ // Empty events are created with ""
+ if (writeBody && e.getBody().length != 0) {
+ //TODO (dani) check if bodyFam exists in table-name
+ Put re = p.add(Bytes.toBytes(bodyFam), Bytes.toBytes(bodyCol), e.getBody());
+ } else {
+ LOG.warn("Skipping Body");
+ }
+ }
+
+ for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
+ attemptToAddAttribute(p, a);
+ }
+ return p;
+ }
+
+ // Made as package-private for unit-testing
+ // Entry here represents event's attribute: key is attribute name and value is
+ // attribute value
+ void attemptToAddAttribute(Put p, Entry<String, byte[]> a) {
+ String attrKey = a.getKey();
+ if (attrKey.startsWith(attrPrefix)
+ && attrKey.length() > attrPrefix.length()) {
+ String keyWithoutPrefix = attrKey.substring(attrPrefix.length());
+ // please see the javadoc of attrPrefix format for more info
+ String[] col = keyWithoutPrefix.split(":", 2);
+ // if both columnFamily and qualifier can be fetched from attribute's key
+ boolean hasColumnFamilyAndQualifier = col.length == 2
+ && col[0].length() > 0 && col[1].length() > 0;
+ if (hasColumnFamilyAndQualifier) {
+ p.add(Bytes.toBytes(col[0]), Bytes.toBytes(col[1]), a.getValue());
+ return;
+ } else if (systemFamilyName != null) {
+ p.add(systemFamilyName, Bytes.toBytes(keyWithoutPrefix), a.getValue());
+ return;
+ } else {
+ LOG.warn("Cannot determine column family and/or qualifier for attribute, attribute name: "
+ + attrKey);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (table != null) {
+ table.close(); // performs flushCommits() internally, so we are good when
+ // autoFlush=false
+ table = null;
+ }
+ }
+
+ @Override
+ public void open() throws IOException {
+ if (table != null) {
+ throw new IllegalStateException(
+ "HTable is already initialized. Looks like sink close() hasn't been proceeded properly.");
+ }
+ // This instantiates an HTable object that connects you to
+ // the tableName table.
+ table = new HTable(config, tableName);
+ if (writeBufferSize > 0) {
+ table.setAutoFlush(false);
+ table.setWriteBufferSize(writeBufferSize);
+ }
+ }
+
+ public static SinkBuilder builder() {
+ return new SinkBuilder() {
+
+ @Override
+ public EventSink build(Context context, String... argv) {
+ Preconditions.checkArgument(argv.length >= 1, USAGE);
+
+ // TODO: check that arguments has proper types
+ String tableName = argv[0];
+ String systemFamilyName = argv.length >= 2 ? argv[1] : null;
+ // Body to be written in a column-name specified as source parameter, if
+ // parameter is "" it translates to writeBody as False.
+ // Default writeBody is false
+ boolean writeBody = false;
+ if(argv.length >= 3 && argv[2].length() >= 1)
+ {
+ writeBody = true;
+ }
+ String bodyFam = null;
+ String bodyCol = null;
+ if (writeBody) {
+ String bodyParams[] = argv[2].split(":");
+ if (bodyParams.length != 2) {
+ // TODO (dani) make this message easier.
+ throw new IllegalArgumentException(
+ "Malformed writeBody param, usage: bodyColumnFamily:bodyColumnQualifier");
+ }
+ bodyFam = bodyParams[0];
+ bodyCol = bodyParams[1];
+ }
+ String attrPrefix = argv.length >= 4 ? argv[3] : null;
+ long bufferSize = argv.length >= 5 ? Long.valueOf(argv[4]) : 0;
+ // TODO: add more sophisticated boolean conversion
+ boolean writeToWal = argv.length >= 6 ? Boolean.valueOf(argv[5]
+ .toLowerCase()) : true;
+ return new Attr2HBaseEventSink(tableName, systemFamilyName, writeBody,
+ bodyFam, bodyCol, attrPrefix, bufferSize, writeToWal);
+ }
+ };
+ }
+
+ public static List<Pair<String, SinkFactory.SinkBuilder>> getSinkBuilders() {
+ return Arrays.asList(new Pair<String, SinkFactory.SinkBuilder>(
+ "attr2hbase", builder()));
+ }
+}
View
36 plugins/hbasesink/src/javatest/com/cloudera/flume/hbase/HBaseTestEnv.java
@@ -0,0 +1,36 @@
+// This Cloudera, Inc. source code, including without limit any
+// human-readable computer programming code and associated documentation
+// (together "Source Code"), contains valuable confidential, proprietary
+// and trade secret information of Cloudera and is protected by the laws
+// of the U.S. and other countries. Any use of the Source Code, including
+// without limit any disclosure or reproduction, without the prior
+// written authorization of Cloudera is strictly prohibited.
+//
+// Copyright (c) 2010 Cloudera, Inc. All rights reserved.
+package com.cloudera.flume.hbase;
+
+import java.io.File;
+
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+
+import com.cloudera.util.FileUtil;
+
+public class HBaseTestEnv extends HBaseClusterTestCase {
+ private File hbaseTestDir;
+
+ @Override
+ public String getName() {
+ // TODO replace with actual test name
+ return "HBaseTestEnv";
+ }
+
+ public void setUp() throws Exception {
+ hbaseTestDir = FileUtil.mktempdir();
+
+ super.setUp();
+ }
+ public void tearDown() throws Exception {
+ super.tearDown();
+ FileUtil.rmr(hbaseTestDir);
+ }
+}
View
313 plugins/hbasesink/src/javatest/com/cloudera/flume/hbase/TestAttr2HBaseSink.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.hbase;
+
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.Event.Priority;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.util.Clock;
+import junit.framework.Assert;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+
+/**
+ * Test the hbase sink writes events to a table/family properly
+ */
+public class TestAttr2HBaseSink {
+ // Mini-cluster environment shared by every test in this class; started
+ // once in setup() because bringing up an HBase cluster is expensive.
+ private static HBaseTestEnv hbaseEnv;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // expensive, so just do it once for all tests, just make sure
+ // that tests don't overlap (use diff tables for each test)
+ hbaseEnv = new HBaseTestEnv();
+ hbaseEnv.conf.set(HBaseTestCase.TEST_DIRECTORY_KEY, "build/test/data");
+ hbaseEnv.setUp();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ hbaseEnv.tearDown();
+ }
+
+ /**
+ * Write events to a sink directly, verify by scanning HBase table.
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSink() throws IOException, InterruptedException {
+ final String tableName = "testSink";
+ final String tableSysFamily = "sysFamily";
+ final String tableFamily1 = "family1";
+ final String tableFamily2 = "family2";
+ final String tableBody = "sysFamily";
+ final String tableBodyFamily = "event";
+ final Boolean writeBody = true;
+
+ // create the table and column family to be used by sink
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(tableSysFamily));
+ desc.addFamily(new HColumnDescriptor(tableFamily1));
+ desc.addFamily(new HColumnDescriptor(tableFamily2));
+ HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
+ admin.createTable(desc);
+
+ // explicit constructor rather than builder - we want to control the conf
+ EventSink snk = new Attr2HBaseEventSink(tableName, tableSysFamily,
+ writeBody, tableBody, tableBodyFamily, "2hb_", 0, true, hbaseEnv.conf);
+ snk.open();
+ try {
+ Event e1 = new EventImpl("message0".getBytes(), Clock.unixTime(),
+ Priority.INFO, 0, "localhost");
+ // "2hb_" alone carries the row key, "2hb_<fam>:<qual>" addresses a
+ // cell, and un-prefixed attributes ("other") are ignored by the sink
+ // (see the attemptToAddAttribute tests below for the prefix rules).
+ e1.set("2hb_", Bytes.toBytes("row-key0"));
+ e1.set("2hb_family1:column1", Bytes.toBytes("value0"));
+ e1.set("2hb_family2:column2", Bytes.toBytes("value_0"));
+ e1.set("other", Bytes.toBytes("val0"));
+ Event e2 = new EventImpl("message1".getBytes(), Clock.unixTime(),
+ Priority.INFO, 1, "localhost");
+ e2.set("2hb_", Bytes.toBytes("row-key1"));
+ e2.set("2hb_family1:column1", Bytes.toBytes("value1"));
+ e2.set("2hb_family2:column2", Bytes.toBytes("value_1"));
+ e2.set("other", Bytes.toBytes("val1"));
+ Event e3 = new EventImpl("message2".getBytes(), Clock.unixTime(),
+ Priority.INFO, 2, "localhost");
+ e3.set("2hb_", Bytes.toBytes("row-key2"));
+ e3.set("2hb_family1:column1", Bytes.toBytes("value2"));
+ e3.set("2hb_family2:column2", Bytes.toBytes("value_2"));
+ e3.set("other", Bytes.toBytes("val2"));
+ snk.append(e1);
+ snk.append(e2);
+ snk.append(e3);
+ } finally {
+ snk.close();
+ }
+
+ // verify that the events made it into hbase
+ HTable table = new HTable(hbaseEnv.conf, tableName);
+ try {
+ for (long i = 0; i <= 2; i++) {
+ Result r = table.get(new Get(Bytes.toBytes("row-key" + i)));
+ System.out.println("result " + r);
+
+ byte[] host = r.getValue(Bytes.toBytes(tableSysFamily),
+ Bytes.toBytes("host"));
+ Assert.assertEquals("Matching host", "localhost", Bytes.toString(host));
+ byte[] priority = r.getValue(Bytes.toBytes(tableSysFamily),
+ Bytes.toBytes("priority"));
+ Assert.assertEquals("Matching priority", "INFO",
+ Bytes.toString(priority));
+ byte[] body = r.getValue(Bytes.toBytes(tableSysFamily),
+ Bytes.toBytes("event"));
+ Assert.assertEquals("Matching body", "message" + i,
+ Bytes.toString(body));
+ // 4 here means: host, timestamp, priority and body
+ Assert.assertEquals("Matching values added", 4,
+ r.getFamilyMap(Bytes.toBytes(tableSysFamily)).size());
+
+ byte[] fam1value = r.getValue(Bytes.toBytes(tableFamily1),
+ Bytes.toBytes("column1"));
+ Assert.assertEquals("Matching value", "value" + i,
+ Bytes.toString(fam1value));
+ Assert.assertEquals("Matching values added", 1,
+ r.getFamilyMap(Bytes.toBytes(tableFamily1)).size());
+
+ byte[] fam2value = r.getValue(Bytes.toBytes(tableFamily2),
+ Bytes.toBytes("column2"));
+ Assert.assertEquals("Matching value", "value_" + i,
+ Bytes.toString(fam2value));
+ Assert.assertEquals("Matching values added", 1,
+ r.getFamilyMap(Bytes.toBytes(tableFamily2)).size());
+
+ }
+ } finally {
+ table.close();
+ }
+ }
+
+ // A Put built with an empty system family must contain only the cells
+ // explicitly addressed as "2hb_<fam>:<qual>" (here family1 and family2);
+ // un-prefixed attributes contribute nothing.
+ @Test
+ public void testCreatePutWithoutSystemColumnFamily() {
+ final String tableBody = "sysFamily";
+ final String tableBodyFamily = "event";
+ final Boolean writeBody = true;
+ Attr2HBaseEventSink snk = new Attr2HBaseEventSink("tableName", "",
+ writeBody, tableBody, tableBodyFamily, "2hb_", 0, true, null);
+
+ Event e = new EventImpl("message".getBytes(), Clock.unixTime(),
+ Priority.INFO, 0, "localhost");
+ e.set("2hb_", Bytes.toBytes("rowKey"));
+ e.set("2hb_family1:column1", Bytes.toBytes("value1"));
+ e.set("2hb_family2:column2", Bytes.toBytes("value2"));
+ e.set("other", Bytes.toBytes("val"));
+
+ Put put = snk.createPut(e);
+ Assert.assertEquals("Matching row key", "rowKey",
+ Bytes.toString(put.getRow()));
+ Assert.assertEquals("Matching column families added", 2, put.getFamilyMap()
+ .size());
+ Assert.assertEquals(
+ "Matching value",
+ "value1",
+ Bytes.toString(put
+ .get(Bytes.toBytes("family1"), Bytes.toBytes("column1")).get(0)
+ .getValue()));
+ Assert.assertEquals(
+ "Matching value",
+ "value2",
+ Bytes.toString(put
+ .get(Bytes.toBytes("family2"), Bytes.toBytes("column2")).get(0)
+ .getValue()));
+ }
+
+ // writeBody=false must keep the event body ("event" qualifier) out of the
+ // system family while still writing host/timestamp/priority.
+ @Test
+ public void testDontWriteBody() {
+ final String tableBody = "";
+ final String tableBodyFamily = "";
+ final Boolean writeBody = false;
+ Attr2HBaseEventSink snk = new Attr2HBaseEventSink("tableName", "sysFam",
+ writeBody, tableBody, tableBodyFamily, "2hb_", 0, true, null);
+
+ Event e = new EventImpl("message".getBytes(), Clock.unixTime(),
+ Priority.INFO, 0, "localhost");
+ e.set("2hb_", Bytes.toBytes("rowKey"));
+
+ Put put = snk.createPut(e);
+ // 3 here means: host, timestamp, priority (body is excluded)
+ Assert.assertEquals("Matching values added", 3,
+ put.getFamilyMap().get(Bytes.toBytes("sysFam")).size());
+ Assert.assertEquals("Body shouldn't be written", 0,
+ put.get(Bytes.toBytes("sysFam"), Bytes.toBytes("event")).size());
+ }
+
+ // Without a "2hb_" row-key attribute on the event, createPut returns null
+ // (no Put can be built without a row key).
+ @Test
+ public void testCreatePutWithoutExplicitRowKey() {
+ final String tableBody = "sysFamily";
+ final String tableBodyFamily = "event";
+ final Boolean writeBody = true;
+ Attr2HBaseEventSink snk = new Attr2HBaseEventSink("tableName", "sysFam",
+ writeBody, tableBody, tableBodyFamily, "2hb_", 0, true, null);
+
+ Event e = new EventImpl("message".getBytes(), Clock.unixTime(),
+ Priority.INFO, 0, "localhost");
+ e.set("other", Bytes.toBytes("val0"));
+
+ Put put = snk.createPut(e);
+ Assert.assertNull("No put should be created when row key data is absent",
+ put);
+ }
+
+ // Prefixed attributes that do not parse as <fam>:<qual> fall back into the
+ // configured system column family; un-prefixed attributes are skipped.
+ @Test
+ public void testAddAttributeWithSystemColumnFamSpecified() {
+ final String tableBody = "sysFamily";
+ final String tableBodyFamily = "event";
+ final Boolean writeBody = true;
+ Attr2HBaseEventSink snk = new Attr2HBaseEventSink("tableName", "sysFam",
+ writeBody, tableBody, tableBodyFamily, "2hb_", 0, true, null);
+ byte[] foo = Bytes.toBytes("foo");
+
+ Put put = new Put(foo);
+ snk.attemptToAddAttribute(put, createAttr(":any:", foo));
+ // since attribute has no needed prefix, it shouldn't be added
+ assertEmpty(put);
+
+ put = new Put(foo);
+ snk.attemptToAddAttribute(put, createAttr("2hb_:any:", foo));
+ // since attribute has prefix, but isn't contain colFam *and* qualifier, it
+ // should be put into "system" colFam
+ assertHasSingleKeyValue(put, "sysFam", ":any:", foo);
+
+ put = new Put(foo);
+ snk.attemptToAddAttribute(put, createAttr("2hb_columnFam:columnName", foo));
+ assertHasSingleKeyValue(put, "columnFam", "columnName", foo);
+ }
+
+ // With no system column family configured, prefixed attributes lacking a
+ // <fam>:<qual> pair have nowhere to go and are dropped entirely.
+ @Test
+ public void testAddAttributeWithoutSystemColumnFam() {
+ final String tableBody = "sysFamily";
+ final String tableBodyFamily = "event";
+ final Boolean writeBody = true;
+ Attr2HBaseEventSink snk = new Attr2HBaseEventSink("tableName", "",
+ writeBody, tableBody, tableBodyFamily, "2hb_", 0, true, null);
+ byte[] foo = Bytes.toBytes("foo");
+
+ Put put = new Put(foo);
+ // since attribute has no needed prefix, it shouldn't be added
+ snk.attemptToAddAttribute(put, createAttr(":any:", foo));
+ assertEmpty(put);
+
+ put = new Put(foo);
+ // since attribute has prefix, but isn't contain colFam *and* qualifier and
+ // "system" colFam isn't specified it should be omitted
+ snk.attemptToAddAttribute(put, createAttr("2hb_:any:", foo));
+ assertEmpty(put);
+
+ put = new Put(foo);
+ snk.attemptToAddAttribute(put, createAttr("2hb_columnFam:columnName", foo));
+ assertHasSingleKeyValue(put, "columnFam", "columnName", foo);
+ }
+
+ // Asserts the Put holds exactly one column family and that the cell at
+ // fam:col contains val.
+ private void assertHasSingleKeyValue(Put put, String fam, String col,
+ byte[] val) {
+ Assert.assertEquals("Matching added values", 1, put.getFamilyMap().size());
+ Assert.assertTrue(
+ "Matching value",
+ Bytes.compareTo(val, put.get(Bytes.toBytes(fam), Bytes.toBytes(col))
+ .get(0).getValue()) == 0);
+ }
+
+ // Asserts that nothing at all was added to the Put.
+ private void assertEmpty(Put put) {
+ Assert.assertEquals("Matching added values", 0, put.getFamilyMap().size());
+ }
+
+ // Builds a single attribute entry (name -> value) in the map-entry form
+ // that attemptToAddAttribute consumes.
+ private AbstractMap.SimpleEntry<String, byte[]> createAttr(String key,
+ byte[] val) {
+ return new AbstractMap.SimpleEntry<String, byte[]>(key, val);
+ }
+
+ // With an empty attribute prefix, every attribute is eligible: bare names
+ // go to the system family, <fam>:<qual> names go to that cell.
+ @Test
+ public void testAddAttributeWithSystemColumnFamSpecifiedAndEmptyPrefix() {
+ final String tableBody = "sysFamily";
+ final String tableBodyFamily = "event";
+ final Boolean writeBody = true;
+ Attr2HBaseEventSink snk = new Attr2HBaseEventSink("tableName", "sysFam",
+ writeBody, tableBody, tableBodyFamily, "", 0, true, null);
+ byte[] foo = Bytes.toBytes("foo");
+
+ Put put = new Put(foo);
+ snk.attemptToAddAttribute(put, createAttr(":any:", foo));
+ assertHasSingleKeyValue(put, "sysFam", ":any:", foo);
+
+ put = new Put(foo);
+ snk.attemptToAddAttribute(put, createAttr("columnFam:columnName", foo));
+ assertHasSingleKeyValue(put, "columnFam", "columnName", foo);
+ }
+}

0 comments on commit b68128d

Please sign in to comment.
Something went wrong with that request. Please try again.