Skip to content

Commit

Permalink
Add Source using Redis List structure (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
chiwanpark committed Jun 10, 2015
1 parent 763f950 commit 1772e1e
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.chiwanpark.flume.plugins;

import com.chiwanpark.flume.plugins.handler.RedisSourceHandler;
import com.google.common.base.Throwables;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class AbstractRedisSource extends AbstractSource implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRedisSource.class);

protected Jedis jedis;
protected ChannelProcessor channelProcessor;

private String redisHost;
private int redisPort;
private int redisTimeout;
private String redisPassword;
protected RedisSourceHandler handler;

@Override
public void configure(Context context) {
redisHost = context.getString("redisHost", "localhost");
redisPort = context.getInteger("redisPort", 6379);
redisTimeout = context.getInteger("redisTimeout", 2000);
redisPassword = context.getString("redisPassword", "");

try {
String charset = context.getString("messageCharset", "utf-8");
String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler");
@SuppressWarnings("unchecked")
Class<? extends RedisSourceHandler> clazz = (Class<? extends RedisSourceHandler>) Class.forName(handlerClassName);
handler = clazz.getDeclaredConstructor(String.class).newInstance(charset);
} catch (ClassNotFoundException ex) {
LOG.error("Error while configuring RedisSourceHandler. Exception follows.", ex);
Throwables.propagate(ex);
} catch (ClassCastException ex) {
LOG.error("Handler is not an instance of RedisSourceHandler. Handler must implement RedisSourceHandler.");
Throwables.propagate(ex);
} catch (Exception ex) {
LOG.error("Error configuring RedisSubscribeDrivenSource!", ex);
Throwables.propagate(ex);
}
}

protected void connect() {
LOG.info("Connecting...");
while (true) {
try {
jedis = new Jedis(redisHost, redisPort, redisTimeout);
if (!"".equals(redisPassword)) {
jedis.auth(redisPassword);
} else {
// Force a connection.
jedis.ping();
}
break;
} catch (JedisConnectionException e) {
LOG.error("Connection failed.", e);
LOG.info("Waiting for 10 seconds...");
try {
Thread.sleep(10000);
} catch (InterruptedException e2) {
// ?
}
}
}
LOG.info("Redis Connected. (host: " + redisHost + ", port: " + String.valueOf(redisPort)
+ ", timeout: " + String.valueOf(redisTimeout) + ")");
}

@Override
public synchronized void start() {
super.start();

channelProcessor = getChannelProcessor();

connect();
}

@Override
public synchronized void stop() {
super.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.chiwanpark.flume.plugins;

import com.google.common.base.Preconditions;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisListDrivenSource extends AbstractRedisSource implements PollableSource {
private static final Logger LOG = LoggerFactory.getLogger(RedisListDrivenSource.class);

private int redisDatabase;
private String redisList;

@Override
public void configure(Context context) {
redisDatabase = context.getInteger("redisDatabase", 0);
redisList = context.getString("redisList");
Preconditions.checkNotNull(redisList, "Redis List must be set.");

super.configure(context);
}

@Override
public synchronized void start() {
super.start();

if (redisDatabase != 0) {
final String result = jedis.select(redisDatabase);
if (!"OK".equals(result)) {
throw new RuntimeException("Cannot select database (database: " + redisDatabase + ")");
}
}
}

@Override
public Status process() throws EventDeliveryException {
String serialized = jedis.rpop(redisList);
if (serialized == null) {
return Status.BACKOFF;
}

try {
Event event = handler.getEvent(serialized);
getChannelProcessor().processEvent(event);
} catch (ChannelException e) {
jedis.rpush(redisList, serialized);
LOG.error("ChannelException is thrown.", e);
} catch (Exception e) {
LOG.error("RedisSourceHandler threw unexpected exception.", e);
}

return Status.READY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,108 +15,36 @@
*/
package com.chiwanpark.flume.plugins;

import com.chiwanpark.flume.plugins.handler.RedisSourceHandler;
import com.google.common.base.Throwables;
import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class RedisSubscribeDrivenSource extends AbstractSource implements Configurable, EventDrivenSource {
public class RedisSubscribeDrivenSource extends AbstractRedisSource implements EventDrivenSource {

private static final Logger LOG = LoggerFactory.getLogger(RedisSubscribeDrivenSource.class);

private ChannelProcessor channelProcessor;

private Jedis jedis;
private String redisHost;
private int redisPort;
private String[] redisChannels;
private int redisTimeout;
private String redisPassword;
private RedisSourceHandler handler;

private boolean runFlag;


@Override
public void configure(Context context) {
redisHost = context.getString("redisHost", "localhost");
redisPort = context.getInteger("redisPort", 6379);
redisTimeout = context.getInteger("redisTimeout", 2000);
redisPassword = context.getString("redisPassword", "");

try {
String charset = context.getString("messageCharset", "utf-8");
String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler");
@SuppressWarnings("unchecked")
Class<? extends RedisSourceHandler> clazz = (Class<? extends RedisSourceHandler>) Class.forName(handlerClassName);
Class[] argTypes = new Class[1];
argTypes[0] = String.class;
handler = clazz.getDeclaredConstructor(argTypes).newInstance(charset);
} catch (ClassNotFoundException ex) {
LOG.error("Error while configuring RedisSourceHandler. Exception follows.", ex);
Throwables.propagate(ex);
} catch (ClassCastException ex) {
LOG.error("Handler is not an instance of RedisSourceHandler. Handler must implement RedisSourceHandler.");
Throwables.propagate(ex);
} catch (Exception ex) {
LOG.error("Error configuring RedisSubscribeDrivenSource!", ex);
Throwables.propagate(ex);
}

String redisChannel = context.getString("redisChannel");
if (redisChannel == null) {
throw new RuntimeException("Redis Channel must be set.");
}
Preconditions.checkNotNull(redisChannel, "Redis Channel must be set.");
redisChannels = redisChannel.split(",");

super.configure(context);
LOG.info("Flume Redis Subscribe Source Configured");
}

private void connect() {
LOG.info("Connecting...");
while (true) {
try {
jedis = new Jedis(redisHost, redisPort, redisTimeout);
if (!"".equals(redisPassword)) {
jedis.auth(redisPassword);
} else {
// Force a connection.
jedis.ping();
}
break;
} catch (JedisConnectionException e) {
LOG.error("Connection failed.", e);
LOG.info("Waiting for 10 seconds...");
try {
Thread.sleep(10000);
} catch (InterruptedException e2) {
// ?
}
}
}
LOG.info("Redis Connected. (host: " + redisHost + ", port: " + String.valueOf(redisPort)
+ ", timeout: " + String.valueOf(redisTimeout) + ")");
}

@Override
public synchronized void start() {
super.start();

channelProcessor = getChannelProcessor();

// TODO: consider using Connection Pool.
connect();

runFlag = true;

new Thread(new SubscribeManager()).start();
}

Expand All @@ -131,8 +59,6 @@ public synchronized void stop() {
} catch (InterruptedException e) {
e.printStackTrace();
}

// TODO: if we use jedis connection pool, destroy code must be inserted.
}

private class SubscribeManager implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.chiwanpark.flume.plugins;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
public class RedisListDrivenSourceTest extends RedisSourceTestBase {
private static final Logger LOG = LoggerFactory.getLogger(RedisListDrivenSourceTest.class);

@Before
public void setUp() throws Exception {
context.put("redisList", "flume-ng-redis-test");
source = new RedisListDrivenSource();

super.setUp();
}

@Test
public void testList() throws Exception {
String message = "testListMessage";

LOG.info("Try to send message to redis source.");
addMessageToRedisList("flume-ng-redis-test", message);
Thread.sleep(2000);

((RedisListDrivenSource) source).process();
Assert.assertEquals(message, getMessageFromChannel());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.chiwanpark.flume.plugins;

import com.google.common.collect.Lists;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

@RunWith(JUnit4.class)
public class RedisSourceTestBase {
private static final Logger LOG = LoggerFactory.getLogger(RedisSourceTestBase.class);

protected Context context = new Context();
protected Channel channel = new MemoryChannel();
protected ChannelSelector channelSelector = new ReplicatingChannelSelector();
protected AbstractSource source;

@Before
public void setUp() throws Exception {
Configurables.configure(channel, context);
channelSelector.setChannels(Lists.newArrayList(channel));

source.setChannelProcessor(new ChannelProcessor(channelSelector));
Configurables.configure(source, context);

LOG.info("Try to start RedisSubscribeDrivenSource.");
source.start();
Thread.sleep(1000);
}

@After
public void tearDown() throws Exception {
source.stop();
}

protected void publishMessageToRedis(String channel, String message) throws Exception {
Jedis jedis = new Jedis("localhost", 6379);

jedis.publish(channel, message);

jedis.disconnect();
}

protected void addMessageToRedisList(String list, String message) throws Exception {
Jedis jedis = new Jedis("localhost", 6379);

jedis.lpush(list, message);

jedis.disconnect();
}

protected String getMessageFromChannel() throws Exception {
Transaction transaction = channel.getTransaction();
try {
transaction.begin();

Event event;

do {
event = channel.take();
} while (event == null);
transaction.commit();

return new String(event.getBody(), "UTF-8");
} finally {
transaction.close();
}
}
}
Loading

0 comments on commit 1772e1e

Please sign in to comment.