Skip to content
Browse files

CDH-4551: Get TestHBaseSink working

  • Loading branch information...
1 parent 98afcc4 commit 9b270853a3837b05929240e61ced74f3258aaa09 @harishreedharan harishreedharan committed
Showing with 46 additions and 58 deletions.
  1. +45 −57 plugins/flume-plugin-hbasesink/src/test/java/com/cloudera/flume/hbase/TestHBaseSink.java
  2. +1 −1 pom.xml
View
102 plugins/flume-plugin-hbasesink/src/test/java/com/cloudera/flume/hbase/TestHBaseSink.java
@@ -23,7 +23,6 @@
import junit.framework.Assert;
-import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -49,22 +48,22 @@
* Test the hbase sink writes events to a table/family properly
*/
public class TestHBaseSink {
- private static HBaseTestEnv hbaseEnv;
public static Logger LOG = LoggerFactory.getLogger(TestHBaseSink.class);
public static final String DEFAULT_HOST = "qwigibo";
+ private static HBaseTestingUtility testingUtility = new HBaseTestingUtility();
+
@BeforeClass
public static void setup() throws Exception {
// expensive, so just do it once for all tests, just make sure
// that tests don't overlap (use diff tables for each test)
- hbaseEnv = new HBaseTestEnv();
- hbaseEnv.conf.set(HBaseTestingUtility.BASE_TEST_DIRECTORY_KEY, "build/test/data");
- hbaseEnv.setUp();
+ testingUtility.startMiniCluster();
+ testingUtility.getConfiguration().set(HBaseTestingUtility.BASE_TEST_DIRECTORY_KEY, "build/test/data");
}
@AfterClass
public static void teardown() throws Exception {
- hbaseEnv.tearDown();
+ testingUtility.shutdownMiniCluster();
}
void shipThreeEvents(HBaseSink snk) throws IOException {
@@ -101,32 +100,27 @@ void shipThreeEvents(HBaseSink snk) throws IOException {
/**
* Write events to a sink directly, verify by scanning HBase table. x
*/
- // @Test
+ @Test
public void testSink() throws IOException, InterruptedException {
final String tableName = "testSinkTab";
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";
-
- HBaseAdmin admin = hbaseEnv.getHBaseAdmin();
-
- // create the table and column family to be used by sink
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(tableFamily1));
- desc.addFamily(new HColumnDescriptor(tableFamily2));
- // HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
- admin.createTable(desc);
- admin.flush(tableName);
-
+ System.out.println("Starting.. testSinkTab:");
+ byte[][] args = new byte[2][];
+ args[0]=tableFamily1.getBytes();
+ args[1]=tableFamily2.getBytes();
+ testingUtility.createTable(tableName.getBytes(),args);
+ System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "col1", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{attr2}"));
HBaseSink snk = new HBaseSink(tableName, "%{rowkey}", spec, 0L, false,
- hbaseEnv.conf);
+ testingUtility.getConfiguration());
shipThreeEvents(snk);
// verify that the events made it into hbase
- HTable table = new HTable(hbaseEnv.conf, tableName);
+ HTable table = new HTable(testingUtility.getConfiguration(), tableName);
try {
for (long i = 0; i <= 2; i++) {
Result r = table.get(new Get(Bytes.toBytes("row-key" + i)));
@@ -143,37 +137,36 @@ public void testSink() throws IOException, InterruptedException {
}
} finally {
table.close();
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
+ testingUtility.getHBaseAdmin().disableTable(tableName);
+ testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}
/**
* Write events to a sink directly, verify by scanning HBase table. x
*/
- // @Test
+ @Test
public void testSinkEmptyCol() throws IOException, InterruptedException {
final String tableName = "testSinkEmptyColTab";
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";
- // create the table and column family to be used by sink
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(tableFamily1));
- desc.addFamily(new HColumnDescriptor(tableFamily2));
- HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
- admin.createTable(desc);
-
+ System.out.println("Starting.. testSinkTab:");
+ byte[][] args = new byte[2][];
+ args[0]=tableFamily1.getBytes();
+ args[1]=tableFamily2.getBytes();
+ testingUtility.createTable(tableName.getBytes(),args);
+ System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "", "%{attr2}"));
HBaseSink snk = new HBaseSink(tableName, "%{rowkey}", spec, 0L, false,
- hbaseEnv.conf);
+ testingUtility.getConfiguration());
shipThreeEvents(snk);
// verify that the events made it into hbase
- HTable table = new HTable(hbaseEnv.conf, tableName);
+ HTable table = new HTable(testingUtility.getConfiguration(), tableName);
try {
for (long i = 0; i <= 2; i++) {
Result r = table.get(new Get(Bytes.toBytes("row-key" + i)));
@@ -188,37 +181,36 @@ public void testSinkEmptyCol() throws IOException, InterruptedException {
}
} finally {
table.close();
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
+ testingUtility.getHBaseAdmin().disableTable(tableName);
+ testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}
/**
* Write events to a sink directly, verify by scanning HBase table. x
*/
- // @Test
+ @Test
public void testSinkEscaping() throws IOException, InterruptedException {
final String tableName = "testSinkEscapingTab";
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";
- // create the table and column family to be used by sink
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(tableFamily1));
- desc.addFamily(new HColumnDescriptor(tableFamily2));
- HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
- admin.createTable(desc);
-
+ System.out.println("Starting.. testSinkTab:");
+ byte[][] args = new byte[2][];
+ args[0]=tableFamily1.getBytes();
+ args[1]=tableFamily2.getBytes();
+ testingUtility.createTable(tableName.getBytes(),args);
+ System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "%{priority}", "%{body}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{badescape}"));
HBaseSink snk = new HBaseSink(tableName, "%{host}-%{rowkey}", spec, 0L,
- false, hbaseEnv.conf);
+ false, testingUtility.getConfiguration());
shipThreeEvents(snk);
// verify that the events made it into hbase
- HTable table = new HTable(hbaseEnv.conf, tableName);
+ HTable table = new HTable(testingUtility.getConfiguration(), tableName);
try {
for (long i = 0; i <= 2; i++) {
Result r = table.get(new Get(Bytes.toBytes(DEFAULT_HOST + "-row-key"
@@ -235,12 +227,12 @@ public void testSinkEscaping() throws IOException, InterruptedException {
}
} finally {
table.close();
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
+ testingUtility.getHBaseAdmin().disableTable(tableName);
+ testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}
- // @Test(expected = TableNotFoundException.class)
+ @Test(expected = TableNotFoundException.class)
public void testOpenFailBadTable() throws IOException {
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";
@@ -250,7 +242,7 @@ public void testOpenFailBadTable() throws IOException {
spec.add(new QualifierSpec(tableFamily1, "col1", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{attr2}")); // invalid
HBaseSink snk = new HBaseSink("bogus table name", "%{rowkey}", spec, 0L,
- false, hbaseEnv.conf);
+ false, testingUtility.getConfiguration());
shipThreeEvents(snk);
}
@@ -260,21 +252,17 @@ public void testOpenFailBadColFam() throws IOException {
final String tableFamily1 = "family1";
final String tableFamily2 = "family2";
- // create the table and column family to be used by sink
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(tableFamily1));
- HBaseAdmin admin = new HBaseAdmin(hbaseEnv.conf);
- admin.createTable(desc);
-
+ testingUtility.createTable(tableName.getBytes(),tableFamily1.getBytes());
+ System.out.println("Created table");
// explicit constructor rather than builder - we want to control the conf
List<QualifierSpec> spec = new ArrayList<QualifierSpec>();
spec.add(new QualifierSpec(tableFamily1, "col1", "%{attr1}"));
spec.add(new QualifierSpec(tableFamily2, "col2", "%{attr2}")); // invalid
HBaseSink snk = new HBaseSink(tableName, "%{rowkey}", spec, 0L, false,
- hbaseEnv.conf);
+ testingUtility.getConfiguration());
shipThreeEvents(snk);
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
+ testingUtility.getHBaseAdmin().disableTable(tableName);
+ testingUtility.getHBaseAdmin().deleteTable(tableName);
}
}
View
2 pom.xml
@@ -556,7 +556,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>r07</version>
+ <version>11.0.2</version>
</dependency>
<dependency>

0 comments on commit 9b27085

Please sign in to comment.
Something went wrong with that request. Please try again.