Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Updated the solution with some new features

  • Loading branch information...
commit d105842138aeabfa677910455ac110c2dd6cd4de 1 parent aec8386
@henrikengstrom authored
View
2  processor/src/main/scala/com/typesafe/akkademo/processor/BettingProcessorApplication.scala
@@ -5,7 +5,7 @@ package com.typesafe.akkademo.processor
import akka.actor.{ Props, ActorSystem }
import com.typesafe.config._
-import service.{InitializeProcessor, BettingProcessor}
+import service.{ InitializeProcessor, BettingProcessor }
object BettingProcessorApplication extends App {
val config = ConfigFactory.load()
View
12 processor/src/main/scala/com/typesafe/akkademo/processor/repository/UnstableResource.scala
@@ -18,11 +18,11 @@ class ReallyUnstableResource extends UnstableResource {
val store = new File("persistent_store")
try {
- Source.fromFile(store).getLines().foreach(s deserialize(s).foreach {
- case (id, player, game, amount) if (!bets.contains(id)) bets.put(id, Bet(player, game, amount))
+ Source.fromFile(store).getLines().foreach(s => deserialize(s).foreach {
+ case (id, player, game, amount) => if (!bets.contains(id)) bets.put(id, Bet(player, game, amount))
})
} catch {
- case _
+ case _ =>
}
def save(id: Int, player: String, game: Int, amount: Int) = {
@@ -47,13 +47,13 @@ class ReallyUnstableResource extends UnstableResource {
protected def deserialize(s: String): Option[(Int, String, Int, Int)] = {
s.split(":").toList match {
- case id :: player :: game :: amount :: Nil
+ case id :: player :: game :: amount :: Nil =>
try {
Option((id.toInt, player, game.toInt, amount.toInt))
} catch {
- case _ None
+ case _ => None
}
- case _ None
+ case _ => None
}
}
}
View
12 processor/src/main/scala/com/typesafe/akkademo/processor/service/BettingProcessor.scala
@@ -4,6 +4,7 @@
package com.typesafe.akkademo.processor.service
import akka.actor.{OneForOneStrategy, Props, ActorLogging, Actor}
+import akka.util.duration._
import com.typesafe.akkademo.common.{RegisterProcessor, PlayerBet, RetrieveBets}
import akka.actor.SupervisorStrategy.Restart
import com.typesafe.akkademo.processor.repository.DatabaseFailureException
@@ -13,6 +14,13 @@ case object InitializeProcessor
class BettingProcessor extends Actor with ActorLogging {
val worker = context.actorOf(Props[ProcessorWorker], "theWorker")
+ val service = context.actorFor(context.system.settings.config.getString("betting-service-actor"))
+ val scheduler = context.system.scheduler.schedule(1 seconds, 1 seconds, self, InitializeProcessor)
+
+ override def postStop() {
+ // Prevents the scheduler from being scheduled more than once (in case of restart of this actor)
+ scheduler.cancel()
+ }
override val supervisorStrategy = OneForOneStrategy() {
case r: RuntimeException => Restart
@@ -21,9 +29,7 @@ class BettingProcessor extends Actor with ActorLogging {
}
def receive = {
- case InitializeProcessor =>
- log.info("Processor initializing...")
- context.actorFor(context.system.settings.config.getString("betting-service-actor")) ! RegisterProcessor
+ case InitializeProcessor => service ! RegisterProcessor
case bet: PlayerBet =>
log.info("Storing bet: " + bet)
View
2  project/plugins.sbt
@@ -6,4 +6,4 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.1.0")
addSbtPlugin("com.typesafe.startscript" % "xsbt-start-script-plugin" % "0.5.3")
-addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0")
+addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.5.1")
View
32 service/src/main/scala/com/typesafe/akkademo/service/BettingService.scala
@@ -19,8 +19,9 @@ import scala.Some
case object HandleUnprocessedBets
class BettingService extends Actor with ActorLogging {
+ val ActivePeriod = 2000L
val sequence = new AtomicInteger(1)
- var processor: Option[ActorRef] = None
+ var processor: Option[(ActorRef, Long)] = None
// Note: To make this solution (even) more bullet proof you would have to persist the incoming bets.
val bets = scala.collection.mutable.Map[Int, Bet]()
@@ -34,19 +35,29 @@ class BettingService extends Actor with ActorLogging {
}
def receive = {
- case RegisterProcessor
- log.info("processor registered")
- processor = Some(sender)
+ case RegisterProcessor registerProcessor(sender)
case bet: Bet
val playerBet = processBet(bet)
- for (p processor) p ! playerBet
- for (p processor) p ! playerBet
- case RetrieveBets for (p processor) p.forward(RetrieveBets)
+ for (p getActiveProcessor) p ! playerBet
+ for (p getActiveProcessor) p ! playerBet
+ case RetrieveBets for (p getActiveProcessor) p.forward(RetrieveBets)
case ConfirmationMessage(id) handleProcessedBet(id)
case HandleUnprocessedBets handleUnprocessedBets()
// In the upcoming clustering we will be able to listen to remote clients and their status.
// With this it will be possible to prevent sending messages to a client that is no longer available.
// e.g. case RemoteClientDead (or similar) => processor = None
+ // In this solution we use heartbeats instead.
+ }
+
+ def registerProcessor(sender: ActorRef) = {
+ processor = Some((sender, System.currentTimeMillis))
+ }
+
+ def getActiveProcessor: Option[ActorRef] = {
+ processor.flatMap {
+ case (s, t) => if (System.currentTimeMillis - t < ActivePeriod) Some(s) else None
+ case _ => None
+ }
}
def processBet(bet: Bet): PlayerBet = {
@@ -64,10 +75,13 @@ class BettingService extends Actor with ActorLogging {
// In a real world solution you should probably timestamp each message sent so that you do not
// resend just sent messages -> takes some pressure off the processor.
- // Since this is just a demo I'll just treat all messages in the map as unhandled and resend them all.
+ // Since this is just a demo we'll just treat all messages in the map as unhandled and resend them all.
// Please make sure you understand that I can do this since the processor repository is idempotent!
+ // To not flood the processor actor system you might want to use throttling. A good blog post about this van be found here:
+ // http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2
+
log.info("handling unprocessed bets (size): " + bets.size)
- if (processor.isDefined) bets.keys.foreach { k for (p processor) p ! PlayerBet(k, bets(k)) }
+ getActiveProcessor.foreach {p => bets.keys.foreach { k => p ! PlayerBet(k, bets(k))}}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.