### Topics

In [2]:
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util._

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util._


#### Running Tasks in the Future

In [29]:
import java.time._

Future {
  Thread.sleep(10000)
  println(s"this is the future at ${LocalTime.now}")  // will be printed in the server's log
}

println(s"this is the present at ${LocalTime.now}")

this is the present at 07:57:33.945


scala.concurrent.ExecutionContext$Implicits$@6a4d0032

> Scala uses `ExecutionContext` instead of Java's `Executor` 

In [22]:
Future { for (i <- 1 to 10) { print("a"); Thread.sleep(20) } }
Future { for (i <- 1 to 10) { print("b"); Thread.sleep(20) } } // ?

b

Future(<not completed>)

bbbbbbbbb

In [29]:
val f = Future {
  Thread.sleep(10000)
  42
}

Future(<not completed>)

In [31]:
f //different result 

Future(Success(42))

In [34]:
val f2 = Future {
  if (LocalTime.now.getHour != 0)
    throw new Exception("too late")
  42
}

Future(Failure(java.lang.Exception: too late))

In [35]:
f2 // evaluate

Future(Failure(java.lang.Exception: too late))

#### Waiting for Results

In [11]:
import scala.concurrent.duration._

val f = Future { Thread.sleep(10000); 42}
val result = Await.result(f, 10.seconds) // blocking

42

In [12]:
10.seconds // duration conversion method

10 seconds

In [13]:
val f = Future { Thread.sleep(10000); 42}
val result = Await.result(f, 1.seconds) // task not ready, exception thrown

java.util.concurrent.TimeoutException:  Futures timed out after [1 second]

In [16]:
val f = Future { Thread.sleep(10000); 42}
Await.ready(f, 10.seconds)  // wait after ready, get value 
val Some(t) = f.value // value return Option[Try[T]], t is an object of Try (holds the result or exception)

Success(42)

> won't use `Await`(means blocking) much, use callback

#### Try

Try[T]
- `Success(v)` v: T
- `Failure(ex)` ex: Throwable

In [17]:
import scala.util._
Try("123".toInt) match {
  case Success(v) => println(s"answer is $v")
  case Failure(ex) => println(ex.getMessage)
}

answer is 123


null

#### Callbacks

> the future should report its result to a callback function

In [12]:
import scala.math._

val f = Future {
    Thread.sleep(10000)
    if (random < 0.5) throw new Exception
    42
}
f.onComplete {
    case Success(v) => println(s"answer is $v")
    case Failure(ex) => println(ex.getMessage)
}

null

answer is 42


> avoid blocking, but nested callbacks produce **callback hell**

In [13]:
val f1 = Future { Thread.sleep(1000) ; 1}
val f2 = Future { Thread.sleep(2000) ; 2}
f1 onComplete {
    case Success(v1) => {
        f2 onComplete {
            case Success(v2) => {
                val v3 = v1 + v2;
                println(s"result: $v3")
            }
            case Failure(ex) => println(ex.getMessage)
        }
    }
    case Failure(ex) => println(ex.getMessage)
}

null

result: 3


#### Composing Future Tasks

> Think of a `Future` as a collection with one element.

In [24]:
val f1 = Future { Thread.sleep(1000) ; 1}
val f2 = Future { Thread.sleep(2000) ; 2}
// val combined = f1.map(v1 => f2.map(v2 => v1 + v2)) // Future[Future[Int]]
val combined = f1.flatMap(v1 => f2.map(v2 => v1 + v2))
// use for expression
val combined2 = for (v1 <- f1; v2 <- f2) yield v1 + v2
// for expression with guards
val combined3 = for (v1 <- f1; v2 <- f2 if v1 > 0) yield v1 + v2

Future(<not completed>)

> `map`/`flatMap` implementions will take care of errors

Ordered execution (delay the creation, use functions)

In [44]:
val f1 = Future { println("invoked f1"); Thread.sleep(1000); 1}
def f2 = Future { println("invoked f2"); Thread.sleep(2000); 2} // def will be evaluated in for expression
val combined = for (v1 <- f1; v2 <- f2) yield v1 + v2

Future(<not completed>)

