Skip to content

Commit

Permalink
Extend message handler structure to be used by sink and source both
Browse files Browse the repository at this point in the history
  • Loading branch information
chiwanpark committed Jun 10, 2015
1 parent 1772e1e commit cc8be0d
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 21 deletions.
22 changes: 20 additions & 2 deletions src/main/java/com/chiwanpark/flume/plugins/AbstractRedisSink.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.chiwanpark.flume.plugins;

import com.chiwanpark.flume.plugins.handler.RedisMessageHandler;
import com.google.common.base.Throwables;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
Expand All @@ -16,7 +18,7 @@ public abstract class AbstractRedisSink extends AbstractSink implements Configur
private int redisPort;
private int redisTimeout;
private String redisPassword;
protected String messageCharset;
protected RedisMessageHandler messageHandler;

@Override
public synchronized void start() {
Expand All @@ -43,6 +45,22 @@ public void configure(Context context) {
redisPort = context.getInteger("redisPort", 6379);
redisTimeout = context.getInteger("redisTimeout", 2000);
redisPassword = context.getString("redisPassword", "");
messageCharset = context.getString("messageCharset", "utf-8");

try {
String charset = context.getString("messageCharset", "utf-8");
String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler");
@SuppressWarnings("unchecked")
Class<? extends RedisMessageHandler> clazz = (Class<? extends RedisMessageHandler>) Class.forName(handlerClassName);
messageHandler = clazz.getDeclaredConstructor(String.class).newInstance(charset);
} catch (ClassNotFoundException ex) {
LOG.error("Error while configuring RedisMessageHandler. Exception follows.", ex);
Throwables.propagate(ex);
} catch (ClassCastException ex) {
LOG.error("Handler is not an instance of RedisMessageHandler. Handler must implement RedisMessageHandler.");
Throwables.propagate(ex);
} catch (Exception ex) {
LOG.error("Error configuring RedisSubscribeDrivenSource!", ex);
Throwables.propagate(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.chiwanpark.flume.plugins;

import com.chiwanpark.flume.plugins.handler.RedisSourceHandler;
import com.chiwanpark.flume.plugins.handler.RedisMessageHandler;
import com.google.common.base.Throwables;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
Expand All @@ -21,7 +21,7 @@ public class AbstractRedisSource extends AbstractSource implements Configurable
private int redisPort;
private int redisTimeout;
private String redisPassword;
protected RedisSourceHandler handler;
protected RedisMessageHandler messageHandler;

@Override
public void configure(Context context) {
Expand All @@ -34,13 +34,13 @@ public void configure(Context context) {
String charset = context.getString("messageCharset", "utf-8");
String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler");
@SuppressWarnings("unchecked")
Class<? extends RedisSourceHandler> clazz = (Class<? extends RedisSourceHandler>) Class.forName(handlerClassName);
handler = clazz.getDeclaredConstructor(String.class).newInstance(charset);
Class<? extends RedisMessageHandler> clazz = (Class<? extends RedisMessageHandler>) Class.forName(handlerClassName);
messageHandler = clazz.getDeclaredConstructor(String.class).newInstance(charset);
} catch (ClassNotFoundException ex) {
LOG.error("Error while configuring RedisSourceHandler. Exception follows.", ex);
LOG.error("Error while configuring RedisMessageHandler. Exception follows.", ex);
Throwables.propagate(ex);
} catch (ClassCastException ex) {
LOG.error("Handler is not an instance of RedisSourceHandler. Handler must implement RedisSourceHandler.");
LOG.error("Handler is not an instance of RedisMessageHandler. Handler must implement RedisMessageHandler.");
Throwables.propagate(ex);
} catch (Exception ex) {
LOG.error("Error configuring RedisSubscribeDrivenSource!", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ public Status process() throws EventDeliveryException {
transaction.begin();

Event event = channel.take();
String serialized = messageHandler.getString(event);

if (jedis.lpush(redisList, new String(event.getBody(), messageCharset)) > 0) {
if (jedis.lpush(redisList, serialized) > 0) {
transaction.commit();
status = Status.READY;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ public Status process() throws EventDeliveryException {
}

try {
Event event = handler.getEvent(serialized);
Event event = messageHandler.getEvent(serialized);
getChannelProcessor().processEvent(event);
} catch (ChannelException e) {
jedis.rpush(redisList, serialized);
LOG.error("ChannelException is thrown.", e);
} catch (Exception e) {
LOG.error("RedisSourceHandler threw unexpected exception.", e);
LOG.error("RedisMessageHandler threw unexpected exception.", e);
}

return Status.READY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public Status process() throws EventDeliveryException {
transaction.begin();

Event event = channel.take();
String serialized = messageHandler.getString(event);

if (jedis.publish(redisChannel, new String(event.getBody(), messageCharset)) > 0) {
if (jedis.publish(redisChannel, serialized) > 0) {
transaction.commit();
status = Status.READY;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ private class JedisSubscribeListener extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
try {
channelProcessor.processEvent(handler.getEvent(message));
channelProcessor.processEvent(messageHandler.getEvent(message));
} catch (Exception e) {
LOG.error("RedisSourceHandler threw unexpected exception.", e);
LOG.error("RedisMessageHandler threw unexpected exception.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.chiwanpark.flume.plugins.handler;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,10 +52,11 @@
* set in the request, then the charset is assumed to be JSON's default - UTF-8.
* The JSON handler supports UTF-8, UTF-16 and UTF-32.
*/
public class JSONHandler extends RedisSourceHandler {
public class JSONHandler extends RedisMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);
private final JsonParser parser;
private final Gson gson;

/**
* {@inheritDoc}
Expand All @@ -73,6 +76,7 @@ public JSONHandler(String charset) {
}

parser = new JsonParser();
gson = new Gson();
}

public JSONHandler() {
Expand All @@ -96,7 +100,7 @@ public Event getEvent(String message) throws Exception {
body = bodyElm.getAsString();
}

HashMap<String, String> headers = null;
Map<String, String> headers = null;
if (json.has("headers")) {
headers = new HashMap<String, String>();
for (Map.Entry<String, JsonElement> header : json.get("headers").getAsJsonObject().entrySet()) {
Expand All @@ -112,4 +116,21 @@ public Event getEvent(String message) throws Exception {

return EventBuilder.withBody(body.getBytes(charset), headers);
}

@Override
public String getString(Event event) throws Exception {
JsonPrimitive body = new JsonPrimitive(new String(event.getBody(), charset));
JsonObject obj = new JsonObject();

obj.add("body", body);
if (!event.getHeaders().isEmpty()) {
JsonObject headers = new JsonObject();
for (Map.Entry<String, String> header : event.getHeaders().entrySet()) {
headers.add(header.getKey(), new JsonPrimitive(header.getValue()));
}
obj.add("headers", headers);
}

return gson.toJson(obj);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* RawHandler for RedisSource that accepts a string message and converts it
* to a flume Event having no headers and a body set to message.
*/
public class RawHandler extends RedisSourceHandler {
public class RawHandler extends RedisMessageHandler {

/**
* {@inheritDoc}
Expand All @@ -38,4 +38,12 @@ public RawHandler(String charset) {
public Event getEvent(String message) throws Exception {
return EventBuilder.withBody(message.getBytes(charset));
}

/**
* {@inheritDoc}
*/
@Override
public String getString(Event event) throws Exception {
return new String(event.getBody(), charset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import org.apache.flume.Event;

/**
* Event handler interface for RedisSource.
* Message handler interface for RedisSource and RedisSink.
*/
public abstract class RedisSourceHandler {
public abstract class RedisMessageHandler {

protected String charset;

Expand All @@ -29,7 +29,7 @@ public abstract class RedisSourceHandler {
*
* @param charset The charset of the messages.
*/
public RedisSourceHandler(String charset) {
public RedisMessageHandler(String charset) {
this.charset = charset;
}

Expand All @@ -44,4 +44,15 @@ public RedisSourceHandler(String charset) {
* @throws Exception If there was an unexpected error.
*/
public abstract Event getEvent(String message) throws Exception;

/**
* Takes a event and returns a string representing the event. The result
* of this method could be parsed as a Flume event by getEvent method. If
* this request cannot succeed, this method will throw exception.
*
* @param event The event to be serialized.
* @return String representing the given event.
* @throws Exception If there was an unexpected error.
*/
public abstract String getString(Event event) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
package com.chiwanpark.flume.plugins.handler;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -62,4 +66,43 @@ public void testJsonWithoutEncoding() throws Exception {

assertArrayEquals("한글".getBytes("utf-8"), event.getBody());
}

@Test
public void testJsonSerialization() throws Exception {
final String message = "hello";
final String charset = "utf-8";
final String expected = "{\"body\":\"hello\"}";
JSONHandler handler = new JSONHandler(charset);
Event event = EventBuilder.withBody(message, Charset.forName(charset));

String jsonified = handler.getString(event);
assertEquals(expected, jsonified);
}

@Test
public void testJsonSerializationWithHeadersAndBody() throws Exception {
final String message = "hello";
final Map<String, String> headers = new HashMap<String, String>();
headers.put("hello", "goodbye");

final String charset = "utf-8";
final String expected = "{\"body\":\"hello\",\"headers\":{\"hello\":\"goodbye\"}}";
JSONHandler handler = new JSONHandler(charset);
Event event = EventBuilder.withBody(message, Charset.forName(charset), headers);

String jsonified = handler.getString(event);
assertEquals(expected, jsonified);
}

@Test
public void testJsonSerde() throws Exception {
final String expected = "{\"body\":\"hello\",\"headers\":{\"hello\":\"goodbye\"}}";
final String charset = "utf-8";

JSONHandler handler = new JSONHandler(charset);
Event event = handler.getEvent(expected);
String result = handler.getString(event);

assertEquals(result, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,36 @@
package com.chiwanpark.flume.plugins.handler;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.nio.charset.Charset;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

@RunWith(JUnit4.class)
public class RawHandlerTest {

@Test
public void testRawMessageWithUTF8() throws Exception {
String testMessage = "test-UTF8, 한글";
final String testMessage = "test-UTF8, 한글";
RawHandler handler = new RawHandler("utf-8");
Event event = handler.getEvent(testMessage);

assertArrayEquals(testMessage.getBytes("utf-8"), event.getBody());
}

@Test
public void testSerializationWithUTF8() throws Exception {
final String charset = "utf-8";
final String testMessage = "test-UTF8, 한글";
RawHandler handler = new RawHandler(charset);
Event event = EventBuilder.withBody(testMessage, Charset.forName(charset));

String result = handler.getString(event);
assertEquals(testMessage, result);
}
}

0 comments on commit cc8be0d

Please sign in to comment.