From 1c426e4772c9c924b20ffcbf909e5015bce9390a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 14 May 2012 15:43:58 +0200 Subject: [PATCH 1/2] Improved the docs for dataflow concurrency --- akka-docs/scala/dataflow.rst | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/akka-docs/scala/dataflow.rst b/akka-docs/scala/dataflow.rst index 793b7ebd2f0..5eb0365447e 100644 --- a/akka-docs/scala/dataflow.rst +++ b/akka-docs/scala/dataflow.rst @@ -6,7 +6,7 @@ Description Akka implements `Oz-style dataflow concurrency `_ by using a special API for :ref:`futures-scala` that allows single assignment variables and multiple lightweight (event-based) processes/threads. -Dataflow concurrency is deterministic. This means that it will always behave the same. If you run it once and it yields output 5 then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output. +Dataflow Concurrency has its origin in logic programming and is declarative and fully deterministic. This means that it will always behave the same. If you run it once and it yields output ``5`` then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output. The best way to learn how to program with dataflow variables is to read the fantastic book `Concepts, Techniques, and Models of Computer Programming `_. By Peter Van Roy and Seif Haridi. @@ -26,6 +26,15 @@ Scala's Delimited Continuations plugin is required to use the Dataflow API. To e libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % ) }, scalacOptions += "-P:continuations:enable", +To use the DataFlow library you have to provide an implicit ExecutionContext, here is an example: + +.. code-block:: scala + + import akka.dispatch._ + import java.util.concurrent.Executors + + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(..)) + Dataflow Variables ------------------ @@ -65,7 +74,7 @@ Dataflow is implemented in Akka using Scala's Delimited Continuations. To use th .. code-block:: scala import Future.flow - implicit val dispatcher = ... + implicit val executionContext = ... val a = Future( ... ) val b = Future( ... ) @@ -82,7 +91,7 @@ The ``flow`` method also returns a ``Future`` for the result of the contained ex .. code-block:: scala import Future.flow - implicit val dispatcher = ... + implicit val executionContext = ... val a = Future( ... ) val b = Future( ... ) @@ -116,7 +125,7 @@ To run these examples: scala> -2. Paste the examples (below) into the Scala REPL. +2. Paste the examples (below) into the Scala REPL. Paste in each of the ``flow`` blocks at a time to see data flow in action. Note: Do not try to run the Oz version, it is only there for reference. 3. Have fun. @@ -144,7 +153,9 @@ Example in Akka: import akka.dispatch._ import Future.flow - implicit val dispatcher = ... + import java.util.concurrent.Executors + + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4)) val x, y, z = Promise[Int]() @@ -189,7 +200,9 @@ Example in Akka: import akka.dispatch._ import Future.flow - implicit val dispatcher = ... + import java.util.concurrent.Executors + + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4)) def ints(n: Int, max: Int): List[Int] = { if (n == max) Nil @@ -218,7 +231,9 @@ Example in Akka: import akka.dispatch._ import Future.flow - implicit val dispatcher = ... + import java.util.concurrent.Executors + + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4)) // create four 'Int' data flow variables val x, y, z, v = Promise[Int]() From 2e248e4b49c8a7a62d723eb96d2da128d4c9f52d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 15 May 2012 09:19:02 +0200 Subject: [PATCH 2/2] Removed code for tutorial, already removed in docs since duplicate in Typesafe/Akka getting started guide and template --- .gitignore | 4 +- akka-tutorials/akka-tutorial-first/README | 7 - akka-tutorials/akka-tutorial-first/pom.xml | 43 ---- .../project/TutorialBuild.scala | 22 -- .../project/build.properties | 1 - .../java/akka/tutorial/first/java/Pi.java | 197 ------------------ .../src/main/resources/application.conf | 7 - .../scala/akka/tutorial/first/scala/Pi.scala | 110 ---------- .../src/test/scala/WorkerSpec.scala | 31 --- 9 files changed, 1 insertion(+), 421 deletions(-) delete mode 100644 akka-tutorials/akka-tutorial-first/README delete mode 100644 akka-tutorials/akka-tutorial-first/pom.xml delete mode 100644 akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala delete mode 100644 akka-tutorials/akka-tutorial-first/project/build.properties delete mode 100644 akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java delete mode 100644 akka-tutorials/akka-tutorial-first/src/main/resources/application.conf delete mode 100644 akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala delete mode 100644 akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala diff --git a/.gitignore b/.gitignore index 03aba97ee2a..dfeca381661 100755 --- a/.gitignore +++ b/.gitignore @@ -50,8 +50,6 @@ multiverse.log .*.swp akka-docs/_build/ *.pyc -akka-tutorials/akka-tutorial-first/project/boot/ -akka-tutorials/akka-tutorial-first/project/plugins/project/ akka-docs/exts/ _akka_cluster/ Makefile @@ -65,4 +63,4 @@ worker*.log mongoDB/ redis/ beanstalk/ -.scalastyle \ No newline at end of file +.scalastyle diff --git a/akka-tutorials/akka-tutorial-first/README b/akka-tutorials/akka-tutorial-first/README deleted file mode 100644 index f4b42f631fa..00000000000 --- a/akka-tutorials/akka-tutorial-first/README +++ /dev/null @@ -1,7 +0,0 @@ -================ - First Tutorial -================ - -This is the source code for the first tutorial. - -See the Akka Documentation for information about this tutorial. diff --git a/akka-tutorials/akka-tutorial-first/pom.xml b/akka-tutorials/akka-tutorial-first/pom.xml deleted file mode 100644 index 01000a58964..00000000000 --- a/akka-tutorials/akka-tutorial-first/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - 4.0.0 - - akka-tutorial-first-java - akka.tutorial.first.java - akka-tutorial-first-java - jar - 2.1-SNAPSHOT - http://akka.io - - - - com.typesafe.akka - akka-actor - 2.1-SNAPSHOT - - - - - - Akka - Akka Maven2 Repository - http://akka.io/repository/ - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.3.2 - - 1.6 - 1.6 - - - - - diff --git a/akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala b/akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala deleted file mode 100644 index d468b69324b..00000000000 --- a/akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala +++ /dev/null @@ -1,22 +0,0 @@ -import sbt._ -import Keys._ - -object TutorialBuild extends Build { - lazy val buildSettings = Seq( - organization := "com.typesafe.akka", - version := "2.1-SNAPSHOT", - scalaVersion := "2.9.1" - ) - - lazy val akka = Project( - id = "akka-tutorial-first", - base = file("."), - settings = Defaults.defaultSettings ++ Seq( - libraryDependencies ++= Seq( - "com.typesafe.akka" % "akka-actor" % "2.1-SNAPSHOT", - "junit" % "junit" % "4.5" % "test", - "org.scalatest" % "scalatest_2.9.0" % "1.6.1" % "test", - "com.typesafe.akka" % "akka-testkit" % "2.1-SNAPSHOT" % "test") - ) - ) -} diff --git a/akka-tutorials/akka-tutorial-first/project/build.properties b/akka-tutorials/akka-tutorial-first/project/build.properties deleted file mode 100644 index c6158f7be41..00000000000 --- a/akka-tutorials/akka-tutorial-first/project/build.properties +++ /dev/null @@ -1 +0,0 @@ -sbt.version=0.11.0 \ No newline at end of file diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java deleted file mode 100644 index 2eaddcd40c3..00000000000 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.tutorial.first.java; - -//#imports - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.routing.RoundRobinRouter; -import akka.util.Duration; -import java.util.concurrent.TimeUnit; - -//#imports - -//#app -public class Pi { - - public static void main(String[] args) { - Pi pi = new Pi(); - pi.calculate(4, 10000, 10000); - } - - //#actors-and-messages - //#messages - static class Calculate { - } - - static class Work { - private final int start; - private final int nrOfElements; - - public Work(int start, int nrOfElements) { - this.start = start; - this.nrOfElements = nrOfElements; - } - - public int getStart() { - return start; - } - - public int getNrOfElements() { - return nrOfElements; - } - } - - static class Result { - private final double value; - - public Result(double value) { - this.value = value; - } - - public double getValue() { - return value; - } - } - - static class PiApproximation { - private final double pi; - private final Duration duration; - - public PiApproximation(double pi, Duration duration) { - this.pi = pi; - this.duration = duration; - } - - public double getPi() { - return pi; - } - - public Duration getDuration() { - return duration; - } - } - - //#messages - - //#worker - public static class Worker extends UntypedActor { - - //#calculatePiFor - private double calculatePiFor(int start, int nrOfElements) { - double acc = 0.0; - for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) { - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); - } - return acc; - } - - //#calculatePiFor - - public void onReceive(Object message) { - if (message instanceof Work) { - Work work = (Work) message; - double result = calculatePiFor(work.getStart(), work.getNrOfElements()); - getSender().tell(new Result(result), getSelf()); - } else { - unhandled(message); - } - } - } - - //#worker - - //#master - public static class Master extends UntypedActor { - private final int nrOfMessages; - private final int nrOfElements; - - private double pi; - private int nrOfResults; - private final long start = System.currentTimeMillis(); - - private final ActorRef listener; - private final ActorRef workerRouter; - - public Master(final int nrOfWorkers, int nrOfMessages, int nrOfElements, ActorRef listener) { - this.nrOfMessages = nrOfMessages; - this.nrOfElements = nrOfElements; - this.listener = listener; - - //#create-router - workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter(new RoundRobinRouter(nrOfWorkers)), - "workerRouter"); - //#create-router - } - - //#master-receive - public void onReceive(Object message) { - //#handle-messages - if (message instanceof Calculate) { - for (int start = 0; start < nrOfMessages; start++) { - workerRouter.tell(new Work(start, nrOfElements), getSelf()); - } - } else if (message instanceof Result) { - Result result = (Result) message; - pi += result.getValue(); - nrOfResults += 1; - if (nrOfResults == nrOfMessages) { - // Send the result to the listener - Duration duration = Duration.create(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS); - listener.tell(new PiApproximation(pi, duration), getSelf()); - // Stops this actor and all its supervised children - getContext().stop(getSelf()); - } - } else { - unhandled(message); - } - //#handle-messages - } - //#master-receive - } - - //#master - - //#result-listener - public static class Listener extends UntypedActor { - public void onReceive(Object message) { - if (message instanceof PiApproximation) { - PiApproximation approximation = (PiApproximation) message; - System.out.println(String.format("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s", - approximation.getPi(), approximation.getDuration())); - getContext().system().shutdown(); - } else { - unhandled(message); - } - } - } - - //#result-listener - //#actors-and-messages - - public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) { - // Create an Akka system - ActorSystem system = ActorSystem.create("PiSystem"); - - // create the result listener, which will print the result and shutdown the system - final ActorRef listener = system.actorOf(new Props(Listener.class), "listener"); - - // create the master - ActorRef master = system.actorOf(new Props(new UntypedActorFactory() { - public UntypedActor create() { - return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener); - } - }), "master"); - - // start the calculation - master.tell(new Calculate()); - - } -} -//#app diff --git a/akka-tutorials/akka-tutorial-first/src/main/resources/application.conf b/akka-tutorials/akka-tutorial-first/src/main/resources/application.conf deleted file mode 100644 index 0a2509357ef..00000000000 --- a/akka-tutorials/akka-tutorial-first/src/main/resources/application.conf +++ /dev/null @@ -1,7 +0,0 @@ -akka.actor.deployment { - /master/workerRouter { - # Uncomment the following two lines to change the calculation to use 10 workers instead of 4: - #router = round-robin - #nr-of-instances = 10 - } -} diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala deleted file mode 100644 index 94fb83bbd31..00000000000 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.tutorial.first.scala - -//#imports -import akka.actor._ -import akka.routing.RoundRobinRouter -import akka.util.Duration -import akka.util.duration._ -//#imports - -//#app -object Pi extends App { - - calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - - //#actors-and-messages - //#messages - sealed trait PiMessage - case object Calculate extends PiMessage - case class Work(start: Int, nrOfElements: Int) extends PiMessage - case class Result(value: Double) extends PiMessage - case class PiApproximation(pi: Double, duration: Duration) - //#messages - - //#worker - class Worker extends Actor { - - //#calculatePiFor - def calculatePiFor(start: Int, nrOfElements: Int): Double = { - var acc = 0.0 - for (i ← start until (start + nrOfElements)) - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) - acc - } - //#calculatePiFor - - def receive = { - case Work(start, nrOfElements) ⇒ - sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work - } - } - //#worker - - //#master - class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef) - extends Actor { - - var pi: Double = _ - var nrOfResults: Int = _ - val start: Long = System.currentTimeMillis() - - //#create-router - val workerRouter = context.actorOf( - Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter") - //#create-router - - //#master-receive - def receive = { - //#handle-messages - case Calculate ⇒ - for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements) - case Result(value) ⇒ - pi += value - nrOfResults += 1 - if (nrOfResults == nrOfMessages) { - // Send the result to the listener - listener ! PiApproximation(pi, duration = (System.currentTimeMillis() - start).millis) - // Stops this actor and all its supervised children - context.stop(self) - } - //#handle-messages - } - //#master-receive - - } - //#master - - //#result-listener - class Listener extends Actor { - def receive = { - case PiApproximation(pi, duration) ⇒ - println("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s" - .format(pi, duration)) - context.system.shutdown() - } - } - //#result-listener - - //#actors-and-messages - - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - // Create an Akka system - val system = ActorSystem("PiSystem") - - // create the result listener, which will print the result and shutdown the system - val listener = system.actorOf(Props[Listener], name = "listener") - - // create the master - val master = system.actorOf(Props(new Master( - nrOfWorkers, nrOfMessages, nrOfElements, listener)), - name = "master") - - // start the calculation - master ! Calculate - - } -} -//#app diff --git a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala deleted file mode 100644 index 86d0715d49d..00000000000 --- a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.tutorial.first.scala - -import org.junit.runner.RunWith -import org.scalatest.matchers.MustMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.WordSpec -import akka.testkit.TestActorRef -import akka.tutorial.first.scala.Pi.Worker -import akka.actor.ActorSystem - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { - - implicit val system = ActorSystem() - - override def afterAll { - system.shutdown() - } - - "Worker" must { - "calculate pi correctly" in { - val testActor = TestActorRef[Worker] - val actor = testActor.underlyingActor - actor.calculatePiFor(0, 0) must equal(0.0) - actor.calculatePiFor(1, 1) must be(-1.3333333333333333 plusOrMinus 0.0000000001) - } - } -}