Browse files

enhanced trident write so it could emit and carry tuple forward, upda…

  • Loading branch information...
1 parent 4ebcb50 commit 84ed57a9e554f9dea92764ed5dfda7ff778b8c00 Alex Collautt committed Jan 30, 2013
Showing with 33 additions and 5 deletions.
  1. +18 −5
  2. +15 −0 src/main/java/com/hmsonline/storm/cassandra/trident/
@@ -25,12 +25,17 @@ Maven artifacts for releases will be available on maven central.
**Basic Usage**
-`CassandraBolt` expects that a Cassandra hostname, port, and keyspace be set in the Storm topology configuration:
+`CassandraBolt`, `TridentCassandraLookupFunction`, and `TridentCassandraWriteFunction` expects that a Cassandra hostname,
+port, and keyspace be set in the Storm topology configuration. To allow for multiple instances of these in a topology
+and not require that they all connect to the same Cassandra instance the values are added to the Storm configuration
+using a key and a map. The key to indicate which map to use is set in the constructor of these classes when instantiating
+ Map<String, Object> cassandraConfig = new HashMap<String, Object>();
+ cassandraConfig.put(CassandraBolt.CASSANDRA_HOST, "localhost:9160");
+ cassandraConfig.put(CassandraBolt.CASSANDRA_KEYSPACE, "testKeyspace");
Config config = new Config();
- config.put(CassandraBolt.CASSANDRA_HOST, "localhost:9160");
- config.put(CassandraBolt.CASSANDRA_KEYSPACE, "testKeyspace");
+ config.put("CassandraLocal", cassandraConfig);
The `CassandraBolt` class provides a convenience constructor that takes a column family name, and row key field value as arguments:
@@ -51,6 +56,14 @@ Would yield the following Cassandra row (as seen from `cassandra-cli`):
=> (column=field1, value=foo, timestamp=1321938505071001)
=> (column=field2, value=bar, timestamp=1321938505072000)
+**Cassandra Write Function**
+Storm Trident filters out the original Tuple if a function doesn't emit anything. To allow for additional processing after
+writing to Cassandra the `TridentCassandraWriteFunction` can emit a static Object value. The main purpose for this emit is
+to simply allow the Tuple to continue as opposed to filtering it out. The static value can be set in either the constructor
+or by calling the setValueToEmitAfterWrite method. Setting the emit value to NULL will cause the function to not emit anything
+and Storm will filter the Tuple out. Default behavior is to not emit.
+If the function will emit a value don't forget to declare the output field when building the topology.
**Cassandra Counter Columns**
The Counter Column concept is similar to the above,
@@ -10,6 +10,7 @@
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
import com.hmsonline.storm.cassandra.bolt.mapper.TridentTupleMapper;
import com.hmsonline.storm.cassandra.client.AstyanaxClient;
@@ -25,13 +26,24 @@
private Class<K> columnNameClass;
private Class<V> columnValueClass;
private String clientConfigKey;
+ private Object valueToEmit;
+ public void setValueToEmitAfterWrite(Object valueToEmit) {
+ this.valueToEmit = valueToEmit;
+ }
public TridentCassandraWriteFunction(String clientConfigKey, TridentTupleMapper<K, V> tupleMapper,
Class<K> columnNameClass, Class<V> columnValueClass) {
this.tupleMapper = tupleMapper;
this.columnNameClass = columnNameClass;
this.columnValueClass = columnValueClass;
this.clientConfigKey = clientConfigKey;
+ this.valueToEmit = null;
+ }
+ public TridentCassandraWriteFunction(String clientConfigKey, TridentTupleMapper<K, V> tupleMapper,
+ Class<K> columnNameClass, Class<V> columnValueClass, Object valueToEmit) {
+ this(clientConfigKey, tupleMapper, columnNameClass, columnValueClass);
+ this.valueToEmit = valueToEmit;
@@ -51,6 +63,9 @@ public void cleanup() {
public void execute(TridentTuple tuple, TridentCollector collector) {
try {
+ if (this.valueToEmit != null) {
+ collector.emit(new Values(this.valueToEmit));
+ }
} catch (TupleMappingException e) {
LOG.error("Skipping tuple: " + tuple, e);
} catch (StormCassandraException e) {

0 comments on commit 84ed57a

Please sign in to comment.