Skip to content

Commit

Permalink
Small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Lacko committed Aug 18, 2015
1 parent bdd78ab commit fa72bef
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 3 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -15,4 +15,7 @@ project/plugins/project/
# Scala-IDE specific
.scala_dependencies
.worksheet
.idea
.idea

/data
/journal
Expand Up @@ -16,6 +16,18 @@ class LargestDelayActorComplete extends PersistentActor with ActorSubscriber wit
override def inFlightInternally = inFlight
}

override def receive = {
case OnNext(data: FlightData) =>
FlightWithDelayPerMile(data).foreach { d =>
inFlight += 1
persistAsync(d) { _ =>
processDelayData(d)
inFlight -= 1
}
}
case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay)
}

def receiveCommand = {
case OnNext(data: FlightData) =>
FlightWithDelayPerMile(data).foreach { d =>
Expand Down
Expand Up @@ -66,6 +66,7 @@ object ReceiverComplete {
}
}

//start with this one since sender port is 9171+10
object ClusteredReceiver1 extends App {
new ReceiverClusterNode(9171).run()
}
Expand Down
Expand Up @@ -10,9 +10,10 @@ import scala.concurrent.duration._

object SenderComplete extends App with Logging {
implicit val system = ActorSystem()
val serverConnection = Tcp().outgoingConnection("localhost", 9182)
val serverConnection = Tcp().outgoingConnection("localhost", 9181)

val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()
val projectDir = System.getProperty("user.dir")
val getLines = () => scala.io.Source.fromFile(s"$projectDir/data/2008.csv").getLines()

val linesSource = Source(getLines).map { line => ByteString(line + "\n") }
val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r))
Expand Down

0 comments on commit fa72bef

Please sign in to comment.