Skip to content
This repository
Browse code

Merge branch 'master' into wip-2006-binary-compat-√

  • Loading branch information...
commit 4fe863a58c43579ed9c56e2f6556bd43f82ab988 2 parents 6d43012 + 2e248e4
Viktor Klang (√) authored May 15, 2012
4  .gitignore
@@ -50,8 +50,6 @@ multiverse.log
50 50
 .*.swp
51 51
 akka-docs/_build/
52 52
 *.pyc
53  
-akka-tutorials/akka-tutorial-first/project/boot/
54  
-akka-tutorials/akka-tutorial-first/project/plugins/project/
55 53
 akka-docs/exts/
56 54
 _akka_cluster/
57 55
 Makefile
@@ -65,4 +63,4 @@ worker*.log
65 63
 mongoDB/
66 64
 redis/
67 65
 beanstalk/
68  
-.scalastyle
  66
+.scalastyle
29  akka-docs/scala/dataflow.rst
Source Rendered
@@ -6,7 +6,7 @@ Description
6 6
 
7 7
 Akka implements `Oz-style dataflow concurrency <http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency>`_ by using a special API for :ref:`futures-scala` that allows single assignment variables and multiple lightweight (event-based) processes/threads.
8 8
 
9  
-Dataflow concurrency is deterministic. This means that it will always behave the same. If you run it once and it yields output 5 then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output.
  9
+Dataflow Concurrency has its origin in logic programming and is declarative and fully deterministic. This means that it will always behave the same. If you run it once and it yields output ``5`` then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output.
10 10
 
11 11
 The best way to learn how to program with dataflow variables is to read the fantastic book `Concepts, Techniques, and Models of Computer Programming <http://www.info.ucl.ac.be/%7Epvr/book.html>`_. By Peter Van Roy and Seif Haridi.
12 12
 
@@ -26,6 +26,15 @@ Scala's Delimited Continuations plugin is required to use the Dataflow API. To e
26 26
   libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % <scalaVersion>) },
27 27
   scalacOptions += "-P:continuations:enable",
28 28
 
  29
+To use the DataFlow library you have to provide an implicit ExecutionContext, here is an example:
  30
+
  31
+.. code-block:: scala
  32
+
  33
+  import akka.dispatch._
  34
+  import java.util.concurrent.Executors
  35
+
  36
+  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(..))
  37
+
29 38
 Dataflow Variables
30 39
 ------------------
31 40
 
@@ -65,7 +74,7 @@ Dataflow is implemented in Akka using Scala's Delimited Continuations. To use th
65 74
 .. code-block:: scala
66 75
 
67 76
   import Future.flow
68  
-  implicit val dispatcher = ...
  77
+  implicit val executionContext = ...
69 78
 
70 79
   val a = Future( ... )
71 80
   val b = Future( ... )
@@ -82,7 +91,7 @@ The ``flow`` method also returns a ``Future`` for the result of the contained ex
82 91
 .. code-block:: scala
83 92
 
84 93
   import Future.flow
85  
-  implicit val dispatcher = ...
  94
+  implicit val executionContext = ...
86 95
 
87 96
   val a = Future( ... )
88 97
   val b = Future( ... )
@@ -116,7 +125,7 @@ To run these examples:
116 125
 
117 126
   scala>
118 127
 
119  
-2. Paste the examples (below) into the Scala REPL.
  128
+2. Paste the examples (below) into the Scala REPL. Paste in each of the ``flow`` blocks at a time to see data flow in action.
120 129
 Note: Do not try to run the Oz version, it is only there for reference.
121 130
 
122 131
 3. Have fun.
@@ -144,7 +153,9 @@ Example in Akka:
144 153
 
145 154
   import akka.dispatch._
146 155
   import Future.flow
147  
-  implicit val dispatcher = ...
  156
+  import java.util.concurrent.Executors
  157
+
  158
+  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
148 159
 
149 160
   val x, y, z = Promise[Int]()
150 161
 
@@ -189,7 +200,9 @@ Example in Akka:
189 200
 
190 201
   import akka.dispatch._
191 202
   import Future.flow
192  
-  implicit val dispatcher = ...
  203
+  import java.util.concurrent.Executors
  204
+
  205
+  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
193 206
 
194 207
   def ints(n: Int, max: Int): List[Int] = {
195 208
     if (n == max) Nil
@@ -218,7 +231,9 @@ Example in Akka:
218 231
 
219 232
   import akka.dispatch._
220 233
   import Future.flow
221  
-  implicit val dispatcher = ...
  234
+  import java.util.concurrent.Executors
  235
+
  236
+  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
222 237
 
223 238
   // create four 'Int' data flow variables
224 239
   val x, y, z, v = Promise[Int]()
7  akka-tutorials/akka-tutorial-first/README
... ...
@@ -1,7 +0,0 @@
1  
-================
2  
- First Tutorial
3  
-================
4  
-
5  
-This is the source code for the first tutorial.
6  
-
7  
-See the Akka Documentation for information about this tutorial.
43  akka-tutorials/akka-tutorial-first/pom.xml
... ...
@@ -1,43 +0,0 @@
1  
-<?xml version="1.0" encoding="UTF-8"?>
2  
-<project xmlns="http://maven.apache.org/POM/4.0.0"
3  
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4  
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5  
-    <modelVersion>4.0.0</modelVersion>
6  
-
7  
-    <name>akka-tutorial-first-java</name>
8  
-    <groupId>akka.tutorial.first.java</groupId>
9  
-    <artifactId>akka-tutorial-first-java</artifactId>
10  
-    <packaging>jar</packaging>
11  
-    <version>2.1-SNAPSHOT</version>
12  
-    <url>http://akka.io</url>
13  
-
14  
-    <dependencies>
15  
-        <dependency>
16  
-            <groupId>com.typesafe.akka</groupId>
17  
-            <artifactId>akka-actor</artifactId>
18  
-            <version>2.1-SNAPSHOT</version>
19  
-        </dependency>
20  
-    </dependencies>
21  
-
22  
-    <repositories>
23  
-        <repository>
24  
-            <id>Akka</id>
25  
-            <name>Akka Maven2 Repository</name>
26  
-            <url>http://akka.io/repository/</url>
27  
-        </repository>
28  
-    </repositories>
29  
-
30  
-    <build>
31  
-        <plugins>
32  
-            <plugin>
33  
-                <groupId>org.apache.maven.plugins</groupId>
34  
-                <artifactId>maven-compiler-plugin</artifactId>
35  
-                <version>2.3.2</version>
36  
-                <configuration>
37  
-                    <source>1.6</source>
38  
-                    <target>1.6</target>
39  
-                </configuration>
40  
-            </plugin>
41  
-        </plugins>
42  
-    </build>
43  
-</project>
22  akka-tutorials/akka-tutorial-first/project/TutorialBuild.scala
... ...
@@ -1,22 +0,0 @@
1  
-import sbt._
2  
-import Keys._
3  
-
4  
-object TutorialBuild extends Build {
5  
-  lazy val buildSettings = Seq(
6  
-    organization := "com.typesafe.akka",
7  
-    version := "2.1-SNAPSHOT",
8  
-    scalaVersion := "2.9.1"
9  
-  )
10  
-
11  
-  lazy val akka = Project(
12  
-    id = "akka-tutorial-first",
13  
-    base = file("."),
14  
-    settings = Defaults.defaultSettings ++ Seq(
15  
-      libraryDependencies ++= Seq(
16  
-        "com.typesafe.akka" % "akka-actor"      % "2.1-SNAPSHOT",
17  
-        "junit"             % "junit"           % "4.5"           % "test",
18  
-        "org.scalatest"     % "scalatest_2.9.0" % "1.6.1"         % "test",
19  
-        "com.typesafe.akka" % "akka-testkit"    % "2.1-SNAPSHOT"  % "test")
20  
-    )
21  
-  )
22  
-}
1  akka-tutorials/akka-tutorial-first/project/build.properties
... ...
@@ -1 +0,0 @@
1  
-sbt.version=0.11.0
197  akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
... ...
@@ -1,197 +0,0 @@
1  
-/**
2  
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
3  
- */
4  
-
5  
-package akka.tutorial.first.java;
6  
-
7  
-//#imports
8  
-
9  
-import akka.actor.ActorRef;
10  
-import akka.actor.ActorSystem;
11  
-import akka.actor.Props;
12  
-import akka.actor.UntypedActor;
13  
-import akka.actor.UntypedActorFactory;
14  
-import akka.routing.RoundRobinRouter;
15  
-import akka.util.Duration;
16  
-import java.util.concurrent.TimeUnit;
17  
-
18  
-//#imports
19  
-
20  
-//#app
21  
-public class Pi {
22  
-
23  
-  public static void main(String[] args) {
24  
-    Pi pi = new Pi();
25  
-    pi.calculate(4, 10000, 10000);
26  
-  }
27  
-
28  
-  //#actors-and-messages
29  
-  //#messages
30  
-  static class Calculate {
31  
-  }
32  
-
33  
-  static class Work {
34  
-    private final int start;
35  
-    private final int nrOfElements;
36  
-
37  
-    public Work(int start, int nrOfElements) {
38  
-      this.start = start;
39  
-      this.nrOfElements = nrOfElements;
40  
-    }
41  
-
42  
-    public int getStart() {
43  
-      return start;
44  
-    }
45  
-
46  
-    public int getNrOfElements() {
47  
-      return nrOfElements;
48  
-    }
49  
-  }
50  
-
51  
-  static class Result {
52  
-    private final double value;
53  
-
54  
-    public Result(double value) {
55  
-      this.value = value;
56  
-    }
57  
-
58  
-    public double getValue() {
59  
-      return value;
60  
-    }
61  
-  }
62  
-
63  
-  static class PiApproximation {
64  
-    private final double pi;
65  
-    private final Duration duration;
66  
-
67  
-    public PiApproximation(double pi, Duration duration) {
68  
-      this.pi = pi;
69  
-      this.duration = duration;
70  
-    }
71  
-
72  
-    public double getPi() {
73  
-      return pi;
74  
-    }
75  
-
76  
-    public Duration getDuration() {
77  
-      return duration;
78  
-    }
79  
-  }
80  
-
81  
-  //#messages
82  
-
83  
-  //#worker
84  
-  public static class Worker extends UntypedActor {
85  
-
86  
-    //#calculatePiFor
87  
-    private double calculatePiFor(int start, int nrOfElements) {
88  
-      double acc = 0.0;
89  
-      for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
90  
-        acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
91  
-      }
92  
-      return acc;
93  
-    }
94  
-
95  
-    //#calculatePiFor
96  
-
97  
-    public void onReceive(Object message) {
98  
-      if (message instanceof Work) {
99  
-        Work work = (Work) message;
100  
-        double result = calculatePiFor(work.getStart(), work.getNrOfElements());
101  
-        getSender().tell(new Result(result), getSelf());
102  
-      } else {
103  
-        unhandled(message);
104  
-      }
105  
-    }
106  
-  }
107  
-
108  
-  //#worker
109  
-
110  
-  //#master
111  
-  public static class Master extends UntypedActor {
112  
-    private final int nrOfMessages;
113  
-    private final int nrOfElements;
114  
-
115  
-    private double pi;
116  
-    private int nrOfResults;
117  
-    private final long start = System.currentTimeMillis();
118  
-
119  
-    private final ActorRef listener;
120  
-    private final ActorRef workerRouter;
121  
-
122  
-    public Master(final int nrOfWorkers, int nrOfMessages, int nrOfElements, ActorRef listener) {
123  
-      this.nrOfMessages = nrOfMessages;
124  
-      this.nrOfElements = nrOfElements;
125  
-      this.listener = listener;
126  
-
127  
-      //#create-router
128  
-      workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter(new RoundRobinRouter(nrOfWorkers)),
129  
-          "workerRouter");
130  
-      //#create-router
131  
-    }
132  
-
133  
-    //#master-receive
134  
-    public void onReceive(Object message) {
135  
-      //#handle-messages
136  
-      if (message instanceof Calculate) {
137  
-        for (int start = 0; start < nrOfMessages; start++) {
138  
-          workerRouter.tell(new Work(start, nrOfElements), getSelf());
139  
-        }
140  
-      } else if (message instanceof Result) {
141  
-        Result result = (Result) message;
142  
-        pi += result.getValue();
143  
-        nrOfResults += 1;
144  
-        if (nrOfResults == nrOfMessages) {
145  
-          // Send the result to the listener
146  
-          Duration duration = Duration.create(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
147  
-          listener.tell(new PiApproximation(pi, duration), getSelf());
148  
-          // Stops this actor and all its supervised children
149  
-          getContext().stop(getSelf());
150  
-        }
151  
-      } else {
152  
-        unhandled(message);
153  
-      }
154  
-      //#handle-messages
155  
-    }
156  
-    //#master-receive
157  
-  }
158  
-
159  
-  //#master
160  
-
161  
-  //#result-listener
162  
-  public static class Listener extends UntypedActor {
163  
-    public void onReceive(Object message) {
164  
-      if (message instanceof PiApproximation) {
165  
-        PiApproximation approximation = (PiApproximation) message;
166  
-        System.out.println(String.format("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s",
167  
-            approximation.getPi(), approximation.getDuration()));
168  
-        getContext().system().shutdown();
169  
-      } else {
170  
-        unhandled(message);
171  
-      }
172  
-    }
173  
-  }
174  
-
175  
-  //#result-listener
176  
-  //#actors-and-messages
177  
-
178  
-  public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) {
179  
-    // Create an Akka system
180  
-    ActorSystem system = ActorSystem.create("PiSystem");
181  
-
182  
-    // create the result listener, which will print the result and shutdown the system
183  
-    final ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
184  
-
185  
-    // create the master
186  
-    ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
187  
-      public UntypedActor create() {
188  
-        return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener);
189  
-      }
190  
-    }), "master");
191  
-
192  
-    // start the calculation
193  
-    master.tell(new Calculate());
194  
-
195  
-  }
196  
-}
197  
-//#app
7  akka-tutorials/akka-tutorial-first/src/main/resources/application.conf
... ...
@@ -1,7 +0,0 @@
1  
-akka.actor.deployment {
2  
-  /master/workerRouter {
3  
-    # Uncomment the following two lines to change the calculation to use 10 workers instead of 4:
4  
-    #router = round-robin
5  
-    #nr-of-instances = 10
6  
-  }
7  
-}
110  akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala
... ...
@@ -1,110 +0,0 @@
1  
-/**
2  
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
3  
- */
4  
-package akka.tutorial.first.scala
5  
-
6  
-//#imports
7  
-import akka.actor._
8  
-import akka.routing.RoundRobinRouter
9  
-import akka.util.Duration
10  
-import akka.util.duration._
11  
-//#imports
12  
-
13  
-//#app
14  
-object Pi extends App {
15  
-
16  
-  calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
17  
-
18  
-  //#actors-and-messages
19  
-  //#messages
20  
-  sealed trait PiMessage
21  
-  case object Calculate extends PiMessage
22  
-  case class Work(start: Int, nrOfElements: Int) extends PiMessage
23  
-  case class Result(value: Double) extends PiMessage
24  
-  case class PiApproximation(pi: Double, duration: Duration)
25  
-  //#messages
26  
-
27  
-  //#worker
28  
-  class Worker extends Actor {
29  
-
30  
-    //#calculatePiFor
31  
-    def calculatePiFor(start: Int, nrOfElements: Int): Double = {
32  
-      var acc = 0.0
33  
-      for (i ← start until (start + nrOfElements))
34  
-        acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
35  
-      acc
36  
-    }
37  
-    //#calculatePiFor
38  
-
39  
-    def receive = {
40  
-      case Work(start, nrOfElements) ⇒
41  
-        sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
42  
-    }
43  
-  }
44  
-  //#worker
45  
-
46  
-  //#master
47  
-  class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)
48  
-    extends Actor {
49  
-
50  
-    var pi: Double = _
51  
-    var nrOfResults: Int = _
52  
-    val start: Long = System.currentTimeMillis()
53  
-
54  
-    //#create-router
55  
-    val workerRouter = context.actorOf(
56  
-      Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
57  
-    //#create-router
58  
-
59  
-    //#master-receive
60  
-    def receive = {
61  
-      //#handle-messages
62  
-      case Calculate ⇒
63  
-        for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
64  
-      case Result(value) ⇒
65  
-        pi += value
66  
-        nrOfResults += 1
67  
-        if (nrOfResults == nrOfMessages) {
68  
-          // Send the result to the listener
69  
-          listener ! PiApproximation(pi, duration = (System.currentTimeMillis() - start).millis)
70  
-          // Stops this actor and all its supervised children
71  
-          context.stop(self)
72  
-        }
73  
-      //#handle-messages
74  
-    }
75  
-    //#master-receive
76  
-
77  
-  }
78  
-  //#master
79  
-
80  
-  //#result-listener
81  
-  class Listener extends Actor {
82  
-    def receive = {
83  
-      case PiApproximation(pi, duration) ⇒
84  
-        println("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s"
85  
-          .format(pi, duration))
86  
-        context.system.shutdown()
87  
-    }
88  
-  }
89  
-  //#result-listener
90  
-
91  
-  //#actors-and-messages
92  
-
93  
-  def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
94  
-    // Create an Akka system
95  
-    val system = ActorSystem("PiSystem")
96  
-
97  
-    // create the result listener, which will print the result and shutdown the system
98  
-    val listener = system.actorOf(Props[Listener], name = "listener")
99  
-
100  
-    // create the master
101  
-    val master = system.actorOf(Props(new Master(
102  
-      nrOfWorkers, nrOfMessages, nrOfElements, listener)),
103  
-      name = "master")
104  
-
105  
-    // start the calculation
106  
-    master ! Calculate
107  
-
108  
-  }
109  
-}
110  
-//#app
31  akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala
... ...
@@ -1,31 +0,0 @@
1  
-/**
2  
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
3  
- */
4  
-package akka.tutorial.first.scala
5  
-
6  
-import org.junit.runner.RunWith
7  
-import org.scalatest.matchers.MustMatchers
8  
-import org.scalatest.BeforeAndAfterAll
9  
-import org.scalatest.WordSpec
10  
-import akka.testkit.TestActorRef
11  
-import akka.tutorial.first.scala.Pi.Worker
12  
-import akka.actor.ActorSystem
13  
-
14  
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
15  
-class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
16  
-
17  
-  implicit val system = ActorSystem()
18  
-
19  
-  override def afterAll {
20  
-    system.shutdown()
21  
-  }
22  
-
23  
-  "Worker" must {
24  
-    "calculate pi correctly" in {
25  
-      val testActor = TestActorRef[Worker]
26  
-      val actor = testActor.underlyingActor
27  
-      actor.calculatePiFor(0, 0) must equal(0.0)
28  
-      actor.calculatePiFor(1, 1) must be(-1.3333333333333333 plusOrMinus 0.0000000001)
29  
-    }
30  
-  }
31  
-}

0 notes on commit 4fe863a

Please sign in to comment.
Something went wrong with that request. Please try again.