> [An asynchronous programming facility for Scala](https://github.com/scala/scala-async)

```scala
val combined = async { await(future1) + await(future2) }
```

#### Promises

- `Future` object is read-only, value is set implicitly
- `Promise` value can be set explicitly

In [31]:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util._

def compute(arg: String) = Future {
    val n = arg.toInt
//     Thread.sleep(1000)
    n
}

// more flexiable producer (multi promise / multi tasks...)
def compute2(arg: String) = {
    val p = Promise[Int]()
    Future {
        val n = arg.toInt
        p.success(n)  // set result , the associated Future is completed
//         p.failure(ex) // set exception
        Thread.sleep(1000)
    }
    p.future // yields the associated Future object
}

println(compute("123"))
println(compute2("123"))

Future(<not completed>)
Future(<not completed>)


null

---

### Exercises

In [57]:
/**
 Write a function doInOrder that, given two functions f: T => Future[U] and g: U
=> Future[V], produces a function T => Future[U] that, for a given t, eventually
yields g(f(t))
*/

def doInOrder[T, U, V](f: T => Future[U], g: U => Future[V]): T => Future[V] = {
    t => f(t).flatMap(u => g(u))
}

def f = (t: String) => Future {t.toInt}
def g = (u: Int) => Future {u.toDouble}

doInOrder(f,g)("123")

Future(Success(123.0))

In [129]:
/**
Repeat the preceding exercise for any sequence of functions of type T => Future[T]
*/

def doInOrder[T](fs: Seq[T => Future[T]]): T => Future[T] = {
    t => fs match {
        case head :: Nil => head(t)
        case head :: tail => head(t).flatMap(doInOrder(tail)(_))
    }
}

def a = (t: String) => Future {t + "1"}
def b = (t: String) => Future {t + "2"}
def c = (t: String) => Future {t + "3"}
doInOrder(Seq(a,b,c))("hello")

Future(Success(hello123))

In [26]:
/**
Write a function doTogether that, given two functions f: T => Future[U] and g: U
=> Future[V], produces a function T => Future[(U, V)], running the two
computations in parallel and, for a given t, eventually yielding (f(t), g(t))
*/
def doTogether[T,U,V](f: T => Future[U], g: T => Future[V]): T => Future[(U, V)] = {
    t => for (u <- f(t); v <- g(t)) yield (u,v)
}

def a = (t: String) => Future {t.toInt}
def b = (t: String) => Future {t.toLong}
doTogether(a, b)("10")

Future(Success((10,10)))

In [128]:
/**
Write a function that receives a sequence of futures and returns a future that eventually yields a
sequence of all results.
*/

import scala.util._
def doSeq[T, U](fs: Seq[T => Future[U]]): T => Future[Seq[U]] = {
    // Future.sequence converts Seq[Future[T]] to Future[Seq[T]]
    t => Future.sequence(for(f <- fs) yield f(t))
}

def a = (t: String) => Future {t + "1"}
def b = (t: String) => Future {t + "2"}
def c = (t: String) => Future {t + "3"}
doSeq(Seq(a,b,c))("hello")

Future(Success(List(hello1, hello2, hello3)))

In [4]:
/**
Write a method
Future[T] repeat(action: => T, until: T => Boolean)
that asynchronously repeats the action until it produces a value that is accepted by the until
predicate, which should also run asynchronously. Test with a function that reads a password
from the console, and a function that simulates a validity check by sleeping for a second and
then checking that the password is "secret". Hint: Use recursion.
*/
def repeat[T](action: => T, util: T => Boolean): Future[T] = {
    import scala.annotation.tailrec
    @tailrec def calc: T = {
        val result = action
        if (util(result)) {
            result
        } else {
            calc
        }
    }
    
    Future[T] {
        calc
    }
}

def action = {
    Seq("1", "2", "secret")(2)
}

def util = (f: String) => f == "secret"

repeat(action,util) onComplete {
    case Success(v) => println(s"result:$v")
    case Failure(ex) => println(s"ex:${ex.getMessage}")
}

null

result:secret


In [26]:
/**
Write a program that counts the prime numbers between 1 and n, as reported by
BigInt.isProbablePrime. Divide the interval into p parts, where p is the number of
available processors. Count the primes in each part in concurrent futures and combine the
results
*/

def countPrime(n: Int): Future[Int] = {
    Future {
        val processors = Runtime.getRuntime.availableProcessors()
        println(processors)
        (for (i <- 1 to n  if (math.BigInt(i).isProbablePrime(10)) ) yield 1).sum
    }
}

countPrime(100) onComplete {
    case Success(v) => println(s"result:$v")
    case Failure(ex) => println(s"ex:${ex.getMessage}")
}

null