Skip to content

Commit

Permalink
Make possible to add own serializer for Event => T and `T => EventD…
Browse files Browse the repository at this point in the history
…ata`
  • Loading branch information
t3hnar committed Oct 18, 2014
1 parent 2246087 commit 2df8bfa
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 111 deletions.
10 changes: 6 additions & 4 deletions README.md
Expand Up @@ -13,7 +13,7 @@
</tr>
<tr>
<td><a href="https://github.com/EventStore/EventStore.JVM">eventstore-client</a> </td>
<td>1.0.0</td>
<td>1.0.1</td>
</tr>
</table>

Expand All @@ -32,11 +32,13 @@ To configure EventStore.JVM client, see it's [reference.conf](https://github.com

Akka serializes your messages into binary data by default.
However you can [add your own serializer](http://doc.akka.io/docs/akka/2.3.6/scala/serialization.html#Customization) to serialize as JSON,
But make sure to extend `akka.persistence.eventstore.EventStoreSerializer` rather then `akka.serialization.Serializer`
But make sure you extend `akka.persistence.eventstore.EventStoreSerializer` rather then `akka.serialization.Serializer`.
And in case you are really going to serialize as json, please specify `ContentType.Json`, it will allow you to use projections.

```scala
class JsonSerializer extends EventStoreSerializer {
def contentType = ContentType.Json // This will tell Event Store to handle your data as JSON
trait EventStoreSerializer extends Serializer {
def toEvent(o: AnyRef): EventData
def fromEvent(event: Event, manifest: Class[_]): AnyRef
}
```

Expand Down
38 changes: 19 additions & 19 deletions src/main/scala/akka/persistence/eventstore/EventStorePlugin.scala
Expand Up @@ -3,38 +3,38 @@ package akka.persistence.eventstore
import akka.actor.{ ActorLogging, Actor }
import akka.serialization.{ SerializationExtension, Serialization }
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import eventstore.{ ContentType, Content, EsConnection, EventStoreExtension }
import akka.util.ByteString
import scala.util.control.NonFatal
import eventstore._

trait EventStorePlugin extends ActorLogging { self: Actor =>
val connection: EsConnection = EventStoreExtension(context.system).connection
private val serialization: Serialization = SerializationExtension(context.system)
val serialization: Serialization = SerializationExtension(context.system)
import context.dispatcher

def deserialize[T](content: Content, clazz: Class[T]): T = {
serialization.deserialize(content.value.toArray, clazz).get
def deserialize[T](event: Event, clazz: Class[T]): T = {
val ser = serialization.serializerFor(clazz)
val res = ser match {
case ser: EventStoreSerializer => ser.fromEvent(event, clazz)
case _ => ser.fromBinary(event.data.data.value.toArray, clazz)
}
res.asInstanceOf[T]
}

def serialize(x: AnyRef): Content = {
val serializer = serialization.findSerializerFor(x)
val contentType = serializer match {
case x: EventStoreSerializer => x.contentType
case _ => ContentType.Binary
def serialize(data: AnyRef, eventType: => Option[Any] = None): EventData = {
val ser = serialization.findSerializerFor(data)
ser match {
case ser: EventStoreSerializer => ser.toEvent(data)
case _ => EventData(
eventType = (eventType getOrElse data).getClass.getName,

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

@t3hnar Is this here because you can set the payload type of PersistentRepr instead of PersistentRepr type?

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

nope, that's for snapshots to see it's payload rather then internal envelope.

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

What I think is interesting is the payload event type when considering built-in projections in the eventstore. Don't you agree?

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

I totally agree, that's why you should extend EventStoreSerializer and get a full control for eventType.
But why would you need to use eventType of snapshots which will go to the different stream ?

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

Snapshots are not interesting with regards to event type, sorry for misunderstanding :)

I intend to use your serializer and my use case is:

Envelope(metadata, domain type) -> Protobuf translation of domain type -> Envelope(metadata, protobuf type) -> store protobuf type and metadata plus use pb type as eventtype in EventData.

Is that doable?

This comment has been minimized.

Copy link
@pawelkaczor

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

Briefly, where is the difference and why fork?

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor May 29, 2015

Contributor

Please read README

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

Not sure but looks like changed default behaviour to store akka's metadata and business data in to metadata and data parts for event. Same could be achieved by overriding EventStoreSerializer

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor via email May 29, 2015

Contributor

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

@t3hnar, do you mean the fork isn't necessary?

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

if the only reason to separate PersistentRepr data from payload, then YES it is not needed.

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

with EventStoreSerializer you'll get PersistentRepr -> EventData and EventData -> PersistentRepr, so you can separate payload and metadata without changes to core code of the library.

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

@pawelkaczor the problem you could face that Serializer bound for class name, and you don't want to use your for example JsonSerializer for PersistentRepr with payload from not your domain (akka sharding uses PersistentRepr as well)
To resolve this issue you just match on payload type and may fall back to default protobuf serializer, etc.

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

@ahjohannessen here is a screenshot on how I'm using it right now: http://prntscr.com/7ap4iq

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

I would prefer not deal with akka types used for persistence and still be able to use my payload type for eventtype + store metadata from my envelope. I suppose I need to experiment in order to identify possible painpoints :)

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor via email May 29, 2015

Contributor

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor
{
  "sequenceNr": 7,
  "persistenceId": "GameTable-bfxoiebwyg3ytxcv",
  "deleted": false,
  "payloadClass": "com.xxx.xxx.GameTableActor$Event",
  "payload": {
    "type": "TableLeft",
    "value": {
      "userId": "8xn0vbo1469noeam",
      "kicked": false,
      "timestamp": "2015-05-29T10:37:58.37Z"
    }
  }
}

so it is all together payload and PersistentRepr, but separation can be done on EventStoreSerializer level in PersistentRepr -> EventData and EventData -> PersistentRepr

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

Overall if that separation feature is everyone want, I can make it to be configurable and put in to the plugin sources...

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

Interesting @t3hnar :) thx for screenshot. Would it not make sense to keep akka persistence messages hidden from custom eventstore serializer hook-in methods. Or am I not seeing something obvious?
Main priority is custom serializer + proper eventtype and ability to store my metadata without worrying about akka message types like persistent repr and similar.

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

@ahjohannessen I don't know business needs of all users, thus EventData -> PersistentRepr & PersistentRepr -> EventData gives the most flexibility. Also PersistentRepr is the API of akka persistence used by other plugins...
So you could end up using same custom serializer with different plugins. Next akka version will provide ability to configure plugin on per actor basis.

This comment has been minimized.

Copy link
@ktoso

ktoso May 29, 2015

Hi guys,
we're in the middle of working on exactly these topics in Akka actually. We agree it is a very important feature and have seen the need for it in a number of plugins, not only in this one. The most relevant tickets are:

If you could have a look and sanity check those features would address your needs that would be awesome.

Esp. we want to allow completely decoupling models - for example if a journal can work with BSON (we have another similar discussion running on a mongodb plugin), then an adapter should just be able to send BSON to the journal. In your case @ahjohannessen you could de-couple the domain classes from the "data representation model" in protobuf, and only send those to the journal.

Please have a look at the issues and let us know what you think!

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 29, 2015

Author Contributor

thx @ktoso
@ahjohannessen so basically first bullet allows you to not bother with PersistentRepr, EventStorePlugin could manage that on own level.

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 29, 2015

Contributor

@ktoso I already do that with scalaz bijections: domain < @ > protobuf :)

This comment has been minimized.

Copy link
@ktoso

ktoso May 29, 2015

@ahjohannessen We're not going to provide the how, but just the where - so you'd still use bijections, just we'd have a proper place to put them.
The 2nd bullet is about providing a common place for those bijections, not about providing a bijections library.
I assume you currently do this in the serialization layer directly (here the 1st bullet should help)?
The 2nd bullet is about a place for writing upcasters (somewhat axon inspired).

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 30, 2015

Contributor

@ktoso Yes, I follow that. Right now, and not in 2.4, I would like to be able to use Akka Persistence without too much pain (hence using the eventstore) and gain what I think is a very common thing.

  • DT <@> PT bijective serializer + SerializerWithStringManifest allows me to store the manifest for PT instead of DT.
  • Ability to use an EventMsg(headers: Map[String, String], payload: DT) and then store its content like this:
    EM ~> EM' ~> (headers, protoPayload) -> EventData(eventType = payloadType, data = protoPayload, metadata = headersJson) and then the other way EventData -> (headers, protoPayload) ~> EM ~> EM' without really giving PersistentRepr and other akka specific things a thought. It should stay out of my way, basically.
    As you see, I do not wish to store the EM structure, it is just a carrier type for payload+context flowing inside my application level code. Nice that ES allows one to store metadata 👍

I assume you currently do this in the serialization layer directly

Yep, that is correct and should help, too bad if I have to wait for 2.4 and need to lift those bits out of akka.

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 30, 2015

Contributor

@t3hnar

Overall if that separation feature is everyone want, I can make it to be configurable and put in to the plugin sources...

Interesting, could you provide some details on how you will approach this? :)

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 30, 2015

Contributor

@t3hnar btw, it would be nice with gitter.im for this project :)

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor May 30, 2015

Contributor

I just realized why I had to extend the plugin. The reason was explained in Readme: "to break down EventMessage into event payload and metadata before storing to EventStore journal". When domain event is published, it is wrapped into EventMessage and sent to journal which wraps EM into PersistentRepr. Extended plugin stores DomainEvent (extracted from EM) instead of whole EM as payload (eventType is not set to EventMessage but to appropriate class of DomainEvent) and metadata from EM is stored as EventStore's metadata attribute.

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 30, 2015

Contributor

@pawelkaczor That seems to align very closely to what I wish to accomplish, right?

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor May 30, 2015

Contributor

@ahjohannessen wrote:

I intend to use your serializer and my use case is: Envelope(metadata, domain type) -> Protobuf translation of domain type -> Envelope(metadata, protobuf type) -> store protobuf type and metadata plus use pb type as eventtype in EventData.

yes. this is the same use case I solved by extending the plugin. Plugin is the one to break down/reconstruct Envelope into/from metadata and protobuf type.

This comment has been minimized.

Copy link
@ahjohannessen

ahjohannessen May 30, 2015

Contributor

@pawelkaczor Alright, @t3hnar seems to be open for adjustments, so hopefully something can be done such that a fork is not needed and everyone is happy :-)

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor May 30, 2015

Contributor

Yes, I think EventMessage/DomainEvent "use-case" should be addressed by the plugin (or sample serializer should be presented, if doable).

This comment has been minimized.

Copy link
@t3hnar

t3hnar May 30, 2015

Author Contributor

@pawelkaczor I still don't understand what stops you from converting PersistentRepr(EventMessage) to EventData with extracted domain message from PersistentRepr... like persistentRepr.payload match { case x: EventMessage => DomainEvent }

Anyway I'm going to look closely to your fork and create example on top of EventStoreSerializer if that's possible, or make it to be possible.

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor May 30, 2015

Contributor

Ok, I will give another try. I think you are right...

This comment has been minimized.

Copy link
@pawelkaczor

pawelkaczor May 30, 2015

Contributor

I think I did it! Thanks @t3hnar for your support. I will commit my changes soon.

This comment has been minimized.

data = Content(ser.toBinary(data)))
}
Content(ByteString(serializer.toBinary(x)), contentType)
}

