Skip to content

Commit

Permalink
Initial checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
Alceste Scalas committed May 3, 2016
1 parent 02c92ec commit cf220eb
Show file tree
Hide file tree
Showing 32 changed files with 5,341 additions and 0 deletions.
605 changes: 605 additions & 0 deletions benchmarks/src/main/scala/Chameneos.scala

Large diffs are not rendered by default.

437 changes: 437 additions & 0 deletions benchmarks/src/main/scala/PingPong.scala

Large diffs are not rendered by default.

581 changes: 581 additions & 0 deletions benchmarks/src/main/scala/Ring.scala

Large diffs are not rendered by default.

107 changes: 107 additions & 0 deletions benchmarks/src/main/scala/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// lchannels - session programming in Scala
// Copyright (c) 2016, Alceste Scalas and Imperial College London
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
/** @author Alceste Scalas <alceste.scalas@imperial.ac.uk> */
package lchannels

package object benchmarks {
private val defaultMsgCount = 25600

case class BenchmarkResult(title: String, data: Iterator[Long])
type BenchmarkResults = List[BenchmarkResult]

def main(args: Array[String]): Unit = {
val savePath = if (args.length < 1) {
throw new IllegalArgumentException("You must provide the path for saving benchmark results")
} else {
args(0)
}

val msgCount = if (args.length >= 2) args(1).toInt else defaultMsgCount

run(savePath, msgCount)
}

def run(savePath: String, msgCount: Int) {
import java.nio.file.Paths

case class Benchmark(title: String, benchf: () => BenchmarkResults,
filename: String)

val repetitions = 30 // 30 should be the minimum for one JVM invocation

println(f"Starting benchmarks: ${msgCount} minimum message transmissions, ${repetitions} repetitions")

val benchmarks = List(
Benchmark(title = f"Ping-pong (${msgCount} message exchanges)",
benchf = () => pingpong.Benchmark(msgCount*2, repetitions, "Token"),
filename = "pingpong.csv"),
Benchmark(title = f"Ring (${1000} processes, ${msgCount/1000} loops)",
benchf = () => ring.Benchmark(ring.Benchmark.Standard(),
msgCount, 1000, repetitions, "Token"),
filename = "ring.csv"),
Benchmark(title = f"Streaming (${16} processes, ${msgCount*3/2} msgs sent/recvd)",
benchf = () => ring.Benchmark(ring.Benchmark.Streaming(),
msgCount*3/2, 16, repetitions, "Token"),
filename = "ring-stream.csv"),
Benchmark(title = f"Chameneos (${256} chameneos, ${(msgCount*6) / (3 * 2)} meetings)",
benchf = () => chameneos.Benchmark(msgCount*6, 256, repetitions),
filename = "chameneos.csv")
)

for (b <- benchmarks) {
val fname = Paths.get(savePath, b.filename).toString()
runAndSave(b.title, b.benchf, fname)
println(f" Results saved in: ${fname}")
}
}

private def runAndSave(title: String, benchf: () => BenchmarkResults,
filename: String) : Unit = {
import java.io.{File, PrintWriter}

val delim = ","

val out = new PrintWriter(new File(filename))

val res = benchf()

val nbench = res.length
val titles = res.map(x => x.title)

out.write(title ++ "\n")

// Save benchmarks as values in column under their respective titles
out.write(titles.mkString(delim) ++ "\n")
while (res(0).data.hasNext) {
out.write(
(for (i <- 0 until nbench) yield res(i).data.next()).mkString(delim)
)
out.write("\n")
}

out.close()
}
}
54 changes: 54 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Kludge to avoid building an empty .jar for the root project
Keys.`package` := {
(Keys.`package` in (lchannels, Compile)).value
(Keys.`package` in (examples, Compile)).value
(Keys.`package` in (benchmarks, Compile)).value
}

lazy val commonSettings = Seq(
version := "0.0.1",
scalaVersion := "2.11.8",
scalacOptions ++= Seq(
"-target:jvm-1.8", "-unchecked", "-deprecation", "-feature", "-Ywarn-unused-import"
),
// ScalaDoc setup
autoAPIMappings := true,
scalacOptions in (Compile,doc) ++= Seq(
"-no-link-warnings" // Workaround for ScalaDoc @throws links issues
)
)

lazy val lchannels = (project in file("lchannels")).
settings(commonSettings: _*).
settings(
name := "lchannels",

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-typed-experimental" % "2.4.2",
"com.typesafe.akka" %% "akka-remote" % "2.4.2"
)
)

lazy val examples = (project in file("examples")).
dependsOn(lchannels).
settings(commonSettings: _*).
settings(
name := "lchannels-examples",

libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-simple" % "1.7.16",
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
)
)

lazy val benchmarks = (project in file("benchmarks")).
dependsOn(lchannels).
settings(commonSettings: _*).
settings(
name := "lchannels-benchmarks"
// Depending on the benchmark size and duration, you might want
// to add the following options:
//
// fork := true, // Fork a JVM, running inside benchmarks/ dir
// javaOptions ++= Seq("-Xms1024m", "-Xmx1024m") // Enlarge heap size
)
Binary file not shown.
39 changes: 39 additions & 0 deletions examples/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
GreetingServerSys {
akka {
loglevel = WARNING
log-dead-letters-during-shutdown = off

actor {
provider = "akka.remote.RemoteActorRefProvider"
warn-about-java-serializer-usage = false
}

remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 31337
}
}
}
}

GreetingClientSys {
akka {
loglevel = WARNING
log-dead-letters-during-shutdown = off

actor {
provider = "akka.remote.RemoteActorRefProvider"
warn-about-java-serializer-usage = false
}

remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 31338
}
}
}
}
185 changes: 185 additions & 0 deletions examples/src/main/scala/lchannels/examples/Calc.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// lchannels - session programming in Scala
// Copyright (c) 2016, Alceste Scalas and Imperial College London
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
/** @author Alceste Scalas <alceste.scalas@imperial.ac.uk> */
package lchannels.examples.calc

import scala.concurrent.{ExecutionContext}
import scala.concurrent.duration.Duration
import scala.util.{Success, Failure}

import lchannels._

/////////////////////////////////////////////////////////////////////////////
// Session type:
// ?Welcome(Str) . rec X . (+) { !Negate(Int) . ?Answer(Int) . X ,
// !Add(Int) . !Add2(Int) . ?Answer(Int) . X ,
// !Quit() }
/////////////////////////////////////////////////////////////////////////////
sealed case class Welcome(message: String)(val cont: Out[Choice])

sealed abstract class Choice
case class Negate(value: Integer)(val cont: Out[Answer]) extends Choice
case class Add(value1: Integer)(val cont: In[Add2]) extends Choice
case class Quit() extends Choice

sealed case class Add2(value2: Integer)(val cont: Out[Answer])

sealed case class Answer(value: Integer)(val cont: Out[Choice])
/////////////////////////////////////////////////////////////////////////////

import lchannels.{StreamOut, StreamManager}
import java.io.{
BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
InputStream, OutputStream
}

class CalcStreamManager(in: InputStream, out: OutputStream)
(implicit ec: ExecutionContext)
extends StreamManager(in, out) {
private val inb = new BufferedReader(new InputStreamReader(in))
private val welcomeR = """WELCOME (.+)""".r
private val answerR = """ANSWER (-?\d+)""".r

override def destreamer() = inb.readLine() match {
case welcomeR(msg) => Welcome(msg)(StreamOut[Choice](this))
case answerR(n) => Answer(n.toInt)(StreamOut[Choice](this))
case unknown => {
close()
throw new java.net.ProtocolException(f"Unknown message: '${unknown}'")
}
}
private val outb = new BufferedWriter(new OutputStreamWriter(out))

override def streamer(x: scala.util.Try[Any]) = x match {
case Success(v) => v match {
case Negate(n) => outb.write(f"NEGATE ${n}\n"); outb.flush()
case Add(n) => outb.write(f"ADD ${n}\n"); outb.flush()
case Add2(n) => outb.write(f"ADD2 ${n}\n"); outb.flush()
case Quit() => outb.write("QUIT\n"); outb.flush(); close()
}
case Failure(e) => {
outb.write("ERROR\n"); outb.flush()
close()
}
}
}

object Server {
def apply(c: Out[Welcome])
(implicit timeout: Duration): Unit = {
println(f"[S] Sending welcome to ${c}...")
val c2 = c !! Welcome("Welcome to SessionCalc 0.1")_
subHandler(c2)

def subHandler(c: In[Choice]) {
println("[S] Now waiting for a choice... ")
c ? {
case Quit() => {
println(f"[S] Got Quit(), finishing")
}
case m @ Negate(value) => {
println(f"[S] Got 'Negate(${value})', answering ${-value}")
val c2 = m.cont !! Answer(-value)_
println("[S] Performing recursive call...")
subHandler(c2)
}
case m @ Add(val1) => {
println(f"[S] Got Add(${val1}), waiting for 2nd value...")
m.cont.receive match {
case m @ Add2(val2) => {
println(f"[S] Got Add2(${val2}), answering ${val1 + val2}...")
val c3 = m.cont !! Answer(val1 + val2)_
println("[S] Performing recursive call...")
subHandler(c3)
}
}
}
}
}
}
}

object Client {
def apply(c: In[Welcome])
(implicit timeout: Duration): Unit = {
val welcome = c.receive
println(f"[C] Got '${welcome.message}'")

println(f"[C] Sending Negate(42)...")
val ans = welcome.cont !! Negate(42)_
println("[C] ...done. Now waiting for answer...")
val neg = ans.receive
println(f"[C] ...done: got ${neg.value}")

println("[C] Now trying to add 7 and 5...")

val ans2 = neg.cont !! Add(7)_ !! Add2(5)_
println("[C] ...done. Now waiting for answer...")
val sum = ans2.receive
println(f"[C] ...done: got ${sum.value}")

println("[C] Now quitting")
sum.cont ! Quit()
}
}

object Local extends App {
// Helper method to ease external invocation
def run() = main(Array())

import lchannels.LocalChannel.parallel
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

implicit val timeout = 5.seconds

println("[*] Spawning local server and client...")
val (s, c) = parallel[Welcome, Unit, Unit](
Client(_), Server(_)
)

Await.result(s, 10.seconds) // Wait for server termination
}

object Queue extends App {
// Helper method to ease external invocation
def run() = main(Array())

import lchannels.QueueChannel.parallel
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

implicit val timeout = 5.seconds

println("[*] Spawning local server and client (using queue-based channels)...")
val (s, c) = parallel[Welcome, Unit, Unit](
Client(_), Server(_)
)

Await.result(s, 10.seconds) // Wait for server termination
}
Loading

1 comment on commit cf220eb

@michalrus
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon me, but—judging just by the diff stats—are you by any chance academia-based?

Initial checkin
Showing 32 changed files with 5,341 additions and 0 deletions.

😥

Please sign in to comment.