Skip to content

Commit

Permalink
Added a ColumnFamilyChooser to choose the column family an event shoul…
Browse files Browse the repository at this point in the history
…d write to
  • Loading branch information
dcapwell committed Jan 19, 2013
1 parent 6b970ab commit acb5ac6
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -35,6 +35,12 @@
<artifactId>flume-ng-core</artifactId> <artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version> <version>${flume.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.8</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>


<repositories> <repositories>
Expand Down
@@ -1,5 +1,7 @@
package org.apache.flume.cassandra; package org.apache.flume.cassandra;


import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.flume.Context; import org.apache.flume.Context;
import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink; import org.apache.flume.sink.AbstractSink;
Expand All @@ -13,18 +15,15 @@
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel; import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.serializers.BytesArraySerializer;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory; import com.netflix.astyanax.thrift.ThriftFamilyFactory;


public abstract class AbstractCassandraSink extends AbstractSink implements Configurable { public abstract class AbstractCassandraSink extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraSink.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraSink.class);
protected AstyanaxContext<Keyspace> context; protected AstyanaxContext<Keyspace> context;
protected Keyspace keyspace; protected Keyspace keyspace;
protected long timeout; protected long timeout;
protected ColumnFamily<byte[], String> column_family; protected ColumnFamilyChooser chooser;


@Override @Override
public void stop() { public void stop() {
Expand All @@ -36,6 +35,8 @@ public void stop() {


@Override @Override
public void configure(final Context config) { public void configure(final Context config) {
Preconditions.checkArgument(config.getString("seeds") != null, "seeds must be defined in context");

String cluster = config.getString("cluster_name", "flume"); String cluster = config.getString("cluster_name", "flume");
if (keyspace != null) { if (keyspace != null) {
return; return;
Expand Down Expand Up @@ -65,8 +66,16 @@ public void configure(final Context config) {
context.start(); context.start();
LOG.info("Started keyspace with context: {}", context.toString()); LOG.info("Started keyspace with context: {}", context.toString());
keyspace = context.getEntity(); keyspace = context.getEntity();
String column_name = config.getString("column_name", "events");
column_family = new ColumnFamily<byte[], String>(column_name, BytesArraySerializer.get(), StringSerializer.get()); final String chooserName = config.getString("chooser", DefaultColumnFamilyChooser.class.getName());
try {
Class<? extends ColumnFamilyChooser> chooserClass =
(Class<? extends ColumnFamilyChooser>) Class.forName(chooserName);
chooser = chooserClass.newInstance();
chooser.configure(config);
} catch (Exception e) {
throw Throwables.propagate(e);
}


timeout = config.getLong("timeout_in_ms", 5000L); timeout = config.getLong("timeout_in_ms", 5000L);
} }
Expand Down
Expand Up @@ -25,15 +25,14 @@ public Status process() throws EventDeliveryException {
Transaction transaction = channel.getTransaction(); Transaction transaction = channel.getTransaction();
transaction.begin(); transaction.begin();
try { try {
MutationBatch mutation = keyspace.prepareMutationBatch();
ColumnListMutation<String> columns = mutation.withRow(column_family,
TimeUUIDSerializer.get().toBytes(UUID.randomUUID()));

Event event = channel.take(); Event event = channel.take();
if (event == null) { if (event == null) {
transaction.commit(); transaction.commit();
return Status.BACKOFF; return Status.BACKOFF;
} }
MutationBatch mutation = keyspace.prepareMutationBatch();
ColumnListMutation<String> columns = mutation.withRow(chooser.choose(event),
TimeUUIDSerializer.get().toBytes(UUID.randomUUID()));
if (event.getHeaders() != null) if (event.getHeaders() != null)
for (Entry<String, String> entry : event.getHeaders().entrySet()) for (Entry<String, String> entry : event.getHeaders().entrySet())
columns.putColumn(entry.getKey(), entry.getValue()); columns.putColumn(entry.getKey(), entry.getValue());
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/apache/flume/cassandra/CassandraSink.java
Expand Up @@ -26,15 +26,15 @@ public Status process() throws EventDeliveryException {
Transaction transaction = channel.getTransaction(); Transaction transaction = channel.getTransaction();
transaction.begin(); transaction.begin();
try { try {
MutationBatch mutation = keyspace.prepareMutationBatch();
long timestamp = System.currentTimeMillis();
String date = getDate(timestamp);
ColumnListMutation<String> columns = mutation.withRow(column_family, StringSerializer.get().toBytes(date));
Event event = channel.take(); Event event = channel.take();
if (event == null) { if (event == null) {
transaction.commit(); transaction.commit();
return Status.BACKOFF; return Status.BACKOFF;
} }
MutationBatch mutation = keyspace.prepareMutationBatch();
long timestamp = System.currentTimeMillis();
String date = getDate(timestamp);
ColumnListMutation<String> columns = mutation.withRow(chooser.choose(event), StringSerializer.get().toBytes(date));
String prefix = null; String prefix = null;
if (event.getHeaders() != null) { if (event.getHeaders() != null) {
prefix = event.getHeaders().get(EVENT_PREFIX_KEY); prefix = event.getHeaders().get(EVENT_PREFIX_KEY);
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/apache/flume/cassandra/ColumnFamilyChooser.java
@@ -0,0 +1,21 @@
package org.apache.flume.cassandra;

import com.netflix.astyanax.model.ColumnFamily;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;

import javax.annotation.Nonnull;

/**
 * Strategy for routing an {@link Event} to the {@link ColumnFamily} it should be written to.
 *
 * <p>Because this interface extends {@link Configurable}, an implementation receives the
 * Flume context through {@code configure} and can read its own settings from it.
 */
public interface ColumnFamilyChooser extends Configurable {

    /**
     * Selects the destination {@link ColumnFamily} for one event. This method is invoked once
     * for every event, so implementations should keep it cheap.
     *
     * @param event the event about to be written
     * @return the column family the event should be sent to
     */
    ColumnFamily<byte[], String> choose(@Nonnull Event event);
}
@@ -0,0 +1,34 @@
package org.apache.flume.cassandra;

import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.serializers.BytesArraySerializer;
import com.netflix.astyanax.serializers.StringSerializer;
import org.apache.flume.Context;
import org.apache.flume.Event;

import javax.annotation.Nonnull;

/**
 * {@link ColumnFamilyChooser} that routes every event to one fixed {@link ColumnFamily}.
 *
 * <p>The column family is built in {@link #configure(org.apache.flume.Context)} from the name
 * stored in the {@link Context} under {@link #COLUMN_NAME}; {@link #choose(org.apache.flume.Event)}
 * then returns that same instance for every event. Calling {@code configure} again replaces the
 * column family with one built from the new context.
 */
public class DefaultColumnFamilyChooser implements ColumnFamilyChooser {
    /**
     * Key in the {@link Context} that {@link #configure(org.apache.flume.Context)} reads to obtain
     * the name of the column family to write to. Despite the key's spelling, the value is used as
     * a column <em>family</em> name. Defaults to {@link #DEFAULT_COLUMN_NAME}.
     */
    public static final String COLUMN_NAME = "column_name";

    /**
     * Column family name used when the {@link Context} has no entry for {@link #COLUMN_NAME}.
     */
    public static final String DEFAULT_COLUMN_NAME = "events";

    /** Column family handed out by {@code choose}; {@code null} until {@code configure} has run. */
    private ColumnFamily<byte[], String> columnFamily;

    /**
     * Returns the configured column family. The event itself is not inspected.
     *
     * @param event the event being routed (ignored by this implementation)
     * @return the column family built by the most recent call to {@code configure}
     * @throws IllegalStateException if {@code configure} has not been called yet
     */
    @Override
    public ColumnFamily<byte[], String> choose(@Nonnull final Event event) {
        // Fail fast here instead of handing a null column family to the caller, where the
        // resulting failure would be far removed from the actual cause.
        if (columnFamily == null) {
            throw new IllegalStateException("configure(Context) must be called before choose(Event)");
        }
        return columnFamily;
    }

    /**
     * Builds the column family from the name found under {@link #COLUMN_NAME}, falling back to
     * {@link #DEFAULT_COLUMN_NAME} when the key is absent.
     *
     * @param context Flume context holding this chooser's settings
     */
    @Override
    public void configure(final Context context) {
        final String columnFamilyName = context.getString(COLUMN_NAME, DEFAULT_COLUMN_NAME);
        this.columnFamily = new ColumnFamily<byte[], String>(columnFamilyName,
                BytesArraySerializer.get(), StringSerializer.get());
    }
}
@@ -0,0 +1,51 @@
package org.apache.flume.cassandra;

import com.netflix.astyanax.model.ColumnFamily;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Exercises the configuration handling of {@link AbstractCassandraSink} through a minimal
 * concrete subclass.
 */
@Test
public class AbstractCassandraSinkTest {

    /**
     * A context without a "seeds" entry must be rejected with an
     * {@link IllegalArgumentException} carrying the expected message.
     */
    @Test(expectedExceptions = IllegalArgumentException.class,
            expectedExceptionsMessageRegExp = "seeds must be defined in context")
    public void testConfigureEmptyContext() throws Exception {
        AbstractCassandraSink sinkUnderTest = new NoOpCassandraSink();
        Context emptyContext = new Context();

        sinkUnderTest.configure(emptyContext);
    }

    /**
     * With only "seeds" set, the sink is expected to end up with a
     * {@link DefaultColumnFamilyChooser} whose column family is named "events".
     */
    public void testConfigure() throws Exception {
        NoOpCassandraSink sinkUnderTest = new NoOpCassandraSink();
        Context seededContext = new Context();
        seededContext.put("seeds", "localhost");
        sinkUnderTest.configure(seededContext);

        ColumnFamilyChooser configuredChooser = sinkUnderTest.getChooser();
        Assert.assertEquals(configuredChooser.getClass(), DefaultColumnFamilyChooser.class);

        ColumnFamily<byte[], String> chosenFamily = configuredChooser.choose(null);
        Assert.assertEquals(chosenFamily.getName(), "events");
    }

    /**
     * Smallest possible concrete sink: it inherits {@code configure} unchanged, stubs out the
     * abstract methods, and exposes the chooser so the tests can inspect it.
     */
    private static class NoOpCassandraSink extends AbstractCassandraSink {

        @Override
        protected String getConfigName() {
            return null; // this method isn't used anymore
        }

        @Override
        public Status process() throws EventDeliveryException {
            return null; // no-op
        }

        /** Gives the tests access to the chooser created during configuration. */
        public ColumnFamilyChooser getChooser() {
            return chooser;
        }
    }
}
@@ -0,0 +1,29 @@
package org.apache.flume.cassandra;

import com.netflix.astyanax.model.ColumnFamily;
import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Verifies that {@link DefaultColumnFamilyChooser} picks up the column family name from the
 * context, and that configuring it again replaces the previously chosen column family.
 */
@Test
public class DefaultColumnFamilyChooserTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultColumnFamilyChooserTest.class.getName());

    /**
     * Configures the same chooser twice with different names and checks that each
     * configuration is reflected by {@code choose}.
     */
    public void testConfigure() throws Exception {
        ColumnFamilyChooser chooserUnderTest = new DefaultColumnFamilyChooser();
        Context sharedContext = new Context();

        configureAndVerify(chooserUnderTest, sharedContext, "testConfigure");
        // Second round reuses both chooser and context to prove that reconfiguration wins.
        configureAndVerify(chooserUnderTest, sharedContext, "testConfigure-v2");
    }

    /**
     * Stores {@code familyName} in the context, configures the chooser with it, and asserts
     * that the chooser now returns a column family carrying that name.
     */
    private static void configureAndVerify(final ColumnFamilyChooser chooserUnderTest,
                                           final Context sharedContext,
                                           final String familyName) {
        sharedContext.put(DefaultColumnFamilyChooser.COLUMN_NAME, familyName);
        chooserUnderTest.configure(sharedContext);

        ColumnFamily<byte[], String> chosenFamily = chooserUnderTest.choose(null);
        Assert.assertEquals(chosenFamily.getName(), familyName);
    }
}

0 comments on commit acb5ac6

Please sign in to comment.