def asyncUnit(x: => Future[_]): Future[Unit] = async(x).map(_ => Unit)

def async[T](x: => Future[T]): Future[T] = Try(x) match {
case Success(s) => s
case Failure(f) => Future.failed(f)
def async[T](x: => Future[T]): Future[T] = try x catch {
case NonFatal(f) => Future.failed(f)
}

def asyncSeq[A](x: => Iterable[Future[A]]): Future[Unit] = asyncUnit(Future.sequence(x))

def logNoClassFoundFor(eventType: String): Unit = {
log.warning("Can't find class to deserialize eventType {}", eventType)
}
}
@@ -1,8 +1,9 @@
package akka.persistence.eventstore

import akka.serialization.Serializer
import eventstore.ContentType
import eventstore.{ Event, EventData }

trait EventStoreSerializer extends Serializer {
def contentType: ContentType
def toEvent(o: AnyRef): EventData
def fromEvent(event: Event, manifest: Class[_]): AnyRef
}
2 changes: 1 addition & 1 deletion src/main/scala/akka/persistence/eventstore/Helpers.scala
Expand Up @@ -40,7 +40,7 @@ object Helpers {
if (self == Long.MaxValue) Int.MaxValue
else {
if (self.isValidInt) self.toInt
else sys.error(s"Can't convert $self to Int")
else sys.error(s"Cannot convert $self to Int")
}
}

