PatrickCallaghan/datastax-eventsourcing
Event Sourcing

This demo shows how Cassandra and DSE can be used to store and replay events.

To use Spark you will need to provide your own Cassandra and Spark deployments. In this demo we use DSE, as the two are already integrated.

First we start DSE in SearchAnalytics mode to allow us to use both Spark and DSE Search - http://docs.datastax.com/en/dse/5.1/dse-admin/datastax_enterprise/operations/startStop/startDseStandalone.html?hl=starting

The implementation uses bucketing to group all data into particular time buckets for replay. The time bucket used in this example is 1 minute, but any time bucket can be used. Also, depending on how many days, months, or years of events need to be kept, it may be beneficial to spread the events over different tiers of tables.
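The README does not spell out how a bucket key is derived, but as an illustrative sketch (the class and method names below are hypothetical, not from the project), a 1-minute bucket could be computed by truncating the event timestamp to the minute:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketSketch {
    // Hypothetical helper: derive a "yyyyMMdd-HHmm" bucket key so that all
    // events within the same minute land in the same partition for replay.
    static String minuteBucket(Date eventTime) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd-HHmm");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(eventTime);
    }

    public static void main(String[] args) {
        // 2016-08-05 00:00:30 UTC falls into the 20160805-0000 bucket.
        Date t = new Date(1470355230000L);
        System.out.println(minuteBucket(t)); // prints 20160805-0000
    }
}
```

A coarser or finer bucket only changes the format pattern (e.g. drop the minutes for hourly buckets); replaying a time range then means scanning every bucket key that the range covers.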

To create the schema, run the following

mvn clean compile exec:java -Dexec.mainClass="com.datastax.demo.SchemaSetup" -DcontactPoints=localhost

To create the Solr core to make our table searchable, run the following

dsetool create_core datastax.eventsource generateResources=true

To create events, run the following (default: 10 million events)

mvn clean compile exec:java -Dexec.mainClass="com.datastax.events.Main"  -DcontactPoints=localhost -DnoOfEvents=10000000

To replay a sample event set, run

mvn clean compile exec:java -Dexec.mainClass="com.datastax.events.ReadEvents"  -DcontactPoints=localhost -Dfrom=yyyyMMdd-hhmmss -Dto=yyyyMMdd-hhmmss

e.g.

mvn clean compile exec:java -Dexec.mainClass="com.datastax.events.ReadEvents"  -DcontactPoints=localhost -Dfrom=20160805-000000 -Dto=20160805-010000

This replays two scenarios:

1. Replay all events for a specified time range
2. Replay all events for a specified time range and a specific event type.
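The -Dfrom and -Dto arguments use the yyyyMMdd-hhmmss pattern shown above. As a hedged sketch of how such values can be interpreted (this is not the project's own parsing code; a 24-hour HH clock is assumed, since the examples use times like 010000):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class ReplayWindow {
    // Illustrative parser for a "yyyyMMdd-HHmmss" argument such as 20160805-010000.
    static Date parse(String value) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd-HHmmss");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return fmt.parse(value);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad replay timestamp: " + value, e);
        }
    }

    public static void main(String[] args) {
        Date from = parse("20160805-000000");
        Date to = parse("20160805-010000");
        // The example replay window spans one hour: 3,600,000 ms.
        System.out.println(to.getTime() - from.getTime()); // prints 3600000
    }
}
```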

To run the webservice

mvn jetty:run -Djetty.port=8081 

To run a REST query, go to the browser and enter a URL in the format http://localhost:8081/datastax-eventsourcing/rest/getevents/from/to, where the date format is 'yyyyMMdd-hhmmss'. e.g. For all events from midnight to 1:00 am on the 1st of August 2016, run -

http://localhost:8081/datastax-eventsourcing/rest/getevents/20160801-000000/20160801-010000/
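For clients calling the service programmatically, the URL above can be assembled from a replay window; a minimal sketch (the helper below is hypothetical, not part of the project):

```java
public class RestUrl {
    // Hypothetical helper: build the getevents URL for a replay window
    // whose bounds are already formatted as yyyyMMdd-hhmmss strings.
    static String getEventsUrl(String host, int port, String from, String to) {
        return "http://" + host + ":" + port
                + "/datastax-eventsourcing/rest/getevents/" + from + "/" + to + "/";
    }

    public static void main(String[] args) {
        System.out.println(getEventsUrl("localhost", 8081, "20160801-000000", "20160801-010000"));
    }
}
```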

We can also use CQL to query using the Solr query syntax from DSE Search

Get all LOGIN events from 9th Aug 2016 at 12:30 to 11th Aug 2016 at 12:30

select * from datastax.eventsource where solr_query = '{"q":"eventtype:LOGIN", "fq": "time:[2016-08-09T12:30:00.000Z TO 2016-08-11T12:30:00.000Z]", "sort":"time desc"}' limit 10000;

To use Spark with DSE, we can simply run 'dse spark' to start the REPL.

First we will create an Event case class to hold our event objects

case class Event(date: String, bucket: Int, id: java.util.UUID, data: String, eventtype: String,
  aggregatetype: String, time: java.util.Date, loglevel: String, host: String)

val events = sc.cassandraTable[Event]("datastax", "eventsource").cache
events.count

val max = events.map(_.time).max
val min = events.map(_.time).min

We can query our data and return events before or after a certain time.

val yesterday = new java.util.Date(java.util.Calendar.getInstance().getTime().getTime() - 200000000)
yesterday

val before = events.filter(_.time.before(yesterday))
before.take(10).foreach(print)
before.count

val after = events.filter(_.time.after(yesterday))
after.take(10).foreach(print)
after.count

Or we can use filtering to just get the events between two dates.

val start = new java.util.Date(java.util.Calendar.getInstance().getTime().getTime() - 200000000)
val end = new java.util.Date(java.util.Calendar.getInstance().getTime().getTime() - 190000000)

val filtered = events.filter(_.time.after(start)).filter(_.time.before(end)).cache
filtered.count

Let's get the number of events per host and a list of all distinct hosts.

val hostCounts = events.map(e => (e.host, 1)).reduceByKey(_ + _)
hostCounts.collect().foreach(println)

val hosts = hostCounts.keys
hosts.collect().foreach(println)

To use Spark SQL, try the following with a valid date

val results = sqlContext.sql("SELECT * from datastax.eventsource where date = '20161019'")
 
results.take(5).foreach(println)

val results = sqlContext.sql("SELECT * from datastax.eventsource where time > '2016-10-22 16:18:07'")

results.take(5).foreach(println)

val results = sqlContext.sql("SELECT * from datastax.eventsource where time > '2016-10-22 16:18:07' and time < '2016-10-23 16:18:07'");

results.count

To remove the tables and the schema, run the following.

mvn clean compile exec:java -Dexec.mainClass="com.datastax.demo.SchemaTeardown" -DcontactPoints=localhost

About

Small code example of how to use Cassandra/DSE for event sourcing and replay
