Permalink
Browse files

implemented full Redis based event logging in addition to in-memory l…

…ogging. Snapshotting works for both forms of logging. Test cases pass
  • Loading branch information...
1 parent 368fbde commit f269bfd108f4aeed9477b88d783331965f6c0db9 @debasishg committed Jan 13, 2012
@@ -1,7 +1,8 @@
package net.debasishg.domain.trade
package event
-import util.Serialization
+import serialization.Serialization._
+import serialization.Util._
import akka.dispatch._
import akka.util.Timeout
import akka.util.duration._
@@ -36,29 +37,33 @@ class RedisEventLog(clients: RedisClientPool, as: ActorSystem) extends EventLog
case class GetEntries()
class Logger(clients: RedisClientPool) extends Actor {
+ implicit val format = Format {case l: EventLogEntry => serializeEventLogEntry(l)}
+ implicit val parseList = Parse[EventLogEntry](deSerializeEventLogEntry(_))
+
def receive = {
case LogEvent(id, state, data, event) =>
val entry = EventLogEntry(RedisEventLog.nextId(), id, state, data, event)
clients.withClient {client =>
- client.lpush(RedisEventLog.logName, Serialization.serialize(entry))
+ client.lpush(RedisEventLog.logName, entry)
}
sender ! entry
case GetEntries() =>
import Parse.Implicits.parseByteArray
val entries =
clients.withClient {client =>
- client.lrange[Array[Byte]](RedisEventLog.logName, 0, -1)
+ client.lrange[EventLogEntry](RedisEventLog.logName, 0, -1)
}
- val ren = entries.map(_.map(e => Serialization.deserialize(e.get))).getOrElse(List.empty[EventLogEntry]).reverse
- println("**************************")
- ren.foreach(println)
- println("**************************")
+ val ren = entries.map(_.map(e => e.get)).getOrElse(List.empty[EventLogEntry]).reverse
sender ! ren
}
}
}
+ import Parse.Implicits.parseDouble
+
+
+
object RedisEventLog {
var current = 0L
def logName = "events"
@@ -1,12 +1,5 @@
-package net.debasishg.domain.trade.model
-
-/**
- * Created by IntelliJ IDEA.
- * User: debasish
- * Date: 24/12/10
- * Time: 10:37 PM
- * To change this template use File | Settings | File Templates.
- */
+package net.debasishg.domain.trade
+package model
trait RefModel extends Serializable {
type Instrument = String
@@ -1,23 +0,0 @@
-package net.debasishg.domain.trade
-package util
-
-import java.io._
-
-object Serialization {
- def serialize(obj: AnyRef) = {
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
-
- oos.writeObject(obj)
- oos.flush()
- bos.toByteArray
- }
-
- def deserialize(bytes: Array[Byte]) = {
- val bis = new ByteArrayInputStream(bytes)
- val ois = new ObjectInputStream(bis)
-
- ois.readObject()
- }
-}
-
@@ -0,0 +1,157 @@
+package net.debasishg.domain.trade
+package serialization
+
+import java.util.Date
+import sjson.json.Format
+import sjson.json.DefaultProtocol._
+import dispatch.json._
+import sjson.json.JsonSerialization._
+
+import event.{Event, State, EventLogEntry}
+import model.TradeModel._
+
+object Serialization {
+ implicit val EventFormat: Format[Event] = new Format[Event] {
+ def reads(json: JsValue): Event = json match {
+ case JsString("NewTrade") => NewTrade
+ case JsString("EnrichTrade") => EnrichTrade
+ case JsString("AddValueDate") => AddValueDate
+ case JsString("SendOutContractNote") => SendOutContractNote
+ case _ => sys.error("Invalid Event")
+ }
+ def writes(a: Event): JsValue = a match {
+ case NewTrade => JsString("NewTrade")
+ case EnrichTrade => JsString("EnrichTrade")
+ case AddValueDate => JsString("AddValueDate")
+ case SendOutContractNote => JsString("SendOutContractNote")
+ }
+ }
+
+ implicit val StateFormat: Format[State] = new Format[State] {
+ def reads(json: JsValue): State = json match {
+ case JsString("Created") => Created
+ case JsString("Enriched") => Enriched
+ case JsString("ValueDateAdded") => ValueDateAdded
+ case _ => sys.error("Invalid State")
+ }
+ def writes(a: State): JsValue = a match {
+ case Created => JsString("Created")
+ case Enriched => JsString("Enriched")
+ case ValueDateAdded => JsString("ValueDateAdded")
+ }
+ }
+
+ implicit val TaxFeeIdFormat: Format[TaxFeeId] = new Format[TaxFeeId] {
+ def reads(json: JsValue): TaxFeeId = json match {
+ case JsString("TradeTax") => TradeTax
+ case JsString("Commission") => Commission
+ case JsString("VAT") => VAT
+ case _ => sys.error("Invalid TaxFeeId")
+ }
+ def writes(a: TaxFeeId): JsValue = a match {
+ case TradeTax => JsString("TradeTax")
+ case Commission => JsString("Commission")
+ case VAT => JsString("VAT")
+ }
+ }
+
+ implicit val MarketFormat: Format[Market] = new Format[Market] {
+ def reads(json: JsValue): Market = json match {
+ case JsString("HongKong") => HongKong
+ case JsString("Singapore") => Singapore
+ case JsString("NewYork") => NewYork
+ case JsString("Tokyo") => Tokyo
+ case JsString("Other") => Other
+ case _ => sys.error("Invalid State")
+ }
+ def writes(a: Market): JsValue = a match {
+ case HongKong => JsString("HongKong")
+ case Singapore => JsString("Singapore")
+ case NewYork => JsString("NewYork")
+ case Tokyo => JsString("Tokyo")
+ case Other => JsString("Other")
+ }
+ }
+
+ implicit object BigDecimalFormat extends Format[BigDecimal] {
+ def writes(o: BigDecimal) = JsValue.apply(o)
+ def reads(json: JsValue) = json match {
+ case JsNumber(n) => n
+ case _ => throw new RuntimeException("BigDecimal expected")
+ }
+ }
+
+ implicit object DateFormat extends Format[Date] {
+ def writes(o: Date) = JsValue.apply(o.getTime.toString)
+ def reads(json: JsValue) = json match {
+ case JsString(s) => sjson.json.Util.mkDate(s)
+ case _ => throw new RuntimeException("Date expected")
+ }
+ }
+
+ implicit val TradeFormat: Format[Trade] = new Format[Trade] {
+ def writes(t: Trade): JsValue =
+ JsObject(List(
+ (tojson("account").asInstanceOf[JsString], tojson(t.account)),
+ (tojson("instrument").asInstanceOf[JsString], tojson(t.instrument)),
+ (tojson("refNo").asInstanceOf[JsString], tojson(t.refNo)),
+ (tojson("market").asInstanceOf[JsString], tojson(t.market)),
+ (tojson("unitPrice").asInstanceOf[JsString], tojson(t.unitPrice)),
+ (tojson("quantity").asInstanceOf[JsString], tojson(t.quantity)),
+ (tojson("tradeDate").asInstanceOf[JsString], tojson(t.tradeDate)),
+ (tojson("valueDate").asInstanceOf[JsString], tojson(t.valueDate)),
+ (tojson("taxFees").asInstanceOf[JsString], tojson(t.taxFees)),
+ (tojson("netAmount").asInstanceOf[JsString], tojson(t.netAmount)) ))
+
+ def reads(json: JsValue): Trade = json match {
+ case JsObject(m) =>
+ Trade(fromjson[Account](m(JsString("account"))),
+ fromjson[Instrument](m(JsString("instrument"))),
+ fromjson[String](m(JsString("refNo"))),
+ fromjson[Market](m(JsString("market"))),
+ fromjson[BigDecimal](m(JsString("unitPrice"))),
+ fromjson[BigDecimal](m(JsString("quantity"))),
+ fromjson[Date](m(JsString("tradeDate"))),
+ fromjson[Option[Date]](m(JsString("valueDate"))),
+ fromjson[Option[List[(TaxFeeId, BigDecimal)]]](m(JsString("taxFees"))),
+ fromjson[Option[BigDecimal]](m(JsString("netAmount"))))
+ case _ => throw new RuntimeException("JsObject expected")
+ }
+ }
+
+ implicit val EventLogEntryFormat: Format[EventLogEntry] = new Format[EventLogEntry] {
+ def writes(e: EventLogEntry): JsValue =
+ JsObject(List(
+ (tojson("entryId").asInstanceOf[JsString], tojson(e.entryId)),
+ (tojson("objectId").asInstanceOf[JsString], tojson(e.objectId)),
+ (tojson("inState").asInstanceOf[JsString], tojson(e.inState)),
+ e.withData match {
+ case Some(t) => t match {
+ case trd: Trade => (tojson("withData").asInstanceOf[JsString], tojson(trd))
+ case _ => sys.error("invalid trade data")
+ }
+ case _ => (tojson("withData").asInstanceOf[JsString], tojson("$notrade$"))
+ },
+ (tojson("event").asInstanceOf[JsString], tojson(e.event)) ))
+
+ def reads(json: JsValue): EventLogEntry = json match {
+ case JsObject(m) =>
+ EventLogEntry(fromjson[Long](m(JsString("entryId"))),
+ fromjson[String](m(JsString("objectId"))),
+ fromjson[State](m(JsString("inState"))),
+ m(JsString("withData")) match {
+ case JsString("$notrade$") => None
+ case t => Some(fromjson[Trade](t))
+ },
+ fromjson[Event](m(JsString("event"))))
+ case _ => throw new RuntimeException("JsObject expected")
+ }
+ }
+}
+
+object Util {
+ def serializeEventLogEntry(e: EventLogEntry)(implicit f: Format[EventLogEntry])
+ = tobinary(e)
+ def deSerializeEventLogEntry(bytes: Array[Byte])(implicit f: Format[EventLogEntry])
+ = frombinary[EventLogEntry](bytes)
+}
@@ -13,13 +13,11 @@ trait TradeSnapshot {
val l = new collection.mutable.ListBuffer[Trade]
var mar = Map.empty[String, ActorRef]
log.foreach {entry =>
- println("entry = " + entry)
val EventLogEntry(id, oid, state, d, ev) = entry
- println("state = " + state)
- if (state == Created) {
+ if (state.toString == "Created") {
mar += ((oid, system.actorOf(Props(new TradeLifecycle(d.asInstanceOf[Option[Trade]].get, timeout.duration, None)), name = "tlc-" + oid)))
mar(oid) ! ev
- } else if (state == Enriched) {
+ } else if (state.toString == "Enriched") {
val future = mar(oid) ? SendOutContractNote
l += Await.result(future, timeout.duration).asInstanceOf[Trade]
} else {
@@ -26,7 +26,6 @@ class TradeLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll
override def afterAll = { system.shutdown() }
describe("trade lifecycle") {
-/**
it("should work with in memory event logging") {
val log = new InMemoryEventLog(system)
val finalTrades = new collection.mutable.ListBuffer[Trade]
@@ -63,7 +62,6 @@ class TradeLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll
val qtrades = Await.result(f, timeout.duration).asInstanceOf[List[Trade]]
qtrades should equal(finalTrades)
}
-**/
it("should work with redis based event logging") {
val clients = new RedisClientPool("localhost", 6379)

0 comments on commit f269bfd

Please sign in to comment.