Skip to content

Commit

Permalink
FLUME-2335: TestHBaseSink#testWithoutConfigurationObject() must delet…
Browse files Browse the repository at this point in the history
…e the table at the end of the test

(Hari Shreedharan via Jarek Jarcec Cecho)
  • Loading branch information
Jarek Jarcec Cecho committed Mar 1, 2014
1 parent 96f6b62 commit ad5f286
Showing 1 changed file with 25 additions and 21 deletions.
Expand Up @@ -36,6 +36,7 @@
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -59,6 +60,7 @@ public class TestHBaseSink {
private static String plCol = "pCol";
private static Context ctx = new Context();
private static String valBase = "testing hbase sink: jham";
private static Configuration conf;

@BeforeClass
public static void setUp() throws Exception {
Expand All @@ -71,6 +73,7 @@ public static void setUp() throws Exception {
ctxMap.put("serializer.payloadColumn", plCol);
ctxMap.put("serializer.incrementColumn", inColumn);
ctx.putAll(ctxMap);
conf = new Configuration(testUtility.getConfiguration());
}

@AfterClass
Expand All @@ -92,7 +95,7 @@ public void testOneEventWithDefaults() throws Exception {
tmpctx.putAll(ctxMap);

testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
Configurables.configure(channel, new Context());
Expand All @@ -108,7 +111,7 @@ public void testOneEventWithDefaults() throws Exception {

sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 1);
byte[] out = results[0];
Assert.assertArrayEquals(e.getBody(), out);
Expand All @@ -120,7 +123,7 @@ public void testOneEventWithDefaults() throws Exception {
@Test
public void testOneEvent() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
Configurables.configure(channel, new Context());
Expand All @@ -136,7 +139,7 @@ public void testOneEvent() throws Exception {

sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 1);
byte[] out = results[0];
Assert.assertArrayEquals(e.getBody(), out);
Expand All @@ -149,7 +152,7 @@ public void testOneEvent() throws Exception {
public void testThreeEvents() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
ctx.put("batchSize", "3");
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
Configurables.configure(channel, new Context());
Expand All @@ -165,7 +168,7 @@ public void testThreeEvents() throws Exception {
tx.close();
sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 3);
byte[] out;
int found = 0;
Expand All @@ -187,7 +190,7 @@ public void testThreeEvents() throws Exception {
public void testMultipleBatches() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
ctx.put("batchSize", "2");
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
ctx.put("batchSize", "100");
Expand All @@ -209,7 +212,7 @@ public void testMultipleBatches() throws Exception {
}
sink.stop();
Assert.assertEquals(2, count);
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 3);
byte[] out;
int found = 0;
Expand All @@ -230,7 +233,7 @@ public void testMultipleBatches() throws Exception {
@Test(expected = FlumeException.class)
public void testMissingTable() throws Exception {
ctx.put("batchSize", "2");
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
ctx.put("batchSize", "100");
Expand All @@ -247,7 +250,7 @@ public void testMissingTable() throws Exception {
tx.commit();
tx.close();
sink.process();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 2);
byte[] out;
int found = 0;
Expand Down Expand Up @@ -279,7 +282,7 @@ public void testMissingTable() throws Exception {
public void testHBaseFailure() throws Exception {
ctx.put("batchSize", "2");
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
ctx.put("batchSize", "100");
Expand All @@ -296,7 +299,7 @@ public void testHBaseFailure() throws Exception {
tx.commit();
tx.close();
sink.process();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 2);
byte[] out;
int found = 0;
Expand Down Expand Up @@ -373,7 +376,7 @@ private byte[][] getResults(HTable table, int numEvents) throws IOException{
public void testTransactionStateOnChannelException() throws Exception {
ctx.put("batchSize", "1");
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
// Reset the context to a higher batchSize
Channel channel = spy(new MemoryChannel());
Expand All @@ -396,7 +399,7 @@ public void testTransactionStateOnChannelException() throws Exception {
doReturn(e).when(channel).take();
sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 1);
byte[] out = results[0];
Assert.assertArrayEquals(e.getBody(), out);
Expand All @@ -411,7 +414,7 @@ public void testTransactionStateOnSerializationException() throws Exception {
ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
"org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
// Reset the context to a higher batchSize
Channel channel = new MemoryChannel();
Expand All @@ -434,7 +437,7 @@ public void testTransactionStateOnSerializationException() throws Exception {
MockSimpleHbaseEventSerializer.throwException = false;
sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 1);
byte[] out = results[0];
Assert.assertArrayEquals(e.getBody(), out);
Expand All @@ -447,10 +450,10 @@ public void testTransactionStateOnSerializationException() throws Exception {
public void testWithoutConfigurationObject() throws Exception{
ctx.put("batchSize", "2");
ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) );
ZKConfig.getZKQuorumServersString(conf) );
System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM));
ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink();
Configurables.configure(sink, ctx);
Expand All @@ -475,7 +478,7 @@ public void testWithoutConfigurationObject() throws Exception{
status = sink.process();
}
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 3);
byte[] out;
int found = 0;
Expand All @@ -490,6 +493,7 @@ public void testWithoutConfigurationObject() throws Exception{
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
testUtility.deleteTable(tableName.getBytes());
}

@Test
Expand All @@ -499,7 +503,7 @@ public void testZKQuorum() throws Exception{
ctx.put("batchSize", "2");
ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
HBaseSink sink = new HBaseSink();
Configurables.configure(sink, ctx);
Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
Expand All @@ -516,7 +520,7 @@ public void testZKQuorumIncorrectPorts() throws Exception{
ctx.put("batchSize", "2");
ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
HBaseSink sink = new HBaseSink();
Configurables.configure(sink, ctx);
Assert.fail();
Expand Down

0 comments on commit ad5f286

Please sign in to comment.