This repository has been archived by the owner on Jun 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 103
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Event-sourced writer example application
- Loading branch information
Showing
3 changed files
with
237 additions
and
0 deletions.
There are no files selected for viewing
68 changes: 68 additions & 0 deletions
68
src/test/scala/com/rbmhtechnology/example/querydb/Emitter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright (C) 2015 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.rbmhtechnology.example.querydb | ||
|
||
import akka.actor._ | ||
|
||
import com.rbmhtechnology.eventuate._ | ||
|
||
import scala.util._ | ||
|
||
// ----------------- | ||
// Domain commands | ||
// ----------------- | ||
|
||
case class CreateCustomer(first: String, last: String, address: String) | ||
case class UpdateAddress(cid: Long, address: String) | ||
|
||
// --------------- | ||
// Domain events | ||
// --------------- | ||
|
||
case class CustomerCreated(cid: Long, first: String, last: String, address: String) | ||
case class AddressUpdated(cid: Long, address: String) | ||
|
||
// --------------- | ||
// Event emitter | ||
// --------------- | ||
|
||
class Emitter(val id: String, val eventLog: ActorRef) extends EventsourcedActor { | ||
private var highestCustomerId = 0L | ||
|
||
override val onCommand: Receive = { | ||
case CreateCustomer(first, last, address) => | ||
persist(CustomerCreated(highestCustomerId + 1L, first, last, address)) { | ||
case Success(c) => onEvent(c); sender() ! c | ||
case Failure(e) => throw e | ||
} | ||
case UpdateAddress(cid, address) if cid <= highestCustomerId => | ||
persist(AddressUpdated(cid, address)) { | ||
case Success(c) => onEvent(c); sender() ! c | ||
case Failure(e) => throw e | ||
} | ||
case UpdateAddress(cid, _) => | ||
sender() ! new Exception(s"Customer with $cid does not exist") | ||
} | ||
|
||
override val onEvent: Receive = { | ||
case CustomerCreated(cid, first, last, address) => | ||
highestCustomerId = cid | ||
case AddressUpdated(_, _) => | ||
// ... | ||
} | ||
} | ||
|
76 changes: 76 additions & 0 deletions
76
src/test/scala/com/rbmhtechnology/example/querydb/Writer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright (C) 2015 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.rbmhtechnology.example.querydb | ||
|
||
import java.lang.{Long => JLong} | ||
|
||
import akka.actor.ActorRef | ||
|
||
import com.datastax.driver.core._ | ||
import com.rbmhtechnology.eventuate.EventsourcedWriter | ||
|
||
import scala.concurrent.Future | ||
|
||
class Writer(val id: String, val eventLog: ActorRef, session: Session) extends EventsourcedWriter[Long, Unit] { | ||
import com.rbmhtechnology.eventuate.log.cassandra.{listenableFutureToFuture => ftr} | ||
import context.dispatcher | ||
|
||
val insertCustomerStmt = session.prepare("INSERT INTO CUSTOMER (id, first, last, address) VALUES (?, ?, ?, ?)") | ||
val updateCustomerStmt = session.prepare("UPDATE CUSTOMER SET address = ? WHERE id = ?") | ||
val updateProgressStmt = session.prepare("UPDATE PROGRESS SET sequence_nr = ? WHERE id = 0") | ||
|
||
var batch: Vector[BoundStatement] = Vector.empty | ||
|
||
override def replayChunkSizeMax: Int = | ||
16 | ||
|
||
override val onCommand: Receive = { | ||
case _ => | ||
} | ||
|
||
override val onEvent: Receive = { | ||
case c @ CustomerCreated(cid, first, last, address) => | ||
batch = batch :+ insertCustomerStmt.bind(cid: JLong, first, last, address) | ||
case u @ AddressUpdated(cid, address) => | ||
batch = batch :+ updateCustomerStmt.bind(address, cid: JLong) | ||
} | ||
|
||
override def write(): Future[Unit] = { | ||
val snr = lastSequenceNr | ||
val res = for { | ||
_ <- Future.sequence(batch.map(stmt => ftr(session.executeAsync(stmt)))) | ||
_ <- session.executeAsync(updateProgressStmt.bind(snr: JLong)) | ||
} yield () | ||
// Clear batch so that further events can | ||
// be processed while write is in progress | ||
batch = Vector.empty | ||
res | ||
} | ||
|
||
override def read(): Future[Long] = { | ||
// Read last processed sequence number from query database | ||
session.executeAsync("SELECT sequence_nr FROM PROGRESS WHERE id = 0").map { rs => | ||
if (rs.isExhausted) 0L else rs.one().getLong(0) | ||
} | ||
} | ||
|
||
override def readSuccess(result: Long): Option[Long] = { | ||
// Resume after the last successfully processed | ||
// position in the source event log | ||
Some(result + 1L) | ||
} | ||
} |
93 changes: 93 additions & 0 deletions
93
src/test/scala/com/rbmhtechnology/example/querydb/WriterApp.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Copyright (C) 2015 Red Bull Media House GmbH <http://www.redbullmediahouse.com> - all rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.rbmhtechnology.example.querydb | ||
|
||
import akka.actor._ | ||
import akka.pattern.ask | ||
import akka.util.Timeout | ||
|
||
import com.datastax.driver.core._ | ||
import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog | ||
import com.typesafe.config.ConfigFactory | ||
|
||
import scala.concurrent.duration._ | ||
import scala.util._ | ||
|
||
object WriterApp extends App { | ||
val config = | ||
s""" | ||
|eventuate.log.leveldb.dir = target/example-querydb | ||
|eventuate.snapshot.filesystem.dir = target/example-querydb | ||
| | ||
|akka.log-dead-letters = 0 | ||
""".stripMargin | ||
|
||
// --------------------------------------------------------------- | ||
// Assumption: Cassandra 2.1 or higher running on localhost:9042 | ||
// --------------------------------------------------------------- | ||
|
||
withQueryDB(drop = false) { session => | ||
val system = ActorSystem("example-querydb", ConfigFactory.parseString(config)) | ||
val log = system.actorOf(LeveldbEventLog.props("example")) | ||
|
||
val emitter = system.actorOf(Props(new Emitter("emitter", log))) | ||
val writer = system.actorOf(Props(new Writer("writer", log, session))) | ||
|
||
import system.dispatcher | ||
|
||
implicit val timeout = Timeout(5.seconds) | ||
|
||
emitter ! CreateCustomer("Martin", "Krasser", "Somewhere 1") | ||
emitter ? CreateCustomer("Volker", "Stampa", "Somewhere 2") onComplete { | ||
case Success(CustomerCreated(cid, _, _, _)) => emitter ! UpdateAddress(cid, s"Somewhere ${Random.nextInt(10)}") | ||
case Failure(e) => e.printStackTrace() | ||
} | ||
|
||
Thread.sleep(3000) | ||
system.terminate() | ||
} | ||
|
||
|
||
def createQueryDB(drop: Boolean): Session = { | ||
val cluster = Cluster.builder().addContactPoint("localhost").build() | ||
val session = cluster.connect() | ||
|
||
if (drop) { | ||
session.execute("DROP KEYSPACE IF EXISTS QUERYDB") | ||
} | ||
|
||
session.execute("CREATE KEYSPACE IF NOT EXISTS QUERYDB WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }") | ||
session.execute("USE QUERYDB") | ||
|
||
session.execute("CREATE TABLE IF NOT EXISTS CUSTOMER (id bigint, first text, last text, address text, PRIMARY KEY (id))") | ||
session.execute("CREATE TABLE IF NOT EXISTS PROGRESS (id bigint, sequence_nr bigint, PRIMARY KEY (id))") | ||
session.execute("INSERT INTO PROGRESS (id, sequence_nr) VALUES(0, 0) IF NOT EXISTS") | ||
|
||
session | ||
} | ||
|
||
def dropQueryDB(session: Session): Unit = { | ||
session.close() | ||
session.getCluster.close() | ||
} | ||
|
||
def withQueryDB[A](drop: Boolean = true)(f: Session => A): A = { | ||
val session = createQueryDB(drop) | ||
try f(session) finally dropQueryDB(session) | ||
} | ||
} | ||
|