# Asynchronous Programming in Scala

This notebook demonstrates various concepts of asynchronous programming in Scala, comparing them with Rust's approach where relevant.

## Key Concepts

- **Threads**: OS-level constructs for parallel execution
- **Futures**: Lightweight abstractions for asynchronous computations
- **Eager vs Lazy Execution**: How Scala differs from Rust
- **Error Handling**: Managing failures in asynchronous code
- **Thread Pools**: Configuring execution contexts

![Components](thread-components.png)

![Hierarchy](thread-task-hierarchy.png)

Hierarchy: see file async_hierarchy_panics.scala

Let's start by importing the necessary dependencies:

In [1]:
import sys.process._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.{Duration => JDuration, Instant}
import java.util.concurrent.{Executors, CopyOnWriteArrayList}
import scala.util.{Try, Success, Failure}

## 1. Eager vs Lazy Execution

One fundamental difference between Scala's `Future` and Rust's async/await is **execution behavior**:

- **Scala Futures are eager**: They start executing as soon as they're created
- **Rust futures are lazy**: They do nothing until polled, which normally happens when they are awaited or spawned onto an executor

**File: `await.scala`**

In [2]:
// EAGER FUTURES (DEFAULT IN SCALA)
def lionRuns(): Future[Unit] = Future {
  println("🦁 Lion is running!")
}

def foxRuns(): Future[Unit] = Future {
  println("🦊 Fox is running!")
}

def rabbitRuns(): Future[Unit] = Future {
  println("🐇 Rabbit is running!")
}

println("🦁 lionRuns() is called.")
val lionFuture = lionRuns() // Execution begins immediately!
println("🦊 foxRuns() is called.")
val foxFuture = foxRuns()   // Execution begins immediately!
println("🐇 rabbitRuns() is called.")
val rabbitFuture = rabbitRuns() // Execution begins immediately!
println("(All animals are running already in Scala Future!)")

// We only await some of the futures
Await.result(foxFuture, 2.seconds)
Await.result(lionFuture, 2.seconds)

// We never await the rabbit, but it still runs!
println("All animals have finished running (even the rabbit 🐇, because Scala Future is eager)!")

🦁 lionRuns() is called.
🦊 foxRuns() is called.
🦁 Lion is running!
🐇 rabbitRuns() is called.
🦊 Fox is running!
(All animals are running already in Scala Future!)
🐇 Rabbit is running!
All animals have finished running (even the rabbit 🐇, because Scala Future is eager)!
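
For contrast, laziness can be approximated in Scala by deferring the creation of the `Future`. The sketch below is one possible approach, assuming a plain thunk (`() => Future[...]`) is acceptable. The names `LazyFutureSketch`, `lazyRuns`, and `demo` are hypothetical, and this is not necessarily how `await.scala` implements its lazy version.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LazyFutureSketch {
  // A "lazy" future here is just a function that builds the Future when called.
  // `started` counts how many animals actually began running.
  def lazyRuns(name: String, started: AtomicInteger): () => Future[String] = () =>
    Future {
      started.incrementAndGet()
      s"$name is running!"
    }

  // Returns (started before any call, the fox's message, started afterwards).
  def demo(): (Int, String, Int) = {
    val started = new AtomicInteger(0)
    val lazyFox = lazyRuns("Fox", started)
    val lazyRabbit = lazyRuns("Rabbit", started) // defined but never invoked
    val before = started.get()                   // nothing has been scheduled yet
    val foxMessage = Await.result(lazyFox(), 2.seconds) // calling lazyFox() starts the work
    val after = started.get()                    // only the fox ran
    (before, foxMessage, after)
  }
}
```

Calling `LazyFutureSketch.demo()` should show that no task starts until its thunk is invoked, so the rabbit never runs.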


### Scala Futures: Key Patterns and Best Practices

In [2]:
import scala.concurrent.Future
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.concurrent.duration._

// 1. CREATING FUTURES
println("1. Creating a basic Future:")
val basicFuture: Future[Int] = Future {
  println("  → Computing a result...")
  Thread.sleep(1000) // Simulate work
  42 // The result
}

// 2. CALLBACKS - Non-blocking way to handle results
println("\n2. Using callbacks (non-blocking):")
basicFuture.onComplete {
  case Success(value) => println(s"  → Success callback: The result is: $value")
  case Failure(exception) => println(s"  → Failure callback: An error occurred: ${exception.getMessage}")
}

// 3. TRANSFORMATIONS - Chaining operations
println("\n3. Transforming futures:")
val transformedFuture = basicFuture
  .map(result => result * 2)
  .map(doubled => s"The doubled result is: $doubled")

transformedFuture.onComplete {
  case Success(message) => println(s"  → $message")
  case Failure(exception) => println(s"  → Transform failed: ${exception.getMessage}")
}

// 4. COMPOSING FUTURES
println("\n4. Composing multiple futures:")
def getUsername(): Future[String] = Future { "user123" }
def getUserData(username: String): Future[Map[String, String]] = Future { 
  Map("name" -> "John Doe", "username" -> username) 
}

// Using flatMap for sequential composition
val userDataFuture = getUsername().flatMap(username => getUserData(username))

// The same thing using for-comprehension (more readable)
val userDataFuture2 = for {
  username <- getUsername()
  userData <- getUserData(username)
} yield userData

userDataFuture.onComplete {
  case Success(data) => println(s"  → User data: $data")
  case Failure(e) => println(s"  → Failed to get user data: ${e.getMessage}")
}

// 5. ERROR HANDLING
println("\n5. Error handling:")
val failingFuture: Future[String] = Future {
  throw new RuntimeException("Something went wrong!")
  "This will never be returned"
}

failingFuture
  .recover { case ex: RuntimeException => s"Recovered from: ${ex.getMessage}" }
  .onComplete {
    case Success(result) => println(s"  → Recovery result: $result")
    case Failure(ex) => println(s"  → Recovery failed: ${ex.getMessage}")
  }

// 6. AWAITING RESULTS (FOR DEMONSTRATION ONLY)
println("\n6. Awaiting results (avoid in production code):")
try {
  val result = Await.result(basicFuture, 2.seconds)
  println(s"  → Await result: $result")
} catch {
  case e: TimeoutException => println("  → Future timed out!")
  case e: Exception => println(s"  → Future failed: ${e.getMessage}")
}

println("\n=== BEST PRACTICES ===")
println("• Avoid blocking with Await.result in production code")
println("• Use callbacks and transformations (map/flatMap) instead")
println("• Handle errors explicitly with recover or recoverWith")
println("• Use for-comprehensions for multiple sequential futures")
println("• Consider custom ExecutionContexts for production apps")

// Sleep to ensure callbacks have time to execute
Thread.sleep(2000)

1. Creating a basic Future:

2. Using callbacks (non-blocking):
  → Computing a result...

3. Transforming futures:

4. Composing multiple futures:

5. Error handling:
  → User data: Map(name -> John Doe, username -> user123)

6. Awaiting results (avoid in production code):
  → Recovery result: Recovered from: Something went wrong!
  → Success callback: The result is: 42
  → Await result: 42
  → The doubled result is: 84

=== BEST PRACTICES ===
• Avoid blocking with Await.result in production code
• Use callbacks and transformations (map/flatMap) instead
• Handle errors explicitly with recover or recoverWith
• Use for-comprehensions for multiple sequential futures
• Consider custom ExecutionContexts for production apps
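
The best practices above mention `recoverWith`, which the cell only demonstrates through its sibling `recover`. As a minimal sketch, assuming a failing primary source and a working backup (both hypothetical, as are the names `RecoverWithSketch`, `fetchFromPrimary`, and `fetchFromBackup`), `recoverWith` can substitute a second `Future` when the first one fails:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverWithSketch {
  // Hypothetical primary source that always fails.
  def fetchFromPrimary(): Future[String] =
    Future.failed(new RuntimeException("primary unavailable"))

  // Hypothetical backup source that succeeds.
  def fetchFromBackup(): Future[String] =
    Future.successful("value from backup")

  // recover maps a failure to a plain value; recoverWith maps it to another Future.
  def fetchWithFallback(): Future[String] =
    fetchFromPrimary().recoverWith {
      case _: RuntimeException => fetchFromBackup()
    }

  // Await is used only to make the result observable in a demo.
  def demo(): String = Await.result(fetchWithFallback(), 2.seconds)
}
```

`RecoverWithSketch.demo()` returns the backup value, because the primary failure matches the partial function and is replaced by the backup future.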


## 2. Thread Pool Effects on Performance

Thread pool size affects how many CPU-bound tasks can execute in parallel. This example demonstrates the effect of different thread pool sizes.

**File: `async_threadpool.scala`**

In [4]:
// Simulated task: Thread.sleep stands in for CPU-bound work
def cpuBoundTask(name: String): Unit = {
  println(s"$name starting work...")
  Thread.sleep(1000) // Occupies the thread for one second (a stand-in for real computation)
  println(s"$name finished!")
}

// Run the three animal tasks on a fixed pool of `threadCount` threads
def runWithThreads(threadCount: Int): Unit = {
  println(s"\n=== Running with $threadCount thread(s) ===\n")
  val start = Instant.now()
  
  // Create custom thread pool
  val executorService = Executors.newFixedThreadPool(threadCount)
  val ec = ExecutionContext.fromExecutorService(executorService)
  
  try {
    // Create three futures for our animals
    implicit val executionContext = ec
    
    val lion = Future {
      cpuBoundTask("🦁 Lion")
    }
    
    val fox = Future {
      cpuBoundTask("🦊 Fox")
    }
    
    val rabbit = Future {
      cpuBoundTask("🐇 Rabbit")
    }
    
    // Wait for all to complete
    Await.result(Future.sequence(Seq(lion, fox, rabbit)), 10.seconds)
    
    val duration = JDuration.between(start, Instant.now()).toMillis / 1000.0
    println(s"\nAll animals finished! Total time with $threadCount thread(s): $duration seconds")
  } finally {
    executorService.shutdown()
  }
}

// Run with different thread pool sizes
runWithThreads(1)  // Sequential execution
runWithThreads(3)  // Parallel execution


=== Running with 1 thread(s) ===

🦁 Lion starting work...
🦁 Lion finished!
🦊 Fox starting work...
🦊 Fox finished!
🐇 Rabbit starting work...
🐇 Rabbit finished!

All animals finished! Total time with 1 thread(s): 3.012 seconds

=== Running with 3 thread(s) ===

🦁 Lion starting work...
🦊 Fox starting work...
🐇 Rabbit starting work...
🐇 Rabbit finished!
🦁 Lion finished!
🦊 Fox finished!

All animals finished! Total time with 3 thread(s): 1.009 seconds


defined function cpuBoundTask
defined function runWithThreads
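
To see why the pool size changes the timing, it can help to record which worker thread runs each task. The following is a rough sketch under the same fixed-pool setup. `PoolThreadsSketch` and `distinctWorkerThreads` are hypothetical names, and the execution context is passed explicitly rather than implicitly to keep the example independent of any implicit already in scope.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolThreadsSketch {
  // Runs `taskCount` short tasks on a fixed pool and returns how many
  // distinct worker threads executed them.
  def distinctWorkerThreads(threadCount: Int, taskCount: Int): Int = {
    val executor = Executors.newFixedThreadPool(threadCount)
    val ec = ExecutionContext.fromExecutorService(executor)
    val threadNames = new CopyOnWriteArrayList[String]() // safe for concurrent appends
    try {
      val tasks = (1 to taskCount).map { _ =>
        Future {
          threadNames.add(Thread.currentThread().getName)
          Thread.sleep(50) // keep the worker busy so tasks can overlap
        }(ec) // pass the pool explicitly instead of relying on an implicit
      }
      tasks.foreach(task => Await.result(task, 10.seconds))
      new java.util.HashSet[String](threadNames).size
    } finally {
      executor.shutdown()
    }
  }
}
```

With a pool of one thread, `PoolThreadsSketch.distinctWorkerThreads(1, 3)` reports a single worker, which is why the three one-second tasks above take about three seconds in total. A pool of three can spread them over up to three workers.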

## 3. Deadlock Demonstration

This example shows a classic deadlock scenario where two threads each hold a resource the other needs.

**Note:** We won't actually run this in the notebook as it would deadlock. Instead, let's look at the code and understand it.

**File: `thread_deadlock.scala`**

In [5]:
// Print code only - don't run
println("""
import java.util.concurrent.locks.ReentrantLock

// Two locks representing resources
val meat = new ReentrantLock() // 🍖
val cheese = new ReentrantLock() // 🧀

// Lion thread - grabs meat first, then tries to get cheese
val lionThread = new Thread(() => {
  meat.lock()
  println("🦁 Lion grabs the 🍖 Meat!")
  Thread.sleep(100)
  println("🦁 Lion wants the 🧀 Cheese...")
  cheese.lock() // This will block if Fox already has cheese
  println("🦁 Lion got the 🧀 Cheese too!")
  cheese.unlock()
  meat.unlock()
})

// Fox thread - grabs cheese first, then tries to get meat
val foxThread = new Thread(() => {
  cheese.lock()
  println("🦊 Fox grabs the 🧀 Cheese!")
  Thread.sleep(100)
  println("🦊 Fox wants the 🍖 Meat...")
  meat.lock() // This will block if Lion already has meat
  println("🦊 Fox got the 🍖 Meat too!")
  meat.unlock()
  cheese.unlock()
})

// Starting these creates deadlock:
// 1. Lion gets meat, Fox gets cheese
// 2. Lion waits for cheese (held by Fox)
// 3. Fox waits for meat (held by Lion)
// 4. Deadlock!
""")
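
A common way to avoid this kind of deadlock is to make every thread acquire the locks in the same order. The sketch below illustrates that idea and is safe to run. The names `LockOrderingSketch`, `withBothLocks`, and `demo` are hypothetical and are not taken from `thread_deadlock.scala`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

object LockOrderingSketch {
  // Always take meat first, then cheese, and release in reverse order.
  // try/finally guarantees the locks are released even if `body` throws.
  def withBothLocks(meat: ReentrantLock, cheese: ReentrantLock)(body: => Unit): Unit = {
    meat.lock()
    try {
      cheese.lock()
      try {
        body
      } finally {
        cheese.unlock()
      }
    } finally {
      meat.unlock()
    }
  }

  // Returns true if both threads finished within the timeout (no deadlock).
  def demo(): Boolean = {
    val meat = new ReentrantLock()
    val cheese = new ReentrantLock()
    val meals = new AtomicInteger(0)

    def eat(): Unit = withBothLocks(meat, cheese) {
      Thread.sleep(50) // hold both locks briefly
      meals.incrementAndGet()
      ()
    }

    val lionThread = new Thread(() => eat())
    val foxThread = new Thread(() => eat())
    lionThread.start()
    foxThread.start()
    lionThread.join(2000)
    foxThread.join(2000)
    !lionThread.isAlive && !foxThread.isAlive && meals.get() == 2
  }
}
```

Because neither thread can hold the cheese while waiting for the meat, the circular wait from the example above cannot form.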



## 4. Exception Handling in Future Hierarchies

This example demonstrates how exceptions propagate through hierarchies of futures and explores the concept of orphaned tasks.

**File: `async_hierarchy_panics.scala`**

In [6]:
// Here we'll run a simplified version to demonstrate the key concepts
def runAsyncHierarchyDemo(): Unit = {
  println("🌍 World: Starting Hierarchy Demo")
  
  // Mammal branch with a bear that will fail
  val mammal = Future {
    println("  🐾 Mammal: Started")
    
    // The bear spawns fruit tasks then fails
    val bear = Future {
      println("    🐻 Bear: Started")
      
      // Bear's child tasks
      val apple = Future {
        println("      🍎 Apple: Started")
        Thread.sleep(500) // Long task
        println("      🍎 Apple: Finished")
      }
      
      val banana = Future {
        println("      🍌 Banana: Started")
        Thread.sleep(150)
        println("      🍌 Banana: Finished")
      }
      
      // Bear panics before apple can finish
      Thread.sleep(100)
      println("    🐻 Bear: Panicking now!")
      throw new RuntimeException("Bear panicked: bear hates apples!")
    }
    
    // Handle bear's exception
    try {
      Await.result(bear, 1.second)
      println("    🐻 Bear: Finished successfully")
    } catch {
      case e: Exception => 
        println(s"    🐻 Bear: Failed with: ${e.getMessage}")
    }
    
    println("  🐾 Mammal: Finished")
  }
  
  // Wait for the mammal branch to finish
  Await.result(mammal, 2.seconds)
  
  // Notice: Mammal finishes as soon as Bear fails, so Apple and Banana
  // may both still be running here. This demonstrates the "orphaned task" concept.
  println("🌍 World: Hierarchy Demo Completed")
  println("Note: The Apple task may continue running after its parent (Bear) failed!")
}

runAsyncHierarchyDemo()

🌍 World: Starting Hierarchy Demo
  🐾 Mammal: Started
    🐻 Bear: Started
      🍎 Apple: Started
      🍌 Banana: Started
    🐻 Bear: Panicking now!
    🐻 Bear: Failed with: Bear panicked: bear hates apples!
  🐾 Mammal: Finished
🌍 World: Hierarchy Demo Completed
Note: The Apple task may continue running after its parent (Bear) failed!


defined function runAsyncHierarchyDemo
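
One way to avoid orphaned children when the parent does not fail is to make the parent's `Future` depend on them, so that awaiting the parent also awaits its children. This is a sketch with hypothetical names (`NoOrphansSketch`, `child`, `parent`). It does not cancel anything: if the parent threw before composing its children, they would still keep running as in the demo above.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NoOrphansSketch {
  // A child task that flips `done` once it has finished its work.
  def child(done: AtomicBoolean, millis: Long): Future[Unit] = Future {
    Thread.sleep(millis)
    done.set(true)
  }

  // The parent's Future completes only after both children have completed.
  def parent(appleDone: AtomicBoolean, bananaDone: AtomicBoolean): Future[Unit] = {
    val apple = child(appleDone, 200)  // both children start eagerly, before the for
    val banana = child(bananaDone, 50)
    for {
      _ <- apple
      _ <- banana
    } yield ()
  }

  // Returns the completion flags observed right after the parent finishes.
  def demo(): (Boolean, Boolean) = {
    val appleDone = new AtomicBoolean(false)
    val bananaDone = new AtomicBoolean(false)
    Await.result(parent(appleDone, bananaDone), 2.seconds)
    (appleDone.get(), bananaDone.get())
  }
}
```

Creating the child futures before the for-comprehension keeps them running in parallel, while the comprehension only sequences the waiting.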

## 5. Best Practices: Future[Either] Pattern

This example demonstrates recommended patterns for working with Scala's Futures, focusing on proper error handling with the `Future[Either[Error, Success]]` pattern.

**File: `future.scala`**

In [7]:
// Domain classes for our example
case class Animal(name: String, species: String)
case class Diet(foodType: String, dailyAmount: Int)

// Error model with sealed trait
sealed trait ZooError {
  def message: String
}
case class DatabaseError(message: String) extends ZooError
case class ValidationError(message: String) extends ZooError

// Demo of the Future[Either] pattern
def demoFutureEither(): Unit = {
  println("\n=== Future[Either] Best Practice Demo ===\n")
  
  // Step 1: Service returning Future[Either]
  def findAnimal(name: String): Future[Either[ZooError, Animal]] = Future {
    println(s"🔍 Searching for animal: $name")
    
    name match {
      case "lion" => Right(Animal("Leo", "Lion"))
      case "tiger" => Right(Animal("Tigra", "Tiger"))
      case _ => Left(ValidationError(s"Unknown animal: $name"))
    }
  }
  
  // Step 2: Another service returning Future[Either]
  def getDiet(animal: Animal): Future[Either[ZooError, Diet]] = Future {
    println(s"🥩 Fetching diet for: ${animal.name} the ${animal.species}")
    
    animal.species match {
      case "Lion" => Right(Diet("Meat", 10))
      case "Tiger" => Right(Diet("Meat", 8))
      case _ => Left(ValidationError(s"No diet info for ${animal.species}"))
    }
  }
  
  // Step 3: Combining futures with for-comprehension
  def getAnimalWithDiet(name: String): Future[Either[ZooError, (Animal, Diet)]] = {
    for {
      // Find animal
      animalResult <- findAnimal(name)
      
      // If animal found, get diet (otherwise short-circuit)
      dietResult <- animalResult match {
        case Right(animal) => getDiet(animal).map(diet => diet.map(d => (animal, d)))
        case Left(error) => Future.successful(Left(error))
      }
    } yield dietResult
  }
  
  // Test with successful case
  val lionResult = getAnimalWithDiet("lion")
  Await.result(lionResult, 2.seconds) match {
    case Right((animal, diet)) => 
      println(s"✅ Success: ${animal.name} the ${animal.species} eats ${diet.foodType}")
    case Left(error) => 
      println(s"❌ Error: ${error.message}")
  }
  
  // Test with error case
  val unicornResult = getAnimalWithDiet("unicorn")
  Await.result(unicornResult, 2.seconds) match {
    case Right((animal, diet)) => 
      println(s"✅ Success: ${animal.name} the ${animal.species} eats ${diet.foodType}")
    case Left(error) => 
      println(s"❌ Error: ${error.message}")
  }
}

demoFutureEither()

      🍌 Banana: Finished
      🍎 Apple: Finished

=== Future[Either] Best Practice Demo ===

🔍 Searching for animal: lion
🥩 Fetching diet for: Leo the Lion
✅ Success: Leo the Lion eats Meat
🔍 Searching for animal: unicorn
❌ Error: Unknown animal: unicorn


defined class Animal
defined class Diet
defined trait ZooError
defined class DatabaseError
defined class ValidationError
defined function demoFutureEither
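
The pattern match inside the for-comprehension above gets repetitive once more steps are chained. One possible way to factor it out is a small helper that runs the next step only on a `Right`. The helper `andThenF` and the toy services `parseAge` and `checkAdult` are hypothetical and exist only for illustration; they are not part of `future.scala`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object FutureEitherSketch {
  // Hypothetical helper: run `next` only if the previous step produced a Right;
  // a Left short-circuits and is passed through unchanged.
  def andThenF[E, A, B](step: Future[Either[E, A]])(next: A => Future[Either[E, B]])(
      implicit ec: ExecutionContext): Future[Either[E, B]] =
    step.flatMap {
      case Right(value) => next(value)
      case Left(error)  => Future.successful(Left(error))
    }

  // Toy services standing in for calls such as findAnimal and getDiet.
  def parseAge(raw: String): Future[Either[String, Int]] =
    Future.successful(Try(raw.toInt).toOption.toRight(s"Not a number: $raw"))

  def checkAdult(age: Int): Future[Either[String, Int]] =
    Future.successful(if (age >= 18) Right(age) else Left(s"Too young: $age"))

  // Await is used only to make the result observable in a demo.
  def demo(raw: String): Either[String, Int] = {
    import ExecutionContext.Implicits.global
    Await.result(andThenF(parseAge(raw))(age => checkAdult(age)), 2.seconds)
  }
}
```

For example, `FutureEitherSketch.demo("42")` yields `Right(42)`, while `FutureEitherSketch.demo("abc")` stops at the first step with `Left("Not a number: abc")`.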

## 6. Running External Scala Files

We can also run our complete Scala examples using shell commands from the notebook:

In [8]:
// Run the await.scala example externally; `!!` runs the command and returns its output as a String

"scala await.scala".!!

res8: String = """🦁 lionRuns() is called.
🦊 foxRuns() is called.
🦁 Lion is running!
🐇 rabbitRuns() is called.
🦊 Fox is running!
(All animals are running already in Scala Future!)
🐇 Rabbit is running!
All animals have finished running (even the rabbit 🐇, because Scala Future is eager)!

--- Lazy version ---
🦁 lazyLionRuns() is called.
🦊 lazyFoxRuns() is called.
🐇 lazyRabbitRuns() is called.
(No animals are running yet...)
🦊 Fox is running!
🦁 Lion is running!
All animals have finished running (except the rabbit 🐇, who never started)!
"""

## Key Differences: Scala vs Rust Async

| Aspect | Scala Future | Rust Async/Await |
|--------|-------------|------------------|
| Execution Model | **Eager**: Starts executing as soon as created | **Lazy**: Does nothing until polled (typically via `.await` or by spawning it on a runtime) |
| Type System | Uses `Future[T]` for all async results | Uses `impl Future<Output=T>` and other future types |
| Error Handling | Usually with `Try`, `Either`, or exceptions | With `Result<T, E>` return types |
| Cancellation | No built-in cancellation | Dropping a future can cancel it |
| Composition | `map`, `flatMap`, for-comprehensions | Async blocks, `.await` syntax |
| Runtime | Requires an `ExecutionContext` | Requires a runtime like Tokio |
| Resource Efficiency | Each future is a lightweight task | Zero-cost abstraction with state machines |
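
Since `Future` has no built-in cancellation, a task that should be stoppable has to cooperate. The following is a minimal sketch of one common workaround, assuming the task can check a shared flag between units of work. `CooperativeCancelSketch`, `countSteps`, and the `cancelled` flag are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CooperativeCancelSketch {
  // The task checks `cancelled` between steps and stops early once it is set.
  // It returns the number of steps it actually completed.
  def countSteps(maxSteps: Int, cancelled: AtomicBoolean): Future[Int] = Future {
    var steps = 0
    while (steps < maxSteps && !cancelled.get()) {
      Thread.sleep(10) // one unit of work
      steps += 1
    }
    steps
  }

  def demo(): Int = {
    val cancelled = new AtomicBoolean(false)
    val task = countSteps(500, cancelled) // roughly five seconds if left alone
    Thread.sleep(50)
    cancelled.set(true) // request cancellation; the task notices at its next check
    Await.result(task, 2.seconds)
  }
}
```

The future still completes normally, just earlier than it otherwise would. This differs from Rust, where dropping a future stops it without any cooperation from the task body.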

## Learning Resources

* [Introduction to scala-async](https://www.baeldung.com/scala/scala-async)

* [Working with Futures in Scala: A Quick Introduction](https://towardsdev.com/working-with-futures-in-scala-a-quick-introduction-9223703ab25e)

* [Synchronous Handling of Futures](https://www.baeldung.com/scala/synchronous-handling-of-futures)

### Sync, Async, Blocking and Non-Blocking | Rock the JVM

[![Sync, Async, Blocking and Non-Blocking | Rock the JVM](https://img.youtube.com/vi/Hlu-zYeNsSU/0.jpg)](https://www.youtube.com/watch?v=Hlu-zYeNsSU)

[Watch on YouTube](https://www.youtube.com/watch?v=Hlu-zYeNsSU)

---

### Adam Rosien - Async/Await for the Monadic Programmer | Scala Days 2023 Seattle

[![Adam Rosien - Async/Await for the Monadic Programmer | Scala Days 2023 Seattle](https://img.youtube.com/vi/OH5cxLNTTPo/0.jpg)](https://www.youtube.com/watch?v=OH5cxLNTTPo)

[Watch on YouTube](https://www.youtube.com/watch?v=OH5cxLNTTPo)

---

### Scala Programming - Introduction to Threads and Futures

[![Scala Programming - Introduction to Threads and Futures](https://img.youtube.com/vi/6b24sszy6Js/0.jpg)](https://youtu.be/6b24sszy6Js)

[Watch on YouTube](https://youtu.be/6b24sszy6Js)