From ad5f286eafd0912e9db0b2c3d92d1dc408377b12 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Sat, 1 Mar 2014 07:27:10 -0800 Subject: [PATCH] FLUME-2335: TestHBaseSink#testWithoutConfigurationObject() must delete the table at the end of the test (Hari Shreedharan via Jarek Jarcec Cecho) --- .../flume/sink/hbase/TestHBaseSink.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index cb7c6ea201..20b7fe5c93 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -36,6 +36,7 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -59,6 +60,7 @@ public class TestHBaseSink { private static String plCol = "pCol"; private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; + private static Configuration conf; @BeforeClass public static void setUp() throws Exception { @@ -71,6 +73,7 @@ public static void setUp() throws Exception { ctxMap.put("serializer.payloadColumn", plCol); ctxMap.put("serializer.incrementColumn", inColumn); ctx.putAll(ctxMap); + conf = new Configuration(testUtility.getConfiguration()); } @AfterClass @@ -92,7 +95,7 @@ public void testOneEventWithDefaults() throws Exception { tmpctx.putAll(ctxMap); testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); Configurables.configure(channel, new Context()); @@ -108,7 +111,7 @@ public void testOneEventWithDefaults() throws Exception { sink.process(); sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 1); byte[] out = results[0]; Assert.assertArrayEquals(e.getBody(), out); @@ -120,7 +123,7 @@ public void testOneEventWithDefaults() throws Exception { @Test public void testOneEvent() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); Configurables.configure(channel, new Context()); @@ -136,7 +139,7 @@ public void testOneEvent() throws Exception { sink.process(); sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 1); byte[] out = results[0]; Assert.assertArrayEquals(e.getBody(), out); @@ -149,7 +152,7 @@ public void testOneEvent() throws Exception { public void testThreeEvents() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); ctx.put("batchSize", "3"); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); Configurables.configure(channel, new Context()); @@ -165,7 +168,7 @@ public void testThreeEvents() throws Exception { tx.close(); sink.process(); sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 3); byte[] out; int found = 0; @@ -187,7 +190,7 @@ public void testThreeEvents() throws Exception { public void testMultipleBatches() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); ctx.put("batchSize", "2"); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize ctx.put("batchSize", "100"); @@ -209,7 +212,7 @@ public void testMultipleBatches() throws Exception { } sink.stop(); Assert.assertEquals(2, count); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 3); byte[] out; int found = 0; @@ -230,7 +233,7 @@ public void testMultipleBatches() throws Exception { @Test(expected = FlumeException.class) public void testMissingTable() throws Exception { ctx.put("batchSize", "2"); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize ctx.put("batchSize", "100"); @@ -247,7 +250,7 @@ public void testMissingTable() throws Exception { tx.commit(); tx.close(); sink.process(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 2); byte[] out; int found = 0; @@ -279,7 +282,7 @@ public void testMissingTable() throws Exception { public void testHBaseFailure() throws Exception { ctx.put("batchSize", "2"); testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); //Reset the context to a higher batchSize ctx.put("batchSize", "100"); @@ -296,7 +299,7 @@ public void testHBaseFailure() throws Exception { tx.commit(); tx.close(); sink.process(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 2); byte[] out; int found = 0; @@ -373,7 +376,7 @@ private byte[][] getResults(HTable table, int numEvents) throws IOException{ public void testTransactionStateOnChannelException() throws Exception { ctx.put("batchSize", "1"); testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); // Reset the context to a higher batchSize Channel channel = spy(new MemoryChannel()); @@ -396,7 +399,7 @@ public void testTransactionStateOnChannelException() throws Exception { doReturn(e).when(channel).take(); sink.process(); sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 1); byte[] out = results[0]; Assert.assertArrayEquals(e.getBody(), out); @@ -411,7 +414,7 @@ public void testTransactionStateOnSerializationException() throws Exception { ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER, "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer"); testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); // Reset the context to a higher batchSize Channel channel = new MemoryChannel(); @@ -434,7 +437,7 @@ public void testTransactionStateOnSerializationException() throws Exception { MockSimpleHbaseEventSerializer.throwException = false; sink.process(); sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 1); byte[] out = results[0]; Assert.assertArrayEquals(e.getBody(), out); @@ -447,10 +450,10 @@ public void testTransactionStateOnSerializationException() throws Exception { public void testWithoutConfigurationObject() throws Exception{ ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, - ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) ); + ZKConfig.getZKQuorumServersString(conf) ); System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM)); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, ctx); @@ -475,7 +478,7 @@ public void testWithoutConfigurationObject() throws Exception{ status = sink.process(); } sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); + HTable table = new HTable(conf, tableName); byte[][] results = getResults(table, 3); byte[] out; int found = 0; @@ -490,6 +493,7 @@ public void testWithoutConfigurationObject() throws Exception{ Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); + testUtility.deleteTable(tableName.getBytes()); } @Test @@ -499,7 +503,7 @@ public void testZKQuorum() throws Exception{ ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, ctx); Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," + @@ -516,7 +520,7 @@ public void testZKQuorumIncorrectPorts() throws Exception{ ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, ctx); Assert.fail();