# Futures

In [1]:
import scala.concurrent.Future
// makes the Future use a global thread pool
import scala.concurrent.ExecutionContext.Implicits.global

* The body of a `Future` runs asynchronously, typically on a different thread.
* Scala uses the `ExecutionContext` trait to abstract the thread pool that runs futures.
* Creating a `Future` requires an execution context, usually supplied implicitly.
* Multiple futures can execute concurrently.
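
The points above can be sketched as follows. This is a minimal illustration, assuming the global execution context and a standalone script; the names `callerThread`, `threadName` and `workerThreads` are hypothetical and not part of the original notebook.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val callerThread = Thread.currentThread().getName

// The execution context is an implicit second parameter list of Future.apply,
// so it can also be passed explicitly instead of being imported
def threadName(): Future[String] =
  Future { Thread.currentThread().getName }(ExecutionContext.global)

// Both futures are submitted immediately and may run concurrently
val t1 = threadName()
val t2 = threadName()

// Await is used here only so that the script can read the results
val workerThreads = List(Await.result(t1, 5.seconds), Await.result(t2, 5.seconds))
println(s"caller: $callerThread, workers: $workerThreads")
```

The worker names printed depend on the JVM and the pool, but they should differ from the caller's thread name.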

In [2]:
// concurrent future execution
// Notebooks may have problems displaying the results of futures
def getFuture(range: Seq[Int]): Future[Int] =
    Future {
        range.foldLeft(0) { (s, i) =>
            Thread.sleep(1000)
            println(i)
            s + i
        }
    }


val f1 = getFuture(1 to 10)
val f2 = getFuture(100 to 110)
println((f1, f2))

(Future(<not completed>),Future(<not completed>))


In [3]:
f1.foreach(println)
f2.foreach(println)

* A `Future` completes with a `Try`, which is either a `Success` holding
a value or a `Failure` holding an exception.

* Avoid updating shared objects from futures. Instead, compute
the results concurrently and merge the results of all the
futures.
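
One way to follow this advice is sketched below, assuming the work can be split into independent chunks. The names `chunks`, `partialSums`, `total` and `totalValue` and the chunk size of 25 are illustrative choices, not taken from the notebook.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Split the work into independent chunks; no future touches shared mutable state
val chunks: List[Seq[Int]] = (1 to 100).grouped(25).toList

// Each future computes its own partial sum
val partialSums: List[Future[Int]] = chunks.map(chunk => Future { chunk.sum })

// Merge once all partial results are available
val total: Future[Int] = Future.sequence(partialSums).map(_.sum)

// Await (covered in the next section) only lets the script read the result
val totalValue = Await.result(total, 5.seconds)
println(totalValue) // 5050
```

Because each future only reads its own chunk and returns a value, no locking is needed; the merge happens in a single place.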

## Waiting for the results

In [4]:
import scala.concurrent.Await
import scala.concurrent.duration._

var sumFuture = getFuture((1 to 10))
// the seconds method on Int comes from the implicit conversions
// in scala.concurrent.duration._

// Await.result blocks for at most the given duration. If the future is
// not completed within that time, a TimeoutException is thrown
var sum = Await.result(sumFuture, 15.seconds)
println(sum)

100
1
101
2
1
102
3
2
103
4
3
104
5
4
105
6
5
106
7
6
107
8
7
108
9
8
109
10
55
9
110
1155
10
55


In [5]:
try {
    sumFuture = getFuture(1 to 10)
    sum = Await.result(sumFuture, 5.seconds)
} catch {
    case ex: scala.concurrent.TimeoutException =>
        println(ex.getMessage())
}

1
2
3
4
Future timed out after [5 seconds]
5


In [6]:
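import scala.util.Try
// Await.ready also throws a TimeoutException on timeout. Unlike
// Await.result, it returns the future itself rather than its value,
// so a failed future does not throw here. Wrapping the call in Try
// captures the timeout.
sumFuture = getFuture(1 to 10)
Try(Await.ready(sumFuture, 5.seconds))

// value is Option[Try[T]]; it is None while the future is not completed
println(sumFuture.value == None)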

6
1
7
2
8
3
9
4
10


* It is recommended to prefer callbacks over `Await`, which blocks the
calling thread.

* A `Try` is either `Success(value)` or `Failure(exception)`.
* We can use pattern matching to extract the value.
* `Try` also has the `.isSuccess` and `.isFailure` methods.
* `toOption` converts a `Try` to an `Option`: `Success` becomes `Some` and `Failure` becomes `None`.

In [7]:
// we can pass a block to Try
import scala.util._

val randGen = new Random()
val result = Try {
    println("This statement is inside a try block")
    val n = randGen.nextInt(10)
    if (n > 5)
        throw new Exception("Some exception")
    else
        n
}

// pattern matching with extractors to get the result
result match {
    case Success(value) => println(value)
    case Failure(ex) => println(ex.getMessage())
}

This statement is inside a try block
1


randGen: Random = scala.util.Random@7098f6f0
result: Try[Int] = Success(1)
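
The helper methods listed for `Try` can be tried out with a short sketch like the one below. The `parse` function and the names `ok` and `bad` are hypothetical and exist only for illustration.

```scala
import scala.util.Try

// Hypothetical helper: toInt throws NumberFormatException on bad input,
// and Try captures that exception as a Failure
def parse(s: String): Try[Int] = Try(s.trim.toInt)

val ok = parse("42")
val bad = parse("forty-two")

println(ok.isSuccess)      // true
println(bad.isFailure)     // true
println(ok.toOption)       // Some(42)
println(bad.toOption)      // None
println(bad.getOrElse(-1)) // -1
```

`getOrElse` is a convenient alternative to pattern matching when a default value is acceptable.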

## Callbacks

* `future.onComplete(f)` registers a callback `f: Try[T] => U` that is invoked once the future completes.
* Deeply nested callbacks can lead to callback hell.
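
A minimal sketch of `onComplete` follows, assuming the global execution context. The `divide` function and the names `quotient` and `broken` are made up for this example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical task that fails with ArithmeticException for a zero divisor
def divide(a: Int, b: Int): Future[Int] = Future { a / b }

val quotient = divide(10, 2)
val broken = divide(1, 0)

// onComplete receives the Try the future was completed with
quotient.onComplete {
  case Success(value) => println(s"result: $value")
  case Failure(ex)    => println(s"failed: ${ex.getMessage}")
}
broken.onComplete {
  case Success(value) => println(s"result: $value")
  case Failure(ex)    => println(s"failed: ${ex.getMessage}")
}

// Await.ready only waits for the futures to complete, so that a standalone
// script does not exit early; the callbacks themselves run asynchronously
Await.ready(quotient, 5.seconds)
Await.ready(broken, 5.seconds)
```

The callbacks run on a thread from the execution context, so the order of the two printed lines is not guaranteed.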

## Composing Future tasks

In [None]:
// When the results of multiple futures need to be combined,
// or one future depends on the output of another,
// we can use methods like map and flatMap to compose
// those futures.
val sum1 = getFuture((1 to 10))
val sum2 = getFuture((11 to 20))

// treat future like a collection with one value.
val result = sum1.flatMap(r1 => sum2.map(r2 => r1 + r2))
result.foreach(println)

* `Future` has other useful methods such as `filter`, `foreach`,
`recover`, `fallbackTo`, `zip`, `failed`, `zipWith`
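
A few of these methods are sketched below. The futures `good` and `bad` and the derived names are illustrative assumptions; `zipWith` assumes Scala 2.12 or later.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val good: Future[Int] = Future { 21 }
val bad: Future[Int] = Future { throw new IllegalStateException("boom") }

// recover turns a matching failure into a successful value
val recovered: Future[Int] = bad.recover { case _: IllegalStateException => 0 }

// fallbackTo uses the other future's result if this one fails
val withFallback: Future[Int] = bad.fallbackTo(good)

// zip pairs the results of two futures; zipWith combines them with a function
val paired: Future[(Int, Int)] = good.zip(recovered)
val doubled: Future[Int] = good.zipWith(good)(_ + _)

// filter fails the future when the predicate does not hold
val filtered: Future[Int] = good.filter(_ > 100)

println(Await.result(recovered, 5.seconds))    // 0
println(Await.result(withFallback, 5.seconds)) // 21
println(Await.result(paired, 5.seconds))       // (21,0)
println(Await.result(doubled, 5.seconds))      // 42
println(Await.ready(filtered, 5.seconds).value.get.isFailure) // true
```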

## Methods in Future companion object

* `Future.sequence(futures_collection)` - Converts a collection of
futures into a single future holding the collection of their results.
If any one of the futures fails, the resulting future also fails.

* `Future.traverse` maps each element of a collection to a future and
collects the results of all those futures (like `map` followed by `sequence`).
* `Future.reduceLeft` and `Future.foldLeft` are also available.

* Other helpers: `Future.successful(r)`, `Future.failed(ex)`, `Future.unit`,
`Future.never`, `Future.fromTry(t)`

In [9]:
val ranges = List[Seq[Int]](
    (1 to 10),
    (11 to 20),
    (21 to 30)
)

val futures: List[Future[Int]] = ranges.map(r => Future {
    r.sum
})

// sequence accepts a collection of futures
// and returns a future of list of results
val finalFuture: Future[List[Int]] = Future.sequence(futures)
finalFuture.foreach(results => results.foreach(println))

55
155
255


In [10]:
// traverse maps each range to a future and collects the results
Future.traverse(ranges)(range => Future { range.sum })
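
The remaining companion helpers can be sketched as follows. This is a small illustration under the assumption of the global execution context; all value names here (`done`, `failedFuture`, `fromTry`, `allOk`, `oneBad`) are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Already-completed futures, built directly from a value, an exception or a Try
val done: Future[Int] = Future.successful(1)
val failedFuture: Future[Int] = Future.failed(new RuntimeException("no value"))
val fromTry: Future[Int] = Future.fromTry(Try("7".toInt))

// The future returned by sequence fails if any input future fails
val allOk: Future[List[Int]] = Future.sequence(List(done, fromTry))
val oneBad: Future[List[Int]] = Future.sequence(List(done, failedFuture))

println(Await.result(allOk, 5.seconds))                     // List(1, 7)
println(Await.ready(oneBad, 5.seconds).value.get.isFailure) // true
```

`Future.successful` and `Future.failed` are handy in tests and for wrapping values that are already known.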

## Promise

* A promise's result can be set explicitly, unlike a future, whose
result can only be set by the underlying task.
* The producer can continue doing other useful work in the same future after
completing the promise.

* Multiple futures can also try to complete a single promise. In such
cases we need to use the `trySuccess` method, because `success` throws
an `IllegalStateException` if the promise is already completed.

In [13]:
import scala.concurrent.Promise

def computeSum(input: Seq[Int]) = {
    // create a promise, use this promise 
    // within some future
    val p = Promise[Int]()
    Future {
        val sum = input.sum
        p.success(sum) // here we complete the promise
        // a promise can be completed exactly once,
        // by calling either success(value) or failure(ex)
    }
    // return the future associated with the promise;
    // it completes when the future task above
    // completes the promise
    p.future
}

val f = computeSum((1 to 100))
f.foreach(println)
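
The case of several futures competing for one promise might look like the sketch below. The names `winner`, `fast`, `slow` and `outcomes` and the 200 ms delay are assumptions made for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical race: two futures compete to complete the same promise
val winner = Promise[String]()

val fast = Future { winner.trySuccess("fast") }
val slow = Future { Thread.sleep(200); winner.trySuccess("slow") }

// trySuccess returns true only for the call that actually completed the promise;
// calling success on a completed promise would throw IllegalStateException instead
val outcomes = Await.result(Future.sequence(List(fast, slow)), 5.seconds)
println(outcomes) // exactly one of the two entries is true
println(Await.result(winner.future, 5.seconds))
```

Which future wins depends on scheduling, although the one without the delay is expected to finish first here.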

* For IO-intensive tasks, we should use an execution context backed by
`Executors.newCachedThreadPool()` instead of the global thread pool.
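
A minimal sketch of such a dedicated context is shown below. The names `ioPool`, `ioContext` and `slowRead` are hypothetical, and `Thread.sleep` stands in for a real blocking call.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A cached pool creates threads on demand, so blocked tasks do not starve the others
val ioPool = Executors.newCachedThreadPool()
val ioContext: ExecutionContext = ExecutionContext.fromExecutorService(ioPool)

// Hypothetical stand-in for a blocking call such as a file or network read
def slowRead(id: Int): Future[String] = Future {
  Thread.sleep(100)
  s"payload-$id"
}(ioContext)

val reads: List[Future[String]] = (1 to 3).toList.map(slowRead)
val payloads = reads.map(f => Await.result(f, 5.seconds))
println(payloads) // List(payload-1, payload-2, payload-3)

// The pool's threads are not daemon threads, so shut it down when finished
ioPool.shutdown()
```

Passing the context explicitly keeps blocking work off the global pool, which is sized for CPU-bound tasks.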