Skip to content

Commit

Permalink
Merge fb1b11a into 8912cec
Browse files Browse the repository at this point in the history
  • Loading branch information
philipphoffmann committed May 8, 2018
2 parents 8912cec + fb1b11a commit d5d122f
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public abstract class AbstractRedisSink extends AbstractSink implements Configur
private int redisTimeout;
private String redisPassword;
protected RedisMessageHandler messageHandler;
protected RedisSinkCounter counter;

@Override
public synchronized void start() {
Expand All @@ -30,6 +31,7 @@ public synchronized void start() {
// try to connect here already to find out about problems early on
// TODO: we may need to throw a special kind of exception here
jedis.connect();
counter.start();
super.start();

LOG.info("Redis Connected. (host: " + redisHost + ", port: " + String.valueOf(redisPort)
Expand All @@ -39,6 +41,7 @@ public synchronized void start() {
@Override
public synchronized void stop() {
jedis.disconnect();
counter.stop();
super.stop();
}

Expand All @@ -49,6 +52,10 @@ public void configure(Context context) {
redisTimeout = context.getInteger("redisTimeout", 2000);
redisPassword = context.getString("redisPassword", "");

if(counter == null) {
counter = new RedisSinkCounter(getName());
}

try {
String charset = context.getString("messageCharset", "utf-8");
String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class AbstractRedisSource extends AbstractSource implements Configurable
private int redisTimeout;
private String redisPassword;
protected RedisMessageHandler messageHandler;
protected RedisSourceCounter counter;

@Override
public void configure(Context context) {
Expand All @@ -31,6 +32,10 @@ public void configure(Context context) {
redisTimeout = context.getInteger("redisTimeout", 2000);
redisPassword = context.getString("redisPassword", "");

if(counter == null) {
counter = new RedisSourceCounter(getName());
}

try {
String charset = context.getString("messageCharset", "utf-8");
String handlerClassName = context.getString("handler", "com.chiwanpark.flume.plugins.handler.RawHandler");
Expand Down Expand Up @@ -82,10 +87,12 @@ public synchronized void start() {
channelProcessor = getChannelProcessor();

connect();
counter.start();
}

@Override
public synchronized void stop() {
counter.stop();
super.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,23 @@ public Status process() throws EventDeliveryException {

try {
transaction.begin();
long startTime = System.nanoTime();

Event event = channel.take();
byte[] serialized = messageHandler.getBytes(event);

if (jedis.lpush(redisList, serialized) > 0) {
transaction.commit();
long endTime = System.nanoTime();
counter.incrementSinkSendTimeMicros((endTime - startTime) / (1000));
counter.incrementSinkSuccess();
status = Status.READY;
} else {
throw new EventDeliveryException("Event cannot be pushed into list " + redisList);
}
} catch (Throwable e) {
transaction.rollback();
counter.incrementSinkRollback();
status = Status.BACKOFF;

// we need to rethrow jedis exceptions, because they signal that something went wrong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ public Status process() throws EventDeliveryException {
try {
Event event = messageHandler.getEvent(serialized);
getChannelProcessor().processEvent(event);
counter.incrementSourceSent();
} catch (ChannelException e) {
jedis.rpush(redisList, serialized);
counter.incrementSourceRetry();
LOG.error("ChannelException is thrown.", e);
} catch (Exception e) {
counter.incrementSourceError();
LOG.error("RedisMessageHandler threw unexpected exception.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,24 @@ public Status process() throws EventDeliveryException {

try {
transaction.begin();
long startTime = System.nanoTime();

Event event = channel.take();
byte[] serialized = messageHandler.getBytes(event);

if (jedis.publish(redisChannel, serialized) > 0) {
transaction.commit();
long endTime = System.nanoTime();
counter.incrementSinkSendTimeMicros((endTime - startTime) / (1000));
counter.incrementSinkSuccess();
status = Status.READY;
} else {
throw new EventDeliveryException(
"Event is published, but there is no receiver in this channel named " + redisChannel);
}
} catch (Throwable e) {
transaction.rollback();
counter.incrementSinkRollback();
status = Status.BACKOFF;

// we need to rethrow jedis exceptions, because they signal that something went wrong
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/com/chiwanpark/flume/plugins/RedisSinkCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.chiwanpark.flume.plugins;

import org.apache.flume.instrumentation.SinkCounter;

public class RedisSinkCounter extends SinkCounter implements RedisSinkCounterMBean {
private static final String COUNTER_REDIS_SINK_SEND_TIME_MICROS = "redis.sink.sendTimeMicros";
private static final String COUNTER_REDIS_SINK_ROLLBACK = "redis.sink.rollback";
private static final String COUNTER_REDIS_SINK_SUCCESS = "redis.sink.success";

private static final String[] ATTRIBUTES = new String[] {
COUNTER_REDIS_SINK_SEND_TIME_MICROS,
COUNTER_REDIS_SINK_ROLLBACK,
COUNTER_REDIS_SINK_SUCCESS
};

public RedisSinkCounter(String name) {
super(name, ATTRIBUTES);
}

public void incrementSinkSendTimeMicros(long delta) {
this.addAndGet(COUNTER_REDIS_SINK_SEND_TIME_MICROS, delta);
}

public long getSinkSendTimeMicros() {
return this.get(COUNTER_REDIS_SINK_SEND_TIME_MICROS);
}

public void incrementSinkRollback() {
this.increment(COUNTER_REDIS_SINK_ROLLBACK);
}

public long getSinkRollback() {
return this.get(COUNTER_REDIS_SINK_ROLLBACK);
}

public void incrementSinkSuccess() {
this.increment(COUNTER_REDIS_SINK_SUCCESS);
}

public long getSinkSuccess() {
return this.get(COUNTER_REDIS_SINK_SUCCESS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.chiwanpark.flume.plugins;

public interface RedisSinkCounterMBean {
public long getSinkSendTimeMicros();
public long getSinkRollback();
public long getSinkSuccess();
}
43 changes: 43 additions & 0 deletions src/main/java/com/chiwanpark/flume/plugins/RedisSourceCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.chiwanpark.flume.plugins;

import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.instrumentation.SourceCounterMBean;

public class RedisSourceCounter extends SourceCounter implements SourceCounterMBean {
private static final String COUNTER_REDIS_SOURCE_SENT = "redis.source.sent";
private static final String COUNTER_REDIS_SOURCE_RETRY = "redis.source.retry";
private static final String COUNTER_REDIS_SOURCE_ERROR = "redis.source.error";
private static final String[] ATTRIBUTES = new String[] {
COUNTER_REDIS_SOURCE_SENT,
COUNTER_REDIS_SOURCE_RETRY,
COUNTER_REDIS_SOURCE_ERROR
};

public RedisSourceCounter(String name) {
super(name, ATTRIBUTES);
}

public void incrementSourceSent() {
this.increment(COUNTER_REDIS_SOURCE_SENT);
}

public long getSourceSent() {
return this.get(COUNTER_REDIS_SOURCE_SENT);
}

public void incrementSourceRetry() {
this.increment(COUNTER_REDIS_SOURCE_RETRY);
}

public long getSourceRetry() {
return this.get(COUNTER_REDIS_SOURCE_RETRY);
}

public void incrementSourceError() {
this.increment(COUNTER_REDIS_SOURCE_ERROR);
}

public long getSourceError() {
return this.get(COUNTER_REDIS_SOURCE_ERROR);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.chiwanpark.flume.plugins;

public interface RedisSourceCounterMBean {
public long getSourceSent();
public long getSourceRetry();
public long getSourceError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ private class JedisSubscribeListener extends BinaryJedisPubSub {
public void onMessage(byte[] channel, byte[] message) {
try {
channelProcessor.processEvent(messageHandler.getEvent(message));
counter.incrementSourceSent();
} catch (Exception e) {
counter.incrementSourceError();
LOG.error("RedisMessageHandler threw unexpected exception.", e);
}
}
Expand Down

0 comments on commit d5d122f

Please sign in to comment.