## Tools And Libs

By: Alex Comerford (alexanderjcomerford@gmail.com)

In this notebook we will go over several tools and libraries commonly used in Scala for everyday software tasks.

## Multi-line string tokenization

A common problem in NLP is tokenizing text. In the next cell we implement a multi-line word tokenizer that parses a multi-line string into a list of words, then counts how often each word occurs.

In [None]:
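object MultiLineTokenize {
    // Split the input into lines, drop blank ones, then split each line into words
    def apply(s: String): List[List[String]] =
        s.split("\n")
         .toList
         .map(_.trim)
         .filter(_.nonEmpty)
         .map(_.split("\\s+").toList)
}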

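// Recursively flatten arbitrarily nested lists into a single flat list
def flatten(e: Any): List[Any] = {
    e match {
        case Nil => Nil
        case head :: rest => flatten(head) ++ flatten(rest)
        case v => List(v) // a non-list value becomes a one-element list
    }
}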

val input = """
This is a thing
sentences are yay
I like potatoes
They are much yum
"""
flatten(MultiLineTokenize(input)).groupBy(identity).mapValues(_.size)
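
The same word count can also be written without going through `Any`. The following is a minimal sketch, not the notebook's original approach: `wordCounts` and `sample` are hypothetical names introduced here, and `flatMap` replaces the hand-written `flatten`.

```scala
// Hypothetical helper: count how often each word occurs in a multi-line string
def wordCounts(text: String): Map[String, Int] =
  text.split("\n").toList
    .map(_.trim)
    .filter(_.nonEmpty)
    .flatMap(_.split("\\s+"))   // one flat list of words across all lines
    .groupBy(identity)
    .map { case (word, occurrences) => word -> occurrences.size }

val sample = """
This is a thing
sentences are yay
I like potatoes
They are much yum
"""

val counts = wordCounts(sample)
println(counts("are"))   // "are" occurs on two of the four lines
```

Because the result is a `Map[String, Int]` rather than a `Map[Any, Int]`, later code can use the words as strings without casting.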

## Writing to and reading from a file

In the next cell we go over a fundamental programming exercise: writing to a file and reading it back.

In [None]:
import java.io._
import scala.io.Source

val filename = "file.txt"
val file = new File(filename)

// Use Java's BufferedWriter to write text to the file
val bw = new BufferedWriter(new FileWriter(file))
bw.write(
"""
potato
tomato
apple
""")
bw.close()

// Print the contents of the file, closing the source when done
val source = Source.fromFile(filename)
try {
    for (line <- source.getLines) println(line)
} finally {
    source.close()
}

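// Enrich File with Option-returning helpers (an implicit class, not a true monad)
implicit class FileMonads(f: File) {
  def check: Option[File] = if (f.exists) Some(f) else None
  def remove: Option[File] = if (f.delete()) Some(f) else None
}

// Pattern match on the Option to delete the file only if it exists
file.check match {
    case Some(a) =>
        a.remove match {
            case Some(_) => println(s"File $a deleted!")
            case None    => println(s"Could not delete $a")
        }
    case None =>
        println("File not found")
}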
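
As an alternative to `java.io`, the same write, read, and delete round trip can be sketched with `java.nio.file.Files`. This is only an illustration: it writes to a temporary file (an assumption made here so that no existing file is touched), and the names `path`, `lines`, and `deleted` are introduced for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Write to a temporary file so the sketch does not touch existing files
val path: Path = Files.createTempFile("fruits", ".txt")
Files.write(path, "potato\ntomato\napple\n".getBytes(UTF_8))

// Read the whole file back and split it into lines
val lines: List[String] =
  new String(Files.readAllBytes(path), UTF_8).split("\n").toList
lines.foreach(println)

// deleteIfExists returns true only if a file was actually removed
val deleted: Boolean = Files.deleteIfExists(path)
println(s"deleted = $deleted")
```

Reading the file in one call avoids having to close a `Source` by hand, at the cost of loading the whole file into memory.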

## Executing system commands

In [None]:
import sys.process._

println("ls -al".!!)

// Multiple commands piping
val procs = ("ps a" #| "wc -l").!!.trim
println(s"procs = $procs")

// multiple command piping in shell
val files = Seq("/bin/sh", "-c", "ls | grep .ipynb").!!
println(files)
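
`!!` returns the command's output as a string. When the exit code or the error stream matters, `!` and `ProcessLogger` can be used instead. The sketch below assumes a POSIX `sh` is on the path; the commands and the buffer names `out` and `err` are made up for illustration.

```scala
import scala.sys.process._
import scala.collection.mutable.ListBuffer

// `!` runs the command and returns its exit code instead of its output
val exitCode: Int = Process(Seq("sh", "-c", "exit 3")).!

// A ProcessLogger receives stdout and stderr line by line
val out = ListBuffer.empty[String]
val err = ListBuffer.empty[String]
val logger = ProcessLogger(
  (line: String) => { out += line; () },
  (line: String) => { err += line; () }
)
val status: Int = Process(Seq("sh", "-c", "echo hello; echo oops 1>&2")).!(logger)

println(s"exitCode = $exitCode, status = $status")
println(s"stdout = $out, stderr = $err")
```

Passing the command as a `Seq` avoids surprises with quoting, since each element is handed to the process as one argument.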

## Concurrency!

This is the section I am most excited about. Scala is well known for its concurrency support, most notably the actor model, in contrast to the CSP-style concurrency used by Go.

We will be using the `akka` framework for the next examples.

We will install its dependencies using the `%%classpath` magic.

In [2]:
%%classpath add mvn
com.typesafe.akka akka-actor_2.11 2.5.17
com.typesafe.akka akka-stream_2.11 2.5.17

### Simple actor

In this next cell we will create a simple Akka actor that prints to the console when receiving a message.

In [3]:
import akka.actor._

class HelloActor(myName: String) extends Actor {
    def receive = {
        case "hello" => println(s"hello from $myName")
        case _       => println(s"'huh?', said $myName")
    }
}

// create the actor system and the actor
val system = ActorSystem("HelloSystem")
val helloActor = system.actorOf(Props(new HelloActor("Fred")), name = "helloactor")

// say hello, then send a message the actor does not recognize
helloActor ! "hello"
helloActor ! "buenos dias"

hello from Fred
'huh?', said Fred


null

### Telling and Asking

One important concept described in the [docs](https://doc.akka.io/docs/akka/2.4.1/scala/actors.html#Send%20messages) is how to send messages. Below is an example of sending a message via "fire and forget" or "telling" (loosely similar to UDP) versus "asking", which sends the message asynchronously and returns a future for the reply.

In [12]:
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ Future, Promise, Await }
import akka.pattern.ask
import akka.util.Timeout

/*
 * TellActor uses the ! operator to "tell" a message to
 * the ReceiveActor without waiting for a response.
 * TellActor can also receive messages and print them
 * to the console.
 */
class TellActor extends Actor {
  val recipient = context.actorOf(Props(new ReceiveActor()))
  def receive = {
    case "Start" =>
      println(s"Telling 'Hello' to $recipient")
      recipient ! "Hello" // equivalent to recipient.tell("Hello", self)
    case reply => println(s"From $this: $reply")
  }
} 

/*
 * AskActor uses the ? operator to "ask" a message of
 * the ReceiveActor, which returns a future for the response.
 * When the reply arrives, AskActor prints it to the console.
 */
class AskActor extends Actor {
  val recipient = context.actorOf(Props(new ReceiveActor()))
  def receive = {
    case "Start" =>
      println(s"Asking 'Hello' to $recipient")
      implicit val timeout = Timeout(10 seconds)
      val replyF = recipient ? "Hello" // equivalent to recipient.ask("Hello")
      // foreach runs only if the future succeeds (onSuccess is deprecated since Scala 2.12)
      replyF.foreach { reply => println(s"From $this: $reply") }
  }
}

/*
 * ReceiveActor receives messages and replies to the sender
 */
class ReceiveActor extends Actor {
  def receive = {
    case "Hello" => sender ! "And Hello to you!"
  }
}

val system = ActorSystem("AskTellMessageSystem")
val tellActor = system.actorOf(Props(new TellActor()), name = "Tell")
val askActor = system.actorOf(Props(new AskActor()), name = "Ask")
tellActor ! "Start"
askActor ! "Start"
// Give the actors time to exchange messages, then shut the system down
Thread.sleep(2000)
Await.ready(system.terminate(), 10 seconds)

Telling 'Hello' to Actor[akka://AskTellMessageSystem/user/Tell/$a#1548639949]
Asking 'Hello' to Actor[akka://AskTellMessageSystem/user/Ask/$a#-1593294270]
From $line39.$read$$iw$$iw$$iw$$iw$AskActor@5e4bffb7: And Hello to you!
From $line39.$read$$iw$$iw$$iw$$iw$TellActor@663f50ed: And Hello to you!


warning:  there were two feature warnings; re-run with -feature for details
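
To make the difference between the two styles concrete without Akka, here is a rough analogy using only the standard library. `Greeter` is a hypothetical plain class, not an actor: it ignores mailboxes and supervision, and unlike Akka (where a reply to `!` arrives as a new message for the sender) its `tell` simply discards the result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for ReceiveActor: a plain class, not an Akka actor
final class Greeter {
  private def handle(msg: String): String = s"And $msg to you!"

  // "tell": schedule the work and return immediately; the result is discarded
  def tell(msg: String): Unit = { Future(handle(msg)); () }

  // "ask": schedule the same work, but hand the caller a Future for the reply
  def ask(msg: String): Future[String] = Future(handle(msg))
}

val greeter = new Greeter
greeter.tell("Hello")   // nothing to wait on
val reply = Await.result(greeter.ask("Hello"), 2.seconds)
println(reply)
```

The return types carry the distinction: `Unit` for fire and forget, `Future[String]` when the caller expects an answer.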

### Back and forth

An important feature of the actor model for concurrent and distributed systems is message passing between actors. The next example reimplements the well-known Java ping-pong threading exercise in Scala using Akka actors.

In [5]:
import akka.actor._

// Define message case objects
case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage

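/*
 * On StartMessage, Ping sends the first PingMessage to pong.
 * On each PongMessage it replies with another PingMessage until
 * count exceeds maxPong, then it sends StopMessage and stops itself.
 */
class Ping(pong: ActorRef, maxPong: Int = 10) extends Actor {
  var count = 0
  def incrementAndPrint(): Unit = { println(s"ping $count"); count += 1 }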
  def receive = {
    case StartMessage =>
        incrementAndPrint
        pong ! PingMessage
    case PongMessage => 
        incrementAndPrint
        if (count > maxPong) {
          sender ! StopMessage
          println("ping stopped")
          context.stop(self)
        } else {
          sender ! PingMessage
        }
  }
}

class Pong extends Actor {
  var count = 0
  def receive = {
    case PingMessage =>
        println(s"  pong $count")
        count+=1
        sender ! PongMessage
    case StopMessage =>
        println("pong stopped")
        context.stop(self)
  }
}

val system = ActorSystem("PingPongSystem")
val pong = system.actorOf(Props(new Pong()), name = "pong")
val ping = system.actorOf(Props(new Ping(pong)), name = "ping")
ping ! StartMessage

ping 0
  pong 0
ping 1
  pong 1
ping 2
  pong 2
ping 3
  pong 3
ping 4
  pong 4
ping 5
  pong 5
ping 6
  pong 6
ping 7
  pong 7
ping 8
  pong 8
ping 9
  pong 9
ping 10
ping stopped
pong stopped


null

### Lifecycle actors

There are several lifecycle methods associated with an actor in Akka:
* preStart
* postStop
* preRestart
* postRestart
* receive

In [20]:
import akka.actor._

class Kenny extends Actor {
    println("Entering Kenny constructor")
    override def preStart(): Unit = { println("kenny: preStart") }
    override def postStop(): Unit = { println("kenny: postStop") }
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        println("kenny: preRestart")
        println(s" MESSAGE: ${message.getOrElse("")}")
        println(s" REASON: ${reason.getMessage}")
        super.preRestart(reason, message)
    }
    override def postRestart(reason: Throwable): Unit = {
        println("kenny: postRestart")
        println(s" REASON: ${reason.getMessage}")
        super.postRestart(reason)
    }
    def receive = {
        case ForceRestart => throw new Exception("Boom!")
        case _ => println("Kenny received a message")
    }
}

case object ForceRestart

val system = ActorSystem("LifecycleDemo")
val kenny = system.actorOf(Props(new Kenny()), name = "Kenny")

println("sending kenny a simple String message")
kenny ! "hello"
Thread.sleep(1000)

println("make kenny restart")
kenny ! ForceRestart
Thread.sleep(1000)

println("stopping kenny")
system.stop(kenny)

println("shutting down system")

system.terminate()

sending kenny a simple String message
Entering Kenny constructor
kenny: preStart
Kenny received a message
make kenny restart
kenny: preRestart
[ERROR] [10/11/2018 18:42:11.514] [LifecycleDemo-akka.actor.default-dispatcher-8] [akka://LifecycleDemo/user/Kenny] Boom!
java.lang.Exception: Boom!
	at $line54.$read$$iw$$iw$$iw$$iw$Kenny$$anonfun$receive$1.applyOrElse(<console>:44)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at $line54.$read$$iw$$iw$$iw$$iw$Kenny.aroundReceive(<console>:28)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
	at akka.actor.ActorCell.invoke(ActorCell.scala:557)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979

Future(Success(Terminated(Actor[akka://LifecycleDemo/])))

## Concurrent sleeping

In this next example we will create a future. Futures are easy to construct: pass a code block to `Future { ... }` and it is scheduled to run asynchronously on the implicit execution context. Here the block sleeps for 500 milliseconds and then returns 42, so the callback's message should be printed in the middle of the other print statements.

In [29]:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def sleep(time: Long): Unit = Thread.sleep(time)

println("starting calculation ...")
val f = Future {
    sleep(500)
    42
}
println("before onComplete")
f.onComplete {
    case Success(value) => println(s"Got the callback, meaning = $value")
    case Failure(e) => e.printStackTrace
}
// do the rest of your work
println("1 ..."); sleep(100)
println("2 ..."); sleep(100)
println("3 ..."); sleep(100)
println("4 ..."); sleep(100)
println("5 ..."); sleep(100)
println("6 ..."); sleep(100)
sleep(2000)

starting calculation ...
before onComplete
1 ...
2 ...
3 ...
4 ...
5 ...
Got the callback, meaning = 42
6 ...


null
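
Callbacks are one way to consume a future; futures can also be combined before their values are known. The sketch below is an illustration with made-up names (`slowAnswer`, `quickBonus`, `combined`) that uses a for-comprehension and then blocks once with `Await.result`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val slowAnswer: Future[Int] = Future { Thread.sleep(200); 42 }
val quickBonus: Future[Int] = Future { 8 }

// Both futures are already running; the for-comprehension only combines results
val combined: Future[Int] = for {
  a <- slowAnswer
  b <- quickBonus
} yield a + b

// Blocking is acceptable at the edge of a script; avoid it inside callbacks
val total = Await.result(combined, 2.seconds)
println(s"total = $total")   // prints "total = 50"
```

Creating the futures before the for-comprehension matters: if they were created inside it, the second would not start until the first had completed.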

## Parallel Collections

Low-level parallelization can be extremely messy and complex. Scala's parallel collections (part of the standard library up to Scala 2.12, a separate module since 2.13) are designed to be easy to use and high level!

From https://docs.scala-lang.org/overviews/parallel-collections/overview.html

```
"The hope was, and still is, that implicit parallelism behind a collections abstraction will bring reliable parallel execution one step closer to the workflow of mainstream developers"
```

Below I will show a few examples of parallel and non-parallel operations and look at their performance, pitfalls, and benefits!

### Simple performance

Below we use Jupyter's `%time` magic to compare a sequential and a parallel `map` over the same list.

In [169]:
val list = (1 to 80000).toList;

println("Non Parallel")
%time list.map(_ + 42)

println("Parallel")
%time list.par.map(_ + 42)

println("fin")

Non Parallel
CPU times: user 0 ns, sys: 254 µs, total: 254 µs 
Wall Time: 1 s

Parallel
CPU times: user 0 ns, sys: 227 µs, total: 227 µs 
Wall Time: 1 s

fin


null
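
Outside a notebook there is no `%time` magic, so a small helper is needed. The following is a minimal sketch: `timed` is a hypothetical helper, it measures a single run with `System.nanoTime`, and a single run is heavily influenced by JIT warm-up, so the numbers are only a rough indication.

```scala
// Hypothetical helper: run a block once and return (result, elapsed nanoseconds)
def timed[A](block: => A): (A, Long) = {
  val start = System.nanoTime()
  val result = block
  (result, System.nanoTime() - start)
}

val numbers = (1 to 80000).toList
val (mapped, nanos) = timed(numbers.map(_ + 42))
println(f"sequential map: ${nanos / 1e6}%.3f ms")
// numbers.par.map(_ + 42) can be wrapped in timed(...) the same way
```

The by-name parameter `block: => A` delays evaluation until inside `timed`, which is what lets the helper measure the work rather than receive an already computed value.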

### Side-Effecting Operations

An interesting consequence of parallelized collections in Scala is the possibility of race conditions: the threads spawned by Scala read and write the same data, potentially overwriting or losing a previous result.

In the next cell we demonstrate this by first summing a list of numbers sequentially, then parallelizing the same summing operation. In the parallel runs several updates overwrite each other because they read stale values of `sum`, so the final value ends up lower than the true value.

In [175]:
var sum = 0
val list = (1 to 1000).toList
list.foreach(sum += _); println(sum)
sum = 0
list.par.foreach(sum += _); println(sum)
sum = 0
list.par.foreach(sum += _); println(sum)
sum = 0
list.par.foreach(sum += _); println(sum)
println("fin")

500500
497189
484631
472256
fin


null
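
One way to avoid the lost updates is to replace the shared `var` with an atomic counter. The sketch below swaps in plain `Future`s instead of `.par` so that it runs without the parallel collections module; `safeSum` and `tasks` are names introduced here. With parallel collections themselves, `list.par.sum` avoids shared mutable state altogether.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val safeSum = new AtomicInteger(0)

// Each addition runs as its own task on the global thread pool
val tasks = (1 to 1000).map { n =>
  Future { safeSum.addAndGet(n); () }
}

// Wait for every task to finish before reading the total
Await.ready(Future.sequence(tasks), 10.seconds)
println(safeSum.get)   // prints 500500
```

`addAndGet` performs the read, add, and write as one atomic step, so no update can be based on a stale value.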

### Non-Associative Operations

These "out of order" semantics also apply to non-associative operations. The programmer must design parallel computations so that the result does not depend on how the work is grouped, otherwise the output is non-deterministic.

Subtraction is one such non-associative operation: below, the sequential `reduce` always yields the same value, while each parallel `reduce` yields a different one.

In [183]:
val list = (1 to 1000).toList

println(list.reduce(_-_))
println(list.par.reduce(_-_))
println(list.par.reduce(_-_))
println(list.par.reduce(_-_))

-500498
-301748
-46000
0


null
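
To see why grouping matters, the splitting can be simulated by hand. The sketch below is a deterministic illustration and not how the parallel collections actually schedule work: `chunkedReduce` is a hypothetical helper that reduces fixed-size chunks separately and then reduces the partial results.

```scala
// Hypothetical helper: reduce each chunk on its own, then reduce the partial results
def chunkedReduce(xs: List[Int], chunkSize: Int)(op: (Int, Int) => Int): Int =
  xs.grouped(chunkSize).map(_.reduce(op)).reduce(op)

val small = (1 to 8).toList

val seqDiff     = small.reduce(_ - _)              // ((1 - 2) - 3) - ... - 8 = -34
val chunkedDiff = chunkedReduce(small, 4)(_ - _)   // (1-2-3-4) - (5-6-7-8) = -8 - (-16) = 8

val seqSum      = small.reduce(_ + _)              // 36
val chunkedSum  = chunkedReduce(small, 4)(_ + _)   // 10 + 26 = 36

println(s"subtraction: $seqDiff vs $chunkedDiff")
println(s"addition:    $seqSum vs $chunkedSum")
```

Addition is associative, so any grouping gives 36. Subtraction is not, so the result changes with the chunk boundaries, and a parallel `reduce` is free to choose different boundaries on every run.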