Expand Down
Expand Up @@ -19,7 +19,7 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {
def asyncWriteMessages(messages: Seq[PersistentRepr]) = asyncSeq {
messages.groupBy(_.persistenceId).map {
case (persistenceId, msgs) =>
val events = msgs.map(eventData)
val events = msgs.map(x => serialize(x, Some(x.payload)))
val expVer = msgs.head.sequenceNr - 1 match {
case 0L => ExpectedVersion.NoStream
case x => ExpectedVersion.Exact(eventNumber(x))
Expand Down Expand Up @@ -57,7 +57,7 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {
connection.foldLeft(req, max) {
case (left, event) if event.number <= to && left > 0 =>
val seqNr = sequenceNumber(event.number)
val repr = persistentRepr(event.data).update(deleted = seqNr <= deletedTo)
val repr = deserialize(event, classOf[PersistentRepr]).update(deleted = seqNr <= deletedTo)
replayCallback(repr)
left - 1
}
Expand All @@ -74,16 +74,9 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {

def eventStream(x: PersistenceId): EventStream.Plain = EventStream(UrlEncoder(x)) match {
case plain: EventStream.Plain => plain
case other => sys.error(s"can't create plain event stream for $x")
case other => sys.error(s"Cannot create plain event stream for $x")
}

def eventData(x: PersistentRepr): EventData = EventData(
eventType = x.payload.getClass.getSimpleName,
data = serialize(x))

def persistentRepr(x: EventData): PersistentRepr =
deserialize[PersistentRepr](x.data, classOf[PersistentRepr])

def deletedTo(persistenceId: PersistenceId): Future[SequenceNr] = {
deleteToCache.get(persistenceId) match {
case Some(x) => Future.successful(x)
Expand Down
Expand Up @@ -23,15 +23,14 @@ class EventStoreSnapshotStore extends SnapshotStore with EventStorePlugin {
def loadAsync(persistenceId: PersistenceId, criteria: SnapshotSelectionCriteria) = async {
import Selection._
def fold(deletes: Deletes, event: Event): Selection = {
val eventType = event.data.eventType
ClassMap.get(eventType) match {
case None =>
logNoClassFoundFor(eventType)
deletes
deserialize(event, classOf[SnapshotEvent]) match {
case Delete(seqNr, _) => deletes.copy(deleted = deletes.deleted + seqNr)

case Some(SnapshotClass) =>
val metadata = deserialize(event.data.metadata, classOf[SnapshotMetadata])
case DeleteCriteria(maxSeqNr, maxTimestamp) => deletes.copy(
minSequenceNr = math.max(deletes.minSequenceNr, maxSeqNr),
minTimestamp = math.max(deletes.minTimestamp, maxTimestamp))

case Snapshot(snapshot, metadata) =>
val seqNr = metadata.sequenceNr
val timestamp = metadata.timestamp

Expand All @@ -42,31 +41,18 @@ class EventStoreSnapshotStore extends SnapshotStore with EventStorePlugin {
val acceptable = seqNr <= criteria.maxSequenceNr && timestamp <= criteria.maxTimestamp

if (deleted || !acceptable) deletes
else {
val snapshot = deserialize(event.data.data, SnapshotClass)
Selected(SelectedSnapshot(metadata, snapshot.data))
}

case Some(clazz) => deserialize(event.data.data, clazz) match {
case Snapshot(_) => deletes // should not happen
case Delete(seqNr, _) => deletes.copy(deleted = deletes.deleted + seqNr)
case DeleteCriteria(maxSeqNr, maxTimestamp) => deletes.copy(
minSequenceNr = math.max(deletes.minSequenceNr, maxSeqNr),
minTimestamp = math.max(deletes.minTimestamp, maxTimestamp))
}
else Selected(SelectedSnapshot(metadata, snapshot))
}
}

val streamId = eventStream(persistenceId)
val req = ReadStreamEvents(streamId, EventNumber.Last, maxCount = readBatchSize, direction = Backward)
connection.foldLeft(req, Empty) {
case (deletes: Deletes, event) => fold(deletes, event)
}.map(_.selected)
connection.foldLeft(req, Empty) { case (deletes: Deletes, event) => fold(deletes, event) }.map(_.selected)
}

def saveAsync(metadata: SnapshotMetadata, snapshot: Any) = asyncUnit {
val streamId = eventStream(metadata.persistenceId)
connection.future(WriteEvents(streamId, List(eventData(metadata, snapshot))))
connection.future(WriteEvents(streamId, List(serialize(Snapshot(snapshot, metadata), Some(snapshot)))))
}

def saved(metadata: SnapshotMetadata) = {}
Expand All @@ -81,38 +67,21 @@ class EventStoreSnapshotStore extends SnapshotStore with EventStorePlugin {
maxTimestamp = criteria.maxTimestamp))
}

def eventData(metadata: SnapshotMetadata, snapshot: Any): EventData = EventData(
eventType = EventTypeMap(SnapshotClass),
data = serialize(Snapshot(snapshot)),
metadata = serialize(metadata))

def eventData(x: SnapshotEvent): EventData = EventData(
eventType = EventTypeMap(x.getClass),
data = serialize(x))

def eventStream(x: PersistenceId): EventStream.Id = EventStream.Id(UrlEncoder(x) + "-snapshots")

def delete(persistenceId: PersistenceId, se: DeleteEvent): Unit = {
val streamId = eventStream(persistenceId)
val future = connection.future(WriteEvents(streamId, List(eventData(se))))
val future = connection.future(WriteEvents(streamId, List(serialize(se, None))))
Await.result(future, deleteAwait)
}
}

object EventStoreSnapshotStore {
sealed trait SnapshotEvent
sealed trait SnapshotEvent extends Serializable

object SnapshotEvent {
val SnapshotClass: Class[Snapshot] = classOf[Snapshot]
val ClassMap: Map[String, Class[_ <: SnapshotEvent]] = Map(
"snapshot" -> SnapshotClass,
"delete" -> classOf[Delete],
"deleteCriteria" -> classOf[DeleteCriteria])

val EventTypeMap: Map[Class[_ <: SnapshotEvent], String] = ClassMap.map(_.swap)

@SerialVersionUID(0)
case class Snapshot(data: Any) extends SnapshotEvent
@SerialVersionUID(1)
case class Snapshot(data: Any, metadata: SnapshotMetadata) extends SnapshotEvent

sealed trait DeleteEvent extends SnapshotEvent

Expand Down
6 changes: 1 addition & 5 deletions src/test/resources/json4s.conf
Expand Up @@ -3,10 +3,6 @@ include "application"
akka.actor {
serializers.json4s = "akka.persistence.eventstore.Json4sSerializer"
serialization-bindings {
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent$Snapshot" = json4s
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent$Delete" = json4s
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent$DeleteCriteria" = json4s

"akka.persistence.SnapshotMetadata" = json4s
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent" = json4s
}
}
6 changes: 1 addition & 5 deletions src/test/resources/spray-json.conf
Expand Up @@ -3,10 +3,6 @@ include "application"
akka.actor {
serializers.spray-json = "akka.persistence.eventstore.SprayJsonSerializer"
serialization-bindings {
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent$Snapshot" = spray-json
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent$Delete" = spray-json
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent$DeleteCriteria" = spray-json

"akka.persistence.SnapshotMetadata" = spray-json
"akka.persistence.eventstore.snapshot.EventStoreSnapshotStore$SnapshotEvent" = spray-json
}
}
36 changes: 28 additions & 8 deletions src/test/scala/akka/persistence/eventstore/Json4sSerializer.scala
@@ -1,16 +1,22 @@
package akka.persistence.eventstore

import akka.actor.ExtendedActorSystem
import akka.persistence.SnapshotMetadata
import akka.persistence.eventstore.snapshot.EventStoreSnapshotStore.SnapshotEvent
import akka.persistence.eventstore.snapshot.EventStoreSnapshotStore.SnapshotEvent.Snapshot
import akka.util.ByteString
import org.json4s._
import org.json4s.Extraction.decompose
import org.json4s.native.Serialization.{ read, write }
import java.nio.charset.Charset
import java.nio.ByteBuffer
import snapshot.EventStoreSnapshotStore.SnapshotEvent.SnapshotClass
import eventstore.ContentType
import eventstore.{ Content, EventData, Event, ContentType }

class Json4sSerializer extends EventStoreSerializer {
class Json4sSerializer(val system: ExtendedActorSystem) extends EventStoreSerializer {
import Json4sSerializer._

implicit val formats: Formats = DefaultFormats + SnapshotSerializer

def identifier = Identifier

def includeManifest = true
Expand All @@ -25,23 +31,37 @@ class Json4sSerializer extends EventStoreSerializer {

def toBinary(o: AnyRef) = write(o).getBytes(UTF8)

def contentType = ContentType.Json
def toEvent(x: AnyRef) = x match {
case x: SnapshotEvent => EventData(
eventType = x.getClass.getName,
data = Content(ByteString(toBinary(x)), ContentType.Json))

case _ => sys.error(s"Cannot serialize $x, SnapshotEvent expected")
}

def fromEvent(event: Event, manifest: Class[_]) = {
val clazz = Class.forName(event.data.eventType)
val result = fromBinary(event.data.data.value.toArray, clazz)
if (manifest.isInstance(result)) result
else sys.error(s"Cannot deserialize event as $manifest, event: $event")
}
}

object Json4sSerializer {

val UTF8 = Charset.forName("UTF-8")
val Identifier: Int = ByteBuffer.wrap("json4s".getBytes(UTF8)).getInt
implicit val formats: Formats = DefaultFormats + SnapshotSerializer

object SnapshotSerializer extends Serializer[Snapshot] {
val Clazz = classOf[Snapshot]

def deserialize(implicit format: Formats) = {
case (TypeInfo(SnapshotClass, _), JObject(List(JField("data", JString(x))))) => Snapshot(x)
case (TypeInfo(Clazz, _), JObject(List(
JField("data", JString(x)),
JField("metadata", metadata)))) => Snapshot(x, metadata.extract[SnapshotMetadata])
}

def serialize(implicit format: Formats) = {
case Snapshot(x: String) => JObject("data" -> JString(x))
case Snapshot(data, metadata) => JObject("data" -> JString(data.toString), "metadata" -> decompose(metadata))
}
}
}

0 comments on commit 2df8bfa

Please sign in to comment.