Skip to content

Commit

Permalink
Some improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Mar 22, 2013
1 parent 8437c03 commit 58056f8
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 61 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -13,6 +13,7 @@
.project
.settings
.cache
.target

# sbt specific
dist/*
Expand Down
57 changes: 35 additions & 22 deletions GREEN_BELT_AKKA.md
Expand Up @@ -2,7 +2,7 @@

This doucment will briefly introduce some important parts of Akka _specific for the implementation of this kata_.

The official Akka documentation is really good (if we may say so ourselves): [Akka Docs](http://doc.akka.io/docs/akka/2.0.3/)
The official Akka documentation is really good (if we may say so ourselves): [Akka Docs](http://doc.akka.io/docs/akka/2.1.2/)

_Please note_ that this document describes parts of Akka very briefly and we refer to the original documentation for an in-depth description of Akka.

Expand All @@ -14,7 +14,7 @@ Below is a brief introduction of some concepts you will need for this kata with

An actor system is, among other things, the context in which actors operate. You can have multiple actor systems within the same JVM.

See [Actor Systems](http://doc.akka.io/docs/akka/2.0.3/general/actor-systems.html)
See [Actor Systems](http://doc.akka.io/docs/akka/2.1.2/general/actor-systems.html)

**Creating ActorSystems**

Expand All @@ -24,12 +24,12 @@ val system = ActorSystem("MyActorSystem")

### Working with Actors

See [Actors](http://doc.akka.io/docs/akka/2.0.3/scala/actors.html)
See [Actors](http://doc.akka.io/docs/akka/2.1.2/scala/actors.html)


**Creating actors**

In the system context, called top level actors (to be used sparsely)
In the system context, called top level actors (to be used sparsely). This creates an actor under "/user/myActorName".

```
val myActor = system.actorOf(Props[MyActor], "myActorName")
Expand All @@ -41,6 +41,15 @@ In the actor context, called children (i.e. when you're inside an actor)
val myActor = context.actorOf(Props[MyActor], "myActorName")
```

**Looking up actors**

Look up an actor by name (complete path)

```
val myActor = system.actorFor("/user/myActorName")
val remoteActor = system.actorFor("akka://OtherActorSystem@host:port/user/otherActorName")
```

**Sending messages**

Fire and forget
Expand Down Expand Up @@ -74,9 +83,18 @@ class MyActor extends Actor {
}
```

**Replying**

class MyActor extends Actor {
def receive = {
case "A message" => sender ! "Got it"
}
}
```
**Supervising actors**
See [Fault Tolerance](http://doc.akka.io/docs/akka/2.0.3/scala/fault-tolerance.html)
See [Fault Tolerance](http://doc.akka.io/docs/akka/2.1.2/scala/fault-tolerance.html)
```
override val supervisorStrategy = OneForOneStrategy() {
Expand All @@ -86,30 +104,25 @@ override val supervisorStrategy = OneForOneStrategy() {
### Misc Tasks
**Subscribing to events**
Could be good to use when to find out things about the context you're operating in. All subscribed events will be sent to the actor as a message, i.e. you should handle them in the actor's receive method.
**Scheduling messages**
See [Event Bus](http://doc.akka.io/docs/akka/2.0.3/scala/event-bus.html)
To schedule a message to be sent sometime in the future, once or repeatedly use the scheduler.
```
context.system.eventStream.subscribe(
self,
classOf[RemoteServerClientDisconnected])
// …
See [Scheduler](http://doc.akka.io/docs/akka/2.1.2/scala/scheduler.html)
def receive = {
case r: RemoteServerClientDisconnected => println("Darn!!")
}
```
//Use the system's default dispatcher as ExecutionContext
import system.dispatcher
system.scheduler.schedule(2 seconds, 2 seconds, actor, "every other second message")
```
**Scheduling messages**

To schedule a message send sometime in the future, once or repeatedly use the scheduler.
**Retrieving properties**
See [Scheduler](http://doc.akka.io/docs/akka/2.0.3/scala/scheduler.html)
As you can see in the code there are some properties predefined in the ``application.conf`` file. To retrieve these properties, in the context of an actor, you can use the following:
```
system.scheduler.schedule(2 seconds, 2 seconds, actor, "every other second message")
context.system.settings.config.getString("...")

```
17 changes: 14 additions & 3 deletions README.md
Expand Up @@ -93,19 +93,30 @@ This will create bash scripts in `service/target/start`, `processor/target/start

If you're on Windows without access to a bash shell then you will have to copy the command line and arguments into a script of your own, and rewrite the paths to windows style paths.

**Do not place the scripts in a target directory since they will be deleted if you do:**
**Note that the scripts are by default placed in target directory and they will be deleted if you do:**

```
> sbt clean
```

To start testing your application you can issue the command:
Start the service

```
> service/target/start
```

The next step is to start the processor
```
> processor/target/start
```

Finally you should run the client. Start off by sending bets to the service

```
> client/target/start send
```

And to check the results you issue the command:
The final step is to retrieve the bets from the service

```
> client/target/start
Expand Down
Expand Up @@ -22,7 +22,6 @@ object BetClient extends App {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
port = 2661
Expand All @@ -43,7 +42,8 @@ object BetClient extends App {
} else {
implicit val timeout = Timeout(2 seconds)
val fBets = service.ask(RetrieveBets).mapTo[List[Bet]]
assert(Await.result(fBets, 5 seconds).sorted == bets.sorted)
val result = Await.result(fBets, 5 seconds).sorted
assert(result == bets.sorted, s"expected ${bets.sorted}, got $result")
println("*** TESTING OK")
}
} finally {
Expand Down
17 changes: 8 additions & 9 deletions common/src/main/scala/com/typesafe/akkademo/common/Message.scala
Expand Up @@ -16,15 +16,14 @@ case object RegisterProcessor
object Bet {
implicit object BetOrdering extends Ordering[Bet] {
def compare(a: Bet, b: Bet): Int = {
val p = a.player compare b.player
if (p == 0) {
val g = a.game compare b.game
if (g == 0) {
a.amount compare b.amount
} else
g
} else
p
(a.player compare b.player) match {
case 0
(a.game compare b.game) match {
case 0 a.amount compare b.amount
case g g
}
case p p
}
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions processor/src/main/resources/application.conf
@@ -1,7 +1,4 @@
akka {
event-handlers = ["akka.event.Logging$DefaultLogger"]
loglevel = "INFO"

actor {
provider = "akka.remote.RemoteActorRefProvider"
}
Expand All @@ -11,7 +8,6 @@ akka {
hostname = "127.0.0.1"
port = 2553
}
log-remote-lifecycle-events = on
}
}

Expand Down
Expand Up @@ -8,9 +8,7 @@ import com.typesafe.config._
import service.{ InitializeProcessor, BettingProcessor }

object BettingProcessorApplication extends App {
val config = ConfigFactory.load()

val system = ActorSystem("BettingProcessorActorSystem", config)
val system = ActorSystem("BettingProcessorActorSystem", ConfigFactory.load())

val bettingProcessor = system.actorOf(Props[BettingProcessor], "bettingProcessor")

Expand Down
Expand Up @@ -6,20 +6,21 @@ package com.typesafe.akkademo.processor.repository
import com.typesafe.akkademo.common._
import java.io.{ FileWriter, File }
import scala.io.Source
import scala.concurrent.forkjoin.ThreadLocalRandom

trait UnstableResource {
def save(idempotentId: Int, player: String, game: Int, amount: Int): Unit
def findAll: Seq[Bet]
}

class ReallyUnstableResource extends UnstableResource {
val bets = scala.collection.mutable.Map[Int, Bet]()
val randomizer = new scala.util.Random
val store = new File("persistent_store")
private var bets = Map[Int, Bet]()
private def randomizer = ThreadLocalRandom.current()
private val store = new File("persistent_store")

try {
Source.fromFile(store).getLines().foreach(s deserialize(s).foreach {
case (id, player, game, amount) if (!bets.contains(id)) bets.put(id, Bet(player, game, amount))
case (id, player, game, amount) if (!bets.contains(id)) bets += (id -> Bet(player, game, amount))
})
} catch {
case _: Exception
Expand Down
Expand Up @@ -25,7 +25,7 @@ class BettingProcessor extends Actor with ActorLogging {
override val supervisorStrategy = OneForOneStrategy() {
case r: RuntimeException Restart
case d: DatabaseFailureException Restart
// Read more about fault tolerance here: http://doc.akka.io/docs/akka/2.0.3/scala/fault-tolerance.html
// Read more about fault tolerance here: http://doc.akka.io/docs/akka/2.1.2/scala/fault-tolerance.html
}

def receive = {
Expand All @@ -39,4 +39,4 @@ class BettingProcessor extends Actor with ActorLogging {
log.info("Retrieving all bets")
worker.forward(RetrieveBets)
}
}
}
8 changes: 3 additions & 5 deletions project/Build.scala
Expand Up @@ -58,8 +58,6 @@ object AkkaDemoBuild extends Build {
)

lazy val defaultSettings = Defaults.defaultSettings ++ formatSettings ++ buildSettings ++ Seq(
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",

// compile options
scalacOptions ++= Seq("-encoding", "UTF-8", "-optimise", "-deprecation", "-unchecked"),
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
Expand Down Expand Up @@ -90,8 +88,8 @@ object Dependencies {
object Dependency {
object Version {
val Akka = "2.1.2"
val Scalatest = "1.6.1"
val JUnit = "4.5"
val Scalatest = "1.9.1"
val JUnit = "4.10"
}

// ---- Application dependencies ----
Expand All @@ -101,6 +99,6 @@ object Dependency {

// ---- Test dependencies ----

val scalaTest = "org.scalatest" % "scalatest_2.9.0" % Version.Scalatest % "test"
val scalaTest = "org.scalatest" %% "scalatest" % Version.Scalatest % "test"
val jUnit = "junit" % "junit" % Version.JUnit % "test"
}
4 changes: 0 additions & 4 deletions service/src/main/resources/application.conf
@@ -1,7 +1,4 @@
akka {
event-handlers = ["akka.event.Logging$DefaultLogger"]
loglevel = "INFO"

actor {
provider = "akka.remote.RemoteActorRefProvider"
}
Expand All @@ -11,6 +8,5 @@ akka {
hostname = "127.0.0.1"
port = 2552
}
log-remote-lifecycle-events = on
}
}
Expand Up @@ -7,9 +7,7 @@ import akka.actor.{ Props, ActorSystem }
import com.typesafe.config._

object BettingServiceApplication extends App {
val config = ConfigFactory.load()

val system = ActorSystem("BettingServiceActorSystem", config)
val system = ActorSystem("BettingServiceActorSystem", ConfigFactory.load())

val service = system.actorOf(Props[BettingService], "bettingService")
}

0 comments on commit 58056f8

Please sign in to comment.