From cc8be0d997c3755ecf5eadf0dadd82733fd0f05b Mon Sep 17 00:00:00 2001 From: Chiwan Park Date: Wed, 10 Jun 2015 13:05:37 +0900 Subject: [PATCH] Extend message handler structure to be used by sink and source both --- .../flume/plugins/AbstractRedisSink.java | 22 +++++++++- .../flume/plugins/AbstractRedisSource.java | 12 +++--- .../flume/plugins/RedisListDrivenSink.java | 3 +- .../flume/plugins/RedisListDrivenSource.java | 4 +- .../flume/plugins/RedisPublishDrivenSink.java | 3 +- .../plugins/RedisSubscribeDrivenSource.java | 4 +- .../flume/plugins/handler/JSONHandler.java | 25 ++++++++++- .../flume/plugins/handler/RawHandler.java | 10 ++++- ...eHandler.java => RedisMessageHandler.java} | 17 ++++++-- .../plugins/handler/JSONHandlerTest.java | 43 +++++++++++++++++++ .../flume/plugins/handler/RawHandlerTest.java | 17 +++++++- 11 files changed, 139 insertions(+), 21 deletions(-) rename src/main/java/com/chiwanpark/flume/plugins/handler/{RedisSourceHandler.java => RedisMessageHandler.java} (69%) diff --git a/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSink.java b/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSink.java index 71a1cd2..69e15ed 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSink.java +++ b/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSink.java @@ -1,5 +1,7 @@ package com.chiwanpark.flume.plugins; +import com.chiwanpark.flume.plugins.handler.RedisMessageHandler; +import com.google.common.base.Throwables; import org.apache.flume.Context; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; @@ -16,7 +18,7 @@ public abstract class AbstractRedisSink extends AbstractSink implements Configur private int redisPort; private int redisTimeout; private String redisPassword; - protected String messageCharset; + protected RedisMessageHandler messageHandler; @Override public synchronized void start() { @@ -43,6 +45,22 @@ public void configure(Context context) { redisPort = context.getInteger("redisPort", 6379); redisTimeout = context.getInteger("redisTimeout", 2000); redisPassword = context.getString("redisPassword", ""); - messageCharset = context.getString("messageCharset", "utf-8"); + + try { + String charset = context.getString("messageCharset", "utf-8"); + String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler"); + @SuppressWarnings("unchecked") + Class clazz = (Class) Class.forName(handlerClassName); + messageHandler = clazz.getDeclaredConstructor(String.class).newInstance(charset); + } catch (ClassNotFoundException ex) { + LOG.error("Error while configuring RedisMessageHandler. Exception follows.", ex); + Throwables.propagate(ex); + } catch (ClassCastException ex) { + LOG.error("Handler is not an instance of RedisMessageHandler. Handler must implement RedisMessageHandler."); + Throwables.propagate(ex); + } catch (Exception ex) { + LOG.error("Error configuring RedisSubscribeDrivenSource!", ex); + Throwables.propagate(ex); + } } } diff --git a/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSource.java b/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSource.java index 0854d0f..69d485b 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSource.java +++ b/src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSource.java @@ -1,6 +1,6 @@ package com.chiwanpark.flume.plugins; -import com.chiwanpark.flume.plugins.handler.RedisSourceHandler; +import com.chiwanpark.flume.plugins.handler.RedisMessageHandler; import com.google.common.base.Throwables; import org.apache.flume.Context; import org.apache.flume.channel.ChannelProcessor; @@ -21,7 +21,7 @@ public class AbstractRedisSource extends AbstractSource implements Configurable private int redisPort; private int redisTimeout; private String redisPassword; - protected RedisSourceHandler handler; + protected RedisMessageHandler messageHandler; @Override public void configure(Context context) { @@ -34,13 +34,13 @@ public void configure(Context context) { String charset = context.getString("messageCharset", "utf-8"); String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler"); @SuppressWarnings("unchecked") - Class clazz = (Class) Class.forName(handlerClassName); - handler = clazz.getDeclaredConstructor(String.class).newInstance(charset); + Class clazz = (Class) Class.forName(handlerClassName); + messageHandler = clazz.getDeclaredConstructor(String.class).newInstance(charset); } catch (ClassNotFoundException ex) { - LOG.error("Error while configuring RedisSourceHandler. Exception follows.", ex); + LOG.error("Error while configuring RedisMessageHandler. Exception follows.", ex); Throwables.propagate(ex); } catch (ClassCastException ex) { - LOG.error("Handler is not an instance of RedisSourceHandler. Handler must implement RedisSourceHandler."); + LOG.error("Handler is not an instance of RedisMessageHandler. Handler must implement RedisMessageHandler."); Throwables.propagate(ex); } catch (Exception ex) { LOG.error("Error configuring RedisSubscribeDrivenSource!", ex); diff --git a/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSink.java b/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSink.java index 2fc910e..e992657 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSink.java +++ b/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSink.java @@ -54,8 +54,9 @@ public Status process() throws EventDeliveryException { transaction.begin(); Event event = channel.take(); + String serialized = messageHandler.getString(event); - if (jedis.lpush(redisList, new String(event.getBody(), messageCharset)) > 0) { + if (jedis.lpush(redisList, serialized) > 0) { transaction.commit(); status = Status.READY; } else { diff --git a/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSource.java b/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSource.java index fccdf17..aa98787 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSource.java +++ b/src/main/java/com/chiwanpark/flume/plugins/RedisListDrivenSource.java @@ -44,13 +44,13 @@ public Status process() throws EventDeliveryException { } try { - Event event = handler.getEvent(serialized); + Event event = messageHandler.getEvent(serialized); getChannelProcessor().processEvent(event); } catch (ChannelException e) { jedis.rpush(redisList, serialized); LOG.error("ChannelException is thrown.", e); } catch (Exception e) { - LOG.error("RedisSourceHandler threw unexpected exception.", e); + LOG.error("RedisMessageHandler threw unexpected exception.", e); } return Status.READY; diff --git a/src/main/java/com/chiwanpark/flume/plugins/RedisPublishDrivenSink.java b/src/main/java/com/chiwanpark/flume/plugins/RedisPublishDrivenSink.java index 42293eb..eb9fd94 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/RedisPublishDrivenSink.java +++ b/src/main/java/com/chiwanpark/flume/plugins/RedisPublishDrivenSink.java @@ -57,8 +57,9 @@ public Status process() throws EventDeliveryException { transaction.begin(); Event event = channel.take(); + String serialized = messageHandler.getString(event); - if (jedis.publish(redisChannel, new String(event.getBody(), messageCharset)) > 0) { + if (jedis.publish(redisChannel, serialized) > 0) { transaction.commit(); status = Status.READY; } else { diff --git a/src/main/java/com/chiwanpark/flume/plugins/RedisSubscribeDrivenSource.java b/src/main/java/com/chiwanpark/flume/plugins/RedisSubscribeDrivenSource.java index b454bb3..77d0a01 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/RedisSubscribeDrivenSource.java +++ b/src/main/java/com/chiwanpark/flume/plugins/RedisSubscribeDrivenSource.java @@ -118,9 +118,9 @@ private class JedisSubscribeListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { try { - channelProcessor.processEvent(handler.getEvent(message)); + channelProcessor.processEvent(messageHandler.getEvent(message)); } catch (Exception e) { - LOG.error("RedisSourceHandler threw unexpected exception.", e); + LOG.error("RedisMessageHandler threw unexpected exception.", e); } } diff --git a/src/main/java/com/chiwanpark/flume/plugins/handler/JSONHandler.java b/src/main/java/com/chiwanpark/flume/plugins/handler/JSONHandler.java index 5938066..3622b24 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/handler/JSONHandler.java +++ b/src/main/java/com/chiwanpark/flume/plugins/handler/JSONHandler.java @@ -15,9 +15,11 @@ */ package com.chiwanpark.flume.plugins.handler; +import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; @@ -50,10 +52,11 @@ * set in the request, then the charset is assumed to be JSON's default - UTF-8. * The JSON handler supports UTF-8, UTF-16 and UTF-32. */ -public class JSONHandler extends RedisSourceHandler { +public class JSONHandler extends RedisMessageHandler { private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class); private final JsonParser parser; + private final Gson gson; /** * {@inheritDoc} @@ -73,6 +76,7 @@ public JSONHandler(String charset) { } parser = new JsonParser(); + gson = new Gson(); } public JSONHandler() { @@ -96,7 +100,7 @@ public Event getEvent(String message) throws Exception { body = bodyElm.getAsString(); } - HashMap headers = null; + Map headers = null; if (json.has("headers")) { headers = new HashMap(); for (Map.Entry header : json.get("headers").getAsJsonObject().entrySet()) { @@ -112,4 +116,21 @@ public Event getEvent(String message) throws Exception { return EventBuilder.withBody(body.getBytes(charset), headers); } + + @Override + public String getString(Event event) throws Exception { + JsonPrimitive body = new JsonPrimitive(new String(event.getBody(), charset)); + JsonObject obj = new JsonObject(); + + obj.add("body", body); + if (!event.getHeaders().isEmpty()) { + JsonObject headers = new JsonObject(); + for (Map.Entry header : event.getHeaders().entrySet()) { + headers.add(header.getKey(), new JsonPrimitive(header.getValue())); + } + obj.add("headers", headers); + } + + return gson.toJson(obj); + } } diff --git a/src/main/java/com/chiwanpark/flume/plugins/handler/RawHandler.java b/src/main/java/com/chiwanpark/flume/plugins/handler/RawHandler.java index 2fdb344..ef3dcce 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/handler/RawHandler.java +++ b/src/main/java/com/chiwanpark/flume/plugins/handler/RawHandler.java @@ -22,7 +22,7 @@ * RawHandler for RedisSource that accepts a string message and converts it * to a flume Event having no headers and a body set to message. */ -public class RawHandler extends RedisSourceHandler { +public class RawHandler extends RedisMessageHandler { /** * {@inheritDoc} @@ -38,4 +38,12 @@ public RawHandler(String charset) { public Event getEvent(String message) throws Exception { return EventBuilder.withBody(message.getBytes(charset)); } + + /** + * {@inheritDoc} + */ + @Override + public String getString(Event event) throws Exception { + return new String(event.getBody(), charset); + } } diff --git a/src/main/java/com/chiwanpark/flume/plugins/handler/RedisSourceHandler.java b/src/main/java/com/chiwanpark/flume/plugins/handler/RedisMessageHandler.java similarity index 69% rename from src/main/java/com/chiwanpark/flume/plugins/handler/RedisSourceHandler.java rename to src/main/java/com/chiwanpark/flume/plugins/handler/RedisMessageHandler.java index 32880eb..5fe16b9 100644 --- a/src/main/java/com/chiwanpark/flume/plugins/handler/RedisSourceHandler.java +++ b/src/main/java/com/chiwanpark/flume/plugins/handler/RedisMessageHandler.java @@ -18,9 +18,9 @@ import org.apache.flume.Event; /** - * Event handler interface for RedisSource. + * Message handler interface for RedisSource and RedisSink. */ -public abstract class RedisSourceHandler { +public abstract class RedisMessageHandler { protected String charset; @@ -29,7 +29,7 @@ public abstract class RedisSourceHandler { * * @param charset The charset of the messages. */ - public RedisSourceHandler(String charset) { + public RedisMessageHandler(String charset) { this.charset = charset; } @@ -44,4 +44,15 @@ public RedisSourceHandler(String charset) { * @throws Exception If there was an unexpected error. */ public abstract Event getEvent(String message) throws Exception; + + /** + * Takes a event and returns a string representing the event. The result + * of this method could be parsed as a Flume event by getEvent method. If + * this request cannot succeed, this method will throw exception. + * + * @param event The event to be serialized. + * @return String representing the given event. + * @throws Exception If there was an unexpected error. + */ + public abstract String getString(Event event) throws Exception; } diff --git a/src/test/java/com/chiwanpark/flume/plugins/handler/JSONHandlerTest.java b/src/test/java/com/chiwanpark/flume/plugins/handler/JSONHandlerTest.java index 46f8dad..463a7ba 100644 --- a/src/test/java/com/chiwanpark/flume/plugins/handler/JSONHandlerTest.java +++ b/src/test/java/com/chiwanpark/flume/plugins/handler/JSONHandlerTest.java @@ -16,11 +16,15 @@ package com.chiwanpark.flume.plugins.handler; import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.nio.charset.Charset; import java.nio.charset.UnsupportedCharsetException; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -62,4 +66,43 @@ public void testJsonWithoutEncoding() throws Exception { assertArrayEquals("한글".getBytes("utf-8"), event.getBody()); } + + @Test + public void testJsonSerialization() throws Exception { + final String message = "hello"; + final String charset = "utf-8"; + final String expected = "{\"body\":\"hello\"}"; + JSONHandler handler = new JSONHandler(charset); + Event event = EventBuilder.withBody(message, Charset.forName(charset)); + + String jsonified = handler.getString(event); + assertEquals(expected, jsonified); + } + + @Test + public void testJsonSerializationWithHeadersAndBody() throws Exception { + final String message = "hello"; + final Map headers = new HashMap(); + headers.put("hello", "goodbye"); + + final String charset = "utf-8"; + final String expected = "{\"body\":\"hello\",\"headers\":{\"hello\":\"goodbye\"}}"; + JSONHandler handler = new JSONHandler(charset); + Event event = EventBuilder.withBody(message, Charset.forName(charset), headers); + + String jsonified = handler.getString(event); + assertEquals(expected, jsonified); + } + + @Test + public void testJsonSerde() throws Exception { + final String expected = "{\"body\":\"hello\",\"headers\":{\"hello\":\"goodbye\"}}"; + final String charset = "utf-8"; + + JSONHandler handler = new JSONHandler(charset); + Event event = handler.getEvent(expected); + String result = handler.getString(event); + + assertEquals(result, expected); + } } diff --git a/src/test/java/com/chiwanpark/flume/plugins/handler/RawHandlerTest.java b/src/test/java/com/chiwanpark/flume/plugins/handler/RawHandlerTest.java index e2710bb..50c753b 100644 --- a/src/test/java/com/chiwanpark/flume/plugins/handler/RawHandlerTest.java +++ b/src/test/java/com/chiwanpark/flume/plugins/handler/RawHandlerTest.java @@ -16,21 +16,36 @@ package com.chiwanpark.flume.plugins.handler; import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.nio.charset.Charset; + import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; @RunWith(JUnit4.class) public class RawHandlerTest { @Test public void testRawMessageWithUTF8() throws Exception { - String testMessage = "test-UTF8, 한글"; + final String testMessage = "test-UTF8, 한글"; RawHandler handler = new RawHandler("utf-8"); Event event = handler.getEvent(testMessage); assertArrayEquals(testMessage.getBytes("utf-8"), event.getBody()); } + + @Test + public void testSerializationWithUTF8() throws Exception { + final String charset = "utf-8"; + final String testMessage = "test-UTF8, 한글"; + RawHandler handler = new RawHandler(charset); + Event event = EventBuilder.withBody(testMessage, Charset.forName(charset)); + + String result = handler.getString(event); + assertEquals(testMessage, result); + } }