Skip to content
Browse files

Merge pull request #2 from patriknw/wip-improvements1-solution

Some improvements to solution also
  • Loading branch information...
2 parents 8437c03 + 7dd950e commit bb7e63b9cea1d5dea969dc2de833122c3868344d @henrikengstrom committed Mar 22, 2013
View
3 .gitignore
@@ -13,6 +13,7 @@
.project
.settings
.cache
+.target
# sbt specific
dist/*
@@ -24,3 +25,5 @@ project/plugins/project/
# Scala-IDE specific
.scala_dependencies
+
+persistent_store
View
57 GREEN_BELT_AKKA.md
@@ -2,7 +2,7 @@
This doucment will briefly introduce some important parts of Akka _specific for the implementation of this kata_.
-The official Akka documentation is really good (if we may say so ourselves): [Akka Docs](http://doc.akka.io/docs/akka/2.0.3/)
+The official Akka documentation is really good (if we may say so ourselves): [Akka Docs](http://doc.akka.io/docs/akka/2.1.2/)
_Please note_ that this document describes parts of Akka very briefly and we refer to the original documentation for an in-depth description of Akka.
@@ -14,7 +14,7 @@ Below is a brief introduction of some concepts you will need for this kata with
An actor system is, among other things, the context in which actors operate. You can have multiple actor systems within the same JVM.
-See [Actor Systems](http://doc.akka.io/docs/akka/2.0.3/general/actor-systems.html)
+See [Actor Systems](http://doc.akka.io/docs/akka/2.1.2/general/actor-systems.html)
**Creating ActorSystems**
@@ -24,12 +24,12 @@ val system = ActorSystem("MyActorSystem")
### Working with Actors
-See [Actors](http://doc.akka.io/docs/akka/2.0.3/scala/actors.html)
+See [Actors](http://doc.akka.io/docs/akka/2.1.2/scala/actors.html)
**Creating actors**
-In the system context, called top level actors (to be used sparsely)
+In the system context, called top level actors (to be used sparsely). This creates an actor under "/user/myActorName".
```
val myActor = system.actorOf(Props[MyActor], "myActorName")
@@ -41,6 +41,15 @@ In the actor context, called children (i.e. when you're inside an actor)
val myActor = context.actorOf(Props[MyActor], "myActorName")
```
+**Looking up actors**
+
+Look up an actor by name (complete path)
+
+```
+val myActor = system.actorFor("/user/myActorName")
+val remoteActor = system.actorFor("akka://OtherActorSystem@host:port/user/otherActorName")
+```
+
**Sending messages**
Fire and forget
@@ -74,9 +83,18 @@ class MyActor extends Actor {
}
```
+**Replying**
+
+class MyActor extends Actor {
+ def receive = {
+ case "A message" => sender ! "Got it"
+ }
+}
+```
+
**Supervising actors**
-See [Fault Tolerance](http://doc.akka.io/docs/akka/2.0.3/scala/fault-tolerance.html)
+See [Fault Tolerance](http://doc.akka.io/docs/akka/2.1.2/scala/fault-tolerance.html)
```
override val supervisorStrategy = OneForOneStrategy() {
@@ -86,30 +104,25 @@ override val supervisorStrategy = OneForOneStrategy() {
### Misc Tasks
-**Subscribing to events**
-Could be good to use when to find out things about the context you're operating in. All subscribed events will be sent to the actor as a message, i.e. you should handle them in the actor's receive method.
+**Scheduling messages**
-See [Event Bus](http://doc.akka.io/docs/akka/2.0.3/scala/event-bus.html)
+To schedule a message to be sent sometime in the future, once or repeatedly use the scheduler.
-```
-context.system.eventStream.subscribe(
- self,
- classOf[RemoteServerClientDisconnected])
-
-// …
+See [Scheduler](http://doc.akka.io/docs/akka/2.1.2/scala/scheduler.html)
-def receive = {
- case r: RemoteServerClientDisconnected => println("Darn!!")
-}
+```
+//Use the system's default dispatcher as ExecutionContext
+import system.dispatcher
+system.scheduler.schedule(2 seconds, 2 seconds, actor, "every other second message")
```
-**Scheduling messages**
-
-To schedule a message send sometime in the future, once or repeatedly use the scheduler.
+**Retrieving properties**
-See [Scheduler](http://doc.akka.io/docs/akka/2.0.3/scala/scheduler.html)
+As you can see in the code there are some properties predefined in the ``application.conf`` file. To retrieve these properties, in the context of an actor, you can use the following:
```
-system.scheduler.schedule(2 seconds, 2 seconds, actor, "every other second message")
+context.system.settings.config.getString("...")
+
```
+
View
17 README.md
@@ -93,19 +93,30 @@ This will create bash scripts in `service/target/start`, `processor/target/start
If you're on Windows without access to a bash shell then you will have to copy the command line and arguments into a script of your own, and rewrite the paths to windows style paths.
-**Do not place the scripts in a target directory since they will be deleted if you do:**
+**Note that the scripts are by default placed in target directory and they will be deleted if you do:**
```
> sbt clean
```
-To start testing your application you can issue the command:
+Start the service
+
+```
+> service/target/start
+```
+
+The next step is to start the processor
+```
+> processor/target/start
+```
+
+Finally you should run the client. Start off by sending bets to the service
```
> client/target/start send
```
-And to check the results you issue the command:
+The final step is to retrieve the bets from the service
```
> client/target/start
View
6 client/src/main/scala/com/typesafe/akkademo/client/BetClient.scala
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.client
@@ -22,7 +22,6 @@ object BetClient extends App {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
- transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
port = 2661
@@ -43,7 +42,8 @@ object BetClient extends App {
} else {
implicit val timeout = Timeout(2 seconds)
val fBets = service.ask(RetrieveBets).mapTo[List[Bet]]
- assert(Await.result(fBets, 5 seconds).sorted == bets.sorted)
+ val result = Await.result(fBets, 5 seconds).sorted
+ assert(result == bets.sorted, s"expected ${bets.sorted}, got $result")
println("*** TESTING OK")
}
} finally {
View
19 common/src/main/scala/com/typesafe/akkademo/common/Message.scala
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.common
@@ -16,15 +16,14 @@ case object RegisterProcessor
object Bet {
implicit object BetOrdering extends Ordering[Bet] {
def compare(a: Bet, b: Bet): Int = {
- val p = a.player compare b.player
- if (p == 0) {
- val g = a.game compare b.game
- if (g == 0) {
- a.amount compare b.amount
- } else
- g
- } else
- p
+ (a.player compare b.player) match {
+ case 0
+ (a.game compare b.game) match {
+ case 0 a.amount compare b.amount
+ case g g
+ }
+ case p p
+ }
}
}
}
View
4 processor/src/main/resources/application.conf
@@ -1,7 +1,4 @@
akka {
- event-handlers = ["akka.event.Logging$DefaultLogger"]
- loglevel = "INFO"
-
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
@@ -11,7 +8,6 @@ akka {
hostname = "127.0.0.1"
port = 2553
}
- log-remote-lifecycle-events = on
}
}
View
6 processor/src/main/scala/com/typesafe/akkademo/processor/BettingProcessorApplication.scala
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.processor
@@ -8,9 +8,7 @@ import com.typesafe.config._
import service.{ InitializeProcessor, BettingProcessor }
object BettingProcessorApplication extends App {
- val config = ConfigFactory.load()
-
- val system = ActorSystem("BettingProcessorActorSystem", config)
+ val system = ActorSystem("BettingProcessorActorSystem", ConfigFactory.load())
val bettingProcessor = system.actorOf(Props[BettingProcessor], "bettingProcessor")
View
11 processor/src/main/scala/com/typesafe/akkademo/processor/repository/UnstableResource.scala
@@ -1,25 +1,26 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.processor.repository
import com.typesafe.akkademo.common._
import java.io.{ FileWriter, File }
import scala.io.Source
+import scala.concurrent.forkjoin.ThreadLocalRandom
trait UnstableResource {
def save(idempotentId: Int, player: String, game: Int, amount: Int): Unit
def findAll: Seq[Bet]
}
class ReallyUnstableResource extends UnstableResource {
- val bets = scala.collection.mutable.Map[Int, Bet]()
- val randomizer = new scala.util.Random
- val store = new File("persistent_store")
+ private var bets = Map[Int, Bet]()
+ private def randomizer = ThreadLocalRandom.current()
+ private val store = new File("persistent_store")
try {
Source.fromFile(store).getLines().foreach(s deserialize(s).foreach {
- case (id, player, game, amount) if (!bets.contains(id)) bets.put(id, Bet(player, game, amount))
+ case (id, player, game, amount) if (!bets.contains(id)) bets += (id -> Bet(player, game, amount))
})
} catch {
case _: Exception
View
8 processor/src/main/scala/com/typesafe/akkademo/processor/service/BettingProcessor.scala
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.processor.service
@@ -8,13 +8,13 @@ import scala.concurrent.duration._
import com.typesafe.akkademo.common.{ RegisterProcessor, PlayerBet, RetrieveBets }
import akka.actor.SupervisorStrategy.Restart
import com.typesafe.akkademo.processor.repository.DatabaseFailureException
-import scala.concurrent.ExecutionContext.Implicits.global
case object InitializeProcessor
class BettingProcessor extends Actor with ActorLogging {
val worker = context.actorOf(Props[ProcessorWorker], "theWorker")
val service = context.actorFor(context.system.settings.config.getString("betting-service-actor"))
+ import context.dispatcher
val scheduler = context.system.scheduler.schedule(1 seconds, 1 seconds, self, InitializeProcessor)
override def postStop() {
@@ -25,7 +25,7 @@ class BettingProcessor extends Actor with ActorLogging {
override val supervisorStrategy = OneForOneStrategy() {
case r: RuntimeException Restart
case d: DatabaseFailureException Restart
- // Read more about fault tolerance here: http://doc.akka.io/docs/akka/2.0.3/scala/fault-tolerance.html
+ // Read more about fault tolerance here: http://doc.akka.io/docs/akka/2.1.2/scala/fault-tolerance.html
}
def receive = {
@@ -39,4 +39,4 @@ class BettingProcessor extends Actor with ActorLogging {
log.info("Retrieving all bets")
worker.forward(RetrieveBets)
}
-}
+}
View
2 processor/src/main/scala/com/typesafe/akkademo/processor/service/ProcessorWorker.scala
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.processor.service
View
8 project/Build.scala
@@ -58,8 +58,6 @@ object AkkaDemoBuild extends Build {
)
lazy val defaultSettings = Defaults.defaultSettings ++ formatSettings ++ buildSettings ++ Seq(
- resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
-
// compile options
scalacOptions ++= Seq("-encoding", "UTF-8", "-optimise", "-deprecation", "-unchecked"),
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
@@ -90,8 +88,8 @@ object Dependencies {
object Dependency {
object Version {
val Akka = "2.1.2"
- val Scalatest = "1.6.1"
- val JUnit = "4.5"
+ val Scalatest = "1.9.1"
+ val JUnit = "4.10"
}
// ---- Application dependencies ----
@@ -101,6 +99,6 @@ object Dependency {
// ---- Test dependencies ----
- val scalaTest = "org.scalatest" % "scalatest_2.9.0" % Version.Scalatest % "test"
+ val scalaTest = "org.scalatest" %% "scalatest" % Version.Scalatest % "test"
val jUnit = "junit" % "junit" % Version.JUnit % "test"
}
View
4 service/src/main/resources/application.conf
@@ -1,7 +1,4 @@
akka {
- event-handlers = ["akka.event.Logging$DefaultLogger"]
- loglevel = "INFO"
-
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
@@ -11,6 +8,5 @@ akka {
hostname = "127.0.0.1"
port = 2552
}
- log-remote-lifecycle-events = on
}
}
View
12 service/src/main/scala/com/typesafe/akkademo/service/BettingService.scala
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.service
@@ -12,7 +12,6 @@ import com.typesafe.akkademo.common._
import com.typesafe.akkademo.common.Bet
import com.typesafe.akkademo.common.ConfirmationMessage
import com.typesafe.akkademo.common.PlayerBet
-import scala.concurrent.ExecutionContext.Implicits.global
case object HandleUnprocessedBets
@@ -22,7 +21,8 @@ class BettingService extends Actor with ActorLogging {
var processor: Option[(ActorRef, Long)] = None
// Note: To make this solution (even) more bullet proof you would have to persist the incoming bets.
- val bets = scala.collection.mutable.Map[Int, Bet]()
+ var bets = Map[Int, Bet]()
+ import context.dispatcher
val scheduler = context.system.scheduler.schedule(2 seconds, 2 seconds, self, HandleUnprocessedBets)
override def postStop() {
@@ -49,8 +49,8 @@ class BettingService extends Actor with ActorLogging {
}
def getActiveProcessor: Option[ActorRef] = {
- processor.flatMap {
- case (s, t) if (System.currentTimeMillis - t < ActivePeriod) Some(s) else None
+ processor.collect {
+ case (p, t) if System.currentTimeMillis - t < ActivePeriod p
}
}
@@ -76,6 +76,6 @@ class BettingService extends Actor with ActorLogging {
// http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2
log.info("handling unprocessed bets (size): " + bets.size)
- getActiveProcessor.foreach { p bets.keys.foreach { k p ! PlayerBet(k, bets(k)) } }
+ getActiveProcessor.foreach { p bets.foreach { case (k, v) p ! PlayerBet(k, v) } }
}
}
View
6 service/src/main/scala/com/typesafe/akkademo/service/BettingServiceApplication.scala
@@ -1,15 +1,13 @@
/**
- * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
+ * Copyright (C) 2011-2013 Typesafe <http://typesafe.com/>
*/
package com.typesafe.akkademo.service
import akka.actor.{ Props, ActorSystem }
import com.typesafe.config._
object BettingServiceApplication extends App {
- val config = ConfigFactory.load()
-
- val system = ActorSystem("BettingServiceActorSystem", config)
+ val system = ActorSystem("BettingServiceActorSystem", ConfigFactory.load())
val service = system.actorOf(Props[BettingService], "bettingService")
}

0 comments on commit bb7e63b

Please sign in to comment.
Something went wrong with that request. Please try again.