Skip to content

Commit

Permalink
Updating to RC2
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed May 5, 2015
1 parent ac14e4c commit 9f44722
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 54 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Expand Up @@ -6,15 +6,15 @@ version := "0.1-SNAPSHOT"

scalaVersion := "2.11.6"

val akkaVersion = "2.3.9"
val akkaVersion = "2.3.10"

libraryDependencies ++= Seq(
// akka
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-contrib" % akkaVersion,
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M5",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC2",
// util
"com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
"ch.qos.logback" % "logback-classic" % "1.1.3",
Expand Down
11 changes: 5 additions & 6 deletions src/main/resources/slides/slide1.txt
Expand Up @@ -16,9 +16,8 @@
| | . | .'| | | | | | .'| _|_ -| '_| |
|__|__|___|__,|_|_|_| |_____|__,|_| |___|_,_|_|

_____ _ _ _ _
|_ _|__ ___| |__ _ __ ___ | | ___ __ _ _ _| | | | ___ _ _ _ __
| |/ _ \/ __| '_ \| '_ \ / _ \| |/ _ \ / _` | | | | |_| |/ _ \| | | | '__|
| | __/ (__| | | | | | | (_) | | (_) | (_| | |_| | _ | (_) | |_| | |
|_|\___|\___|_| |_|_| |_|\___/|_|\___/ \__, |\__, |_| |_|\___/ \__,_|_|
|___/ |___/
_ _ ____
| | | |/ ___|
| | _ | | |
| |__| |_| | |___
|_____\___/ \____|
Expand Up @@ -15,7 +15,7 @@ class ParseLinesStage(separator: String, maximumLineBytes: Int) extends Stateful
private var nextPossibleMatch = 0

def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[String]) = {
buffer ++= chunk
if (buffer.size > maximumLineBytes) {
println("XXX FAIL")
Expand Down
@@ -1,13 +1,11 @@
package com.softwaremill.reactive.complete

import java.net.InetSocketAddress

import akka.actor.Actor.emptyBehavior
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.contrib.pattern.ClusterSingletonManager
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Source, Sink, StreamTcp}
import akka.stream.scaladsl.{Tcp, Source, Sink}
import com.softwaremill.reactive._
import com.softwaremill.reactive.complete.ReceiverComplete.ReceiverClusterNode
import com.typesafe.config.ConfigFactory
Expand All @@ -16,15 +14,15 @@ import scala.concurrent.duration._

object ReceiverComplete {

class Receiver(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
class Receiver(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {

def run(): Unit = {
implicit val mat = ActorFlowMaterializer()

val largestDelayActor = system.actorOf(Props[LargestDelayActorComplete])

logger.info("Receiver: binding to " + receiverAddress)
StreamTcp().bind(receiverAddress).runForeach { conn =>
logger.info(s"Receiver: binding to $host:$port")
Tcp().bind(host, port).runForeach { conn =>
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")

val receiveSink = conn.flow
Expand Down Expand Up @@ -59,11 +57,9 @@ object ReceiverComplete {
}

class ReceiverNodeActor(clusterPort: Int) extends Actor {
val receiverAddress = new InetSocketAddress("localhost", clusterPort + 10)

override def preStart() = {
super.preStart()
new Receiver(receiverAddress)(context.system).run()
new Receiver("localhost", clusterPort + 10)(context.system).run()
}

override def receive = emptyBehavior
Expand All @@ -84,5 +80,5 @@ object ClusteredReceiver3 extends App {

object SimpleReceiver extends App {
implicit val system = ActorSystem()
new ReceiverComplete.Receiver(new InetSocketAddress("localhost", 9182)).run()
new ReceiverComplete.Receiver("localhost", 9182).run()
}
@@ -1,7 +1,5 @@
package com.softwaremill.reactive.complete

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
Expand All @@ -12,7 +10,7 @@ import scala.concurrent.duration._

object SenderComplete extends App with Logging {
implicit val system = ActorSystem()
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9181))
val serverConnection = Tcp().outgoingConnection("localhost", 9182)

val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()

Expand Down
@@ -1,7 +1,5 @@
package com.softwaremill.reactive.step1

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
Expand All @@ -11,13 +9,13 @@ import com.softwaremill.reactive._
* - flow from the client, transforming, no response
* - *elastic*: delay to see the backpressure
*/
class ReceiverStep1(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
class ReceiverStep1(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {

def run(): Unit = {
implicit val mat = ActorFlowMaterializer()

logger.info("Receiver: binding to " + receiverAddress)
StreamTcp().bind(receiverAddress).runForeach { conn =>
logger.info(s"Receiver: binding to $host:$port")
Tcp().bind(host, port).runForeach { conn =>
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")

val receiveSink = conn.flow
Expand All @@ -37,5 +35,5 @@ class ReceiverStep1(receiverAddress: InetSocketAddress)(implicit val system: Act

object ReceiverStep1 extends App {
implicit val system = ActorSystem()
new ReceiverStep1(new InetSocketAddress("localhost", 9182)).run()
new ReceiverStep1("localhost", 9182).run()
}
@@ -1,7 +1,5 @@
package com.softwaremill.reactive.step1

import java.net.InetSocketAddress

import akka.actor.{Props, ActorSystem}
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
Expand All @@ -17,7 +15,7 @@ import com.softwaremill.reactive._
object SenderStep1 extends App with Logging {
implicit val system = ActorSystem()
val bytesPerSecondActor = system.actorOf(Props[BytesPerSecondActor])
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9182))
val serverConnection = Tcp().outgoingConnection("localhost", 9182)

val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()

Expand Down
@@ -1,11 +1,9 @@
package com.softwaremill.reactive.step2

import java.net.InetSocketAddress

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Source, Sink, StreamTcp}
import akka.stream.scaladsl.{Tcp, Source, Sink}
import com.softwaremill.reactive._
import com.softwaremill.reactive.complete.LargestDelayActorComplete

Expand All @@ -15,15 +13,15 @@ import scala.concurrent.duration._
* - sending data to an actor, which processes it further
* - the actor must be reactive
*/
class ReceiverStep2(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
class ReceiverStep2(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {

def run(): Unit = {
implicit val mat = ActorFlowMaterializer()

val largestDelayActor = system.actorOf(Props[LargestDelayActorComplete])

logger.info("Receiver: binding to " + receiverAddress)
StreamTcp().bind(receiverAddress).runForeach { conn =>
logger.info(s"Receiver: binding to $host:$port")
Tcp().bind(host, port).runForeach { conn =>
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")

val receiveSink = conn.flow
Expand All @@ -43,5 +41,5 @@ class ReceiverStep2(receiverAddress: InetSocketAddress)(implicit val system: Act

object ReceiverStep2 extends App {
implicit val system = ActorSystem()
new ReceiverStep2(new InetSocketAddress("localhost", 9182)).run()
new ReceiverStep2("localhost", 9182).run()
}
@@ -1,7 +1,5 @@
package com.softwaremill.reactive.step2

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
Expand All @@ -15,7 +13,7 @@ import scala.concurrent.duration._
*/
object SenderStep2 extends App with Logging {
implicit val system = ActorSystem()
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9182))
val serverConnection = Tcp().outgoingConnection("localhost", 9182)

val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()

Expand Down
@@ -1,13 +1,11 @@
package com.softwaremill.reactive.step3

import java.net.InetSocketAddress

import akka.actor.Actor.emptyBehavior
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.contrib.pattern.ClusterSingletonManager
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Source, Sink, StreamTcp}
import akka.stream.scaladsl.{Tcp, Source, Sink}
import com.softwaremill.reactive._
import com.typesafe.config.ConfigFactory

Expand All @@ -16,15 +14,15 @@ import scala.concurrent.duration._
/**
* - no changes
*/
class ReceiverStep3(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
class ReceiverStep3(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {

def run(): Unit = {
implicit val mat = ActorFlowMaterializer()

val largestDelayActor = system.actorOf(Props[LargestDelayActorStep3])

logger.info("Receiver: binding to " + receiverAddress)
StreamTcp().bind(receiverAddress).runForeach { conn =>
logger.info(s"Receiver: binding to $host:$port")
Tcp().bind(host, port).runForeach { conn =>
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")

val receiveSink = conn.flow
Expand All @@ -44,7 +42,7 @@ class ReceiverStep3(receiverAddress: InetSocketAddress)(implicit val system: Act

object ReceiverStep3 extends App {
implicit val system = ActorSystem()
new ReceiverStep3(new InetSocketAddress("localhost", 9182)).run()
new ReceiverStep3("localhost", 9182).run()
}

/**
Expand All @@ -71,11 +69,9 @@ class ReceiverClusterNodeStep3(clusterPort: Int) {
}

class ReceiverNodeActorStep3(clusterPort: Int) extends Actor {
val receiverAddress = new InetSocketAddress("localhost", clusterPort + 10)

override def preStart() = {
super.preStart()
new ReceiverStep3(receiverAddress)(context.system).run()
new ReceiverStep3("localhost", clusterPort + 10)(context.system).run()
}

override def receive = emptyBehavior
Expand Down
@@ -1,7 +1,5 @@
package com.softwaremill.reactive.step3

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
Expand All @@ -15,7 +13,7 @@ import scala.concurrent.duration._
*/
object SenderStep3 extends App with Logging {
implicit val system = ActorSystem()
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9181))
val serverConnection = Tcp().outgoingConnection("localhost", 9181)

val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()

Expand Down

0 comments on commit 9f44722

Please sign in to comment.