# 🌊 Reactive Streams & Async Patterns

**Phase 3: System Design - Event-Driven Architecture**

**Master event sourcing, CQRS, reactive streams, and asynchronous communication patterns**

---

In [None]:
// Reactive Streams Overview
println("🌊 REACTIVE STREAMS & ASYNC PATTERNS - EVENT-DRIVEN SYSTEMS")
println()

println("🎯 Reactive Manifesto Principles:")
println("✓ Responsive: Systems respond in timely fashion")
println("✓ Resilient: Systems remain functional despite failures")
println("✓ Elastic: Systems handle varying workloads")
println("✓ Message-Driven: Async message passing for decoupling")
println()

println("🔧 Core Reactive Patterns:")
println("✓ Event Sourcing: State as series of events")
println("✓ CQRS: Separate read/write models")
println("✓ Reactive Streams: Non-blocking backpressure")
println("✓ Event Streaming: Continuous event processing")
println("✓ Lambda Architecture: Batch + speed layer integration")


## 📡 Event Sourcing Architecture

**Complete state reconstruction from domain events - immutable audit logging**

In [None]:
// Event Sourcing Implementation
import cats.effect.{Async, IO, Ref}
import cats.implicits._

// Domain Events
sealed trait DomainEvent {
  def aggregateId: String
  def sequenceNumber: Long
  def timestamp: java.time.Instant
}

case class AccountCreated(
  aggregateId: String,
  ownerId: String,
  currency: String,
  sequenceNumber: Long = 1L,
  timestamp: java.time.Instant = java.time.Instant.now()
) extends DomainEvent

case class FundsDeposited(
  aggregateId: String,
  amount: BigDecimal,
  reference: String,
  sequenceNumber: Long,
  timestamp: java.time.Instant = java.time.Instant.now()
) extends DomainEvent

case class FundsWithdrawn(
  aggregateId: String,
  amount: BigDecimal,
  reference: String,
  sequenceNumber: Long,
  timestamp: java.time.Instant = java.time.Instant.now()
) extends DomainEvent

// Current State (Projection)
case class BankAccount(
  id: String,
  ownerId: String,
  balance: BigDecimal,
  currency: String,
  version: Long = 0L
)

// Event Store
trait EventStore[F[_]] {
  def saveEvents(events: List[DomainEvent]): F[Unit]
  def getEvents(aggregateId: String): F[List[DomainEvent]]
  def getEventsSince(aggregateId: String, fromSequence: Long): F[List[DomainEvent]]
}

// In-memory event store implementation
class InMemoryEventStore[F[_]: Async] extends EventStore[F] {
  // Named `store` so the `events` parameter of saveEvents does not shadow it.
  // Ref.unsafe allocates the state outside F, which is acceptable for a notebook demo.
  private val store = Ref.unsafe[F, Map[String, List[DomainEvent]]](Map.empty)

  // Append all events in a single atomic update, preserving order within each aggregate
  def saveEvents(events: List[DomainEvent]): F[Unit] =
    store.update { current =>
      events.groupBy(_.aggregateId).foldLeft(current) { case (acc, (aggId, newEvents)) =>
        acc.updated(aggId, acc.getOrElse(aggId, Nil) ++ newEvents)
      }
    }

  def getEvents(aggregateId: String): F[List[DomainEvent]] =
    store.get.map(_.getOrElse(aggregateId, Nil))

  // Events strictly after `fromSequence`
  def getEventsSince(aggregateId: String, fromSequence: Long): F[List[DomainEvent]] =
    getEvents(aggregateId).map(_.filter(_.sequenceNumber > fromSequence))
}

println("📡 Event Sourcing Implemented")
println("• Immutable event storage with replay capability")
println("• Domain events representing state changes")
println("• Sequence numbering for ordering guarantees")
println("• Audit trail and debugging capabilities")
println()
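
The event store above keeps the events, but the step that turns them back into a `BankAccount` is not shown. A minimal sketch of that replay, assuming simplified copies of the event and state types (`Event`, `Account`, `applyEvent` and `replay` are illustrative names, not part of the notebook), folds the events in sequence order:

```scala
object ReplaySketch {
  // Simplified stand-ins for the notebook's DomainEvent hierarchy (assumption: no timestamps needed here).
  sealed trait Event { def aggregateId: String; def sequenceNumber: Long }
  final case class Created(aggregateId: String, ownerId: String, currency: String, sequenceNumber: Long) extends Event
  final case class Deposited(aggregateId: String, amount: BigDecimal, sequenceNumber: Long) extends Event
  final case class Withdrawn(aggregateId: String, amount: BigDecimal, sequenceNumber: Long) extends Event

  // Stand-in for the BankAccount projection.
  final case class Account(id: String, ownerId: String, balance: BigDecimal, currency: String, version: Long)

  // Hypothetical helper: apply one event to the current state (None = account not created yet).
  def applyEvent(state: Option[Account], event: Event): Option[Account] =
    (state, event) match {
      case (None, Created(id, owner, currency, seq)) =>
        Some(Account(id, owner, BigDecimal(0), currency, seq))
      case (Some(acc), Deposited(_, amount, seq)) =>
        Some(acc.copy(balance = acc.balance + amount, version = seq))
      case (Some(acc), Withdrawn(_, amount, seq)) =>
        Some(acc.copy(balance = acc.balance - amount, version = seq))
      case (other, _) =>
        other // Ignore events that do not fit the current state
    }

  // Rebuild the state by folding over the events in sequence order.
  def replay(events: List[Event]): Option[Account] =
    events.sortBy(_.sequenceNumber).foldLeft(Option.empty[Account])(applyEvent)

  val history: List[Event] = List(
    Created("acc-1", "owner-1", "USD", 1L),
    Deposited("acc-1", BigDecimal(100), 2L),
    Withdrawn("acc-1", BigDecimal(30), 3L)
  )
  val rebuilt: Option[Account] = replay(history)
}
```

Because the state is derived only from the events, replaying a prefix of the history yields the account as it was at that point, which is what makes the log usable as an audit trail.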


## 🔄 CQRS Architecture Pattern

**Separate Command and Query responsibility for optimized read/write patterns**

In [None]:
// CQRS Implementation - Separate Read/Write Models

// Commands (Write Operations)
sealed trait AccountCommand {
  def accountId: String
}

case class OpenAccount(accountId: String, ownerId: String, currency: String) extends AccountCommand
case class DepositFunds(accountId: String, amount: BigDecimal) extends AccountCommand
case class WithdrawFunds(accountId: String, amount: BigDecimal) extends AccountCommand
case class CloseAccount(accountId: String) extends AccountCommand

// Queries (Read Operations)
sealed trait AccountQuery {
  def accountId: String
}

case class GetAccountBalance(accountId: String) extends AccountQuery
case class GetAccountHistory(accountId: String, since: java.time.Instant) extends AccountQuery
case class GetAccountSummary(accountId: String) extends AccountQuery

// Read Models (Projections)
case class AccountSummary(
  accountId: String,
  balance: BigDecimal,
  currency: String,
  ownerName: String,
  lastActivity: java.time.Instant
)

case class AccountActivity(
  accountId: String,
  eventType: String,
  amount: Option[BigDecimal],
  timestamp: java.time.Instant,
  reference: Option[String]
)

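// Event bus that carries events from the write side to the read side.
// Assumption: EventBus is not defined elsewhere in this notebook, so this minimal trait is a placeholder.
trait EventBus[F[_]] {
  def publish(events: List[DomainEvent]): F[Unit]
}

// CQRS System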
trait CommandHandler[F[_]] {
  def handle(command: AccountCommand): F[Either[DomainError, List[DomainEvent]]]
}

trait QueryHandler[F[_]] {
  def handle(query: AccountQuery): F[Either[DomainError, Any]]
}

class CQRSBankAccountSystem[F[_]: Async](
  commandHandler: CommandHandler[F],
  queryHandler: QueryHandler[F],
  eventBus: EventBus[F]
) {

  def processCommand(command: AccountCommand): F[Either[DomainError, Unit]] =
    for {
      result <- commandHandler.handle(command)
      _ <- result.traverse(events => eventBus.publish(events))
    } yield result.map(_ => ())

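  // Note: unchecked cast; the caller must ask for the type the handler actually returns for this query
  def executeQuery[A](query: AccountQuery): F[Either[DomainError, A]] =
    queryHandler.handle(query).map(_.map(_.asInstanceOf[A]))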

  // Optimized read methods
  def getAccountBalance(accountId: String): F[Either[DomainError, BigDecimal]] =
    executeQuery[BigDecimal](GetAccountBalance(accountId))

  def getAccountSummary(accountId: String): F[Either[DomainError, AccountSummary]] =
    executeQuery[AccountSummary](GetAccountSummary(accountId))

  // Recent history (last 5 minutes); the read model may lag behind the write side (eventual consistency)
  def getAccountHistory(accountId: String): F[Either[DomainError, List[AccountActivity]]] =
    executeQuery[List[AccountActivity]](
      GetAccountHistory(accountId, java.time.Instant.now().minusSeconds(300))
    )
}

sealed trait DomainError
case class InsufficientFunds(accountId: String, requested: BigDecimal, available: BigDecimal) extends DomainError
case class AccountNotFound(accountId: String) extends DomainError
case class InvalidAmount(amount: BigDecimal) extends DomainError

println("🔄 CQRS Pattern Implemented")
println("• Separate read/write models for optimization")
println("• Commands for state-changing operations")
println("• Queries for efficient data retrieval")
println("• Event-driven communication between sides")
println()
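
The `CommandHandler` trait above leaves the decision logic open. One way to picture it is a pure function from the current balance and a command to either a domain error or the events to persist. The sketch below assumes simplified stand-in types and a hypothetical `decide` function; it is an illustration, not the notebook's handler:

```scala
object CommandSketch {
  // Simplified stand-ins for DomainError, AccountCommand and DomainEvent.
  sealed trait Error
  final case class InsufficientFunds(requested: BigDecimal, available: BigDecimal) extends Error
  final case class InvalidAmount(amount: BigDecimal) extends Error

  sealed trait Command
  final case class Deposit(amount: BigDecimal) extends Command
  final case class Withdraw(amount: BigDecimal) extends Command

  sealed trait Event
  final case class Deposited(amount: BigDecimal) extends Event
  final case class Withdrawn(amount: BigDecimal) extends Event

  // Hypothetical decision function: validate the command against current state,
  // then return either an error or the events that describe the change.
  def decide(balance: BigDecimal, command: Command): Either[Error, List[Event]] =
    command match {
      case Deposit(a) if a <= 0        => Left(InvalidAmount(a))
      case Withdraw(a) if a <= 0       => Left(InvalidAmount(a))
      case Deposit(a)                  => Right(List(Deposited(a)))
      case Withdraw(a) if a > balance  => Left(InsufficientFunds(a, balance))
      case Withdraw(a)                 => Right(List(Withdrawn(a)))
    }
}
```

Keeping the decision free of effects means the write side can be tested without an event store or bus; `processCommand` then only has to persist and publish whatever events come back on the `Right`.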


## 🌊 Reactive Streams with Backpressure

**Non-blocking, asynchronous stream processing with demand signaling**

In [None]:
// Reactive Streams with Scala's fs2
import fs2.{Pipe, Stream}
import cats.effect.Async
import cats.syntax.all._
import scala.concurrent.duration._

// Reactive stream processing pipeline
case class DataBatch(
  id: String,
  data: List[String],
  timestamp: java.time.Instant = java.time.Instant.now()
)

class ReactiveDataProcessor[F[_]: Async] {

  // Producer: Simulates data source with varying rates
  def dataSource: Stream[F, DataBatch] = {
    Stream.unfoldEval(0) { counter =>
      for {
        // Randomness is a side effect, so it is suspended in F
        batchSize <- Async[F].delay(scala.util.Random.nextInt(101) + 50)  // 50-150 items
        delayMs   <- Async[F].delay(scala.util.Random.nextInt(500) + 100) // 100-599 ms
        _         <- Async[F].sleep(delayMs.millis)                       // Non-blocking random delay
        data = (1 to batchSize).map(i => s"item_${counter}_$i").toList
      } yield Option((DataBatch(s"batch_$counter", data), counter + 1))
    }.take(20) // Finite stream
  }

  // Processing pipeline with backpressure
  def processingPipeline(input: Stream[F, DataBatch]): Stream[F, String] = {
    input
      .evalTap(batch => Async[F].delay(println(s"📦 Received batch: ${batch.id}")))
      .map(processBatch)                // Transform
      .through(throttlingPipe)          // Rate limiting
      .through(bufferWithBackpressure)  // Buffering
      .through(parallelProcessing(4))   // Parallel processing
      .through(errorRecovery)           // Resilience
      .evalTap(result => Async[F].delay(println(s"✅ Processed: $result")))
  }

  private def processBatch(batch: DataBatch): List[String] = {
    batch.data.map(item => s"processed_$item")
  }

  // Throttling: emit a merged batch once 1000 input batches arrive or 2 seconds elapse
  private def throttlingPipe: Pipe[F, List[String], List[String]] = {
    _.groupWithin(1000, 2.seconds)       // Chunk of up to 1000 batches, or whatever arrived within 2 seconds
      .map(_.toList.flatten.take(1000))  // Merge and cap at 1000 items; items beyond the cap are dropped
  }

  // Bounded buffer: upstream may run up to 3 chunks ahead, then waits for downstream demand
  private def bufferWithBackpressure: Pipe[F, List[String], List[String]] = {
    _.prefetchN(3)
  }

  // Parallel processing with bounded parallelism (output order is preserved)
  private def parallelProcessing(parallelism: Int): Pipe[F, List[String], String] = {
    _.parEvalMap(parallelism) { batch =>
      // Non-blocking sleep simulates processing time without tying up a thread
      Async[F].sleep(100.millis).as(s"BATCH_SIZE_${batch.size}")
    }
  }

  // Error recovery with fallback: on failure, log, emit a marker, and end the stream
  private def errorRecovery: Pipe[F, String, String] = {
    _.handleErrorWith { error =>
      Stream.eval(Async[F].delay(println(s"❌ Processing error: $error"))) >>
        Stream.emit("ERROR_RECOVERED")
    }
  }

  // Complete reactive pipeline
  def runPipeline(): Stream[F, Unit] = {
    processingPipeline(dataSource).drain ++ // Discard outputs; the message below runs only on normal completion
      Stream.eval(Async[F].delay(println("🎯 Pipeline completed successfully")))
  }
}

println("🌊 Reactive Streams with Backpressure Implemented")
println("• Non-blocking stream processing")
println("• Automatic backpressure handling")
println("• Throttling and rate limiting")
println("• Parallel processing with bounded concurrency")
println("• Error recovery and resilience")
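
Demand signaling is what makes backpressure work: the consumer states how many elements it can take, and the producer never emits more than that. The sketch below is a synchronous, single-threaded illustration of the idea using only the standard library. `DemandSource` and `consume` are hypothetical names, and this is not how fs2 implements it internally:

```scala
object DemandSketch {
  // Hypothetical pull-based source: it emits only what the consumer has requested.
  final class DemandSource[A](items: Iterator[A]) {
    // Deliver at most `n` items to `onNext`; returns the number actually delivered.
    def request(n: Int)(onNext: A => Unit): Int = {
      var sent = 0
      while (sent < n && items.hasNext) {
        onNext(items.next())
        sent += 1
      }
      sent
    }
  }

  // Consumer that asks for `batch` items at a time and records the size of each delivery.
  def consume[A](source: DemandSource[A], batch: Int): (List[A], List[Int]) = {
    val received   = scala.collection.mutable.ListBuffer.empty[A]
    val deliveries = scala.collection.mutable.ListBuffer.empty[Int]
    var sent = source.request(batch)(a => received += a)
    while (sent > 0) {
      deliveries += sent
      sent = source.request(batch)(a => received += a)
    }
    (received.toList, deliveries.toList)
  }

  // Ten items pulled four at a time: the source never pushes more than was requested.
  val (items, deliveries) = consume(new DemandSource((1 to 10).iterator), batch = 4)
}
```

However fast the source could produce, the consumer's request size caps each delivery, so a slow consumer slows the producer down instead of being overrun.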


## 📡 Event Streaming with Kafka Patterns

**Distributed event streaming and real-time data processing**

In [None]:
// Event Streaming Architecture
println("📡 EVENT STREAMING & REAL-TIME PROCESSING")
println()

// Stream processing patterns
println("🔥 Stream Processing Patterns:")
println()

println("1. 📊 Windowing Operations")
println("   ✓ Tumbling windows: Fixed-size, non-overlapping")
println("   ✓ Sliding windows: Overlapping with fixed advances")
println("   ✓ Session windows: Based on activity gaps")
println("   ✓ Time-based windows: Calendar-aligned intervals")
println()

println("2. 🔗 Stream Joins")
println("   ✓ Inner joins: Correlated events only")
println("   ✓ Left joins: All left events with optional right")
println("   ✓ Full outer joins: All events from both streams")
println("   ✓ Interval joins: Events within time windows")
println()

println("3. 🔄 Stateful Operations")
println("   ✓ Stateful aggregations with fault tolerance")
println("   ✓ Checkpointing for recoverable state")
println("   ✓ State stores with TTL expiration")
println("   ✓ Interactive queries on stream state")
println()

println("4. 📈 Event Enrichment")
println("   ✓ Lookups in external data sources")
println("   ✓ Joining with reference data")
println("   ✓ Geolocation and entity resolution")
println("   ✓ Real-time feature engineering")
println()

println("🏗️ Lambda Architecture Integration:")
println("• Batch layer: Historical data processing")
println("• Speed layer: Real-time event processing")
println("• Serving layer: Unified data access")
println("• Kappa architecture (alternative): Stream-only, no separate batch layer")
println()

println("⚡ Delivery Semantics & Time Handling:")
println("• At-least-once processing: No event is lost, duplicates are possible")
println("• Exactly-once semantics: Each event's effect is applied once")
println("• Event time vs. processing time")
println("• Out-of-order event handling")
println("• Watermarking to track event-time completeness")

println("\nThe key to reactive streams: handle data flow, not just individual elements!")
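
Of the windowing operations listed above, the tumbling window is the simplest to show in code. The sketch below assigns each event to a fixed-size, non-overlapping window by its event time and counts events per window. It runs over an in-memory list with non-negative timestamps rather than a live stream, and `Click` and `tumblingCounts` are hypothetical names chosen for illustration:

```scala
object WindowSketch {
  // Hypothetical event type: a key plus an event-time timestamp in milliseconds.
  final case class Click(userId: String, eventTimeMs: Long)

  // Assign each event to the tumbling window containing its timestamp and count per window.
  // Windows cover [start, start + sizeMs) and are keyed by their start time.
  def tumblingCounts(events: List[Click], sizeMs: Long): Map[Long, Int] =
    events
      .groupBy(e => (e.eventTimeMs / sizeMs) * sizeMs)
      .map { case (start, es) => start -> es.size }

  val clicks: List[Click] = List(
    Click("a", 1000L),
    Click("b", 4000L),  // Same window as the first click (start 0)
    Click("a", 12000L), // Window starting at 10000
    Click("c", 9999L)   // Listed last, but its event time places it in window 0
  )
  val counts: Map[Long, Int] = tumblingCounts(clicks, sizeMs = 10000L)
}
```

The last click shows why event time matters: it arrives after an event from the next window, yet it is still counted in the window its timestamp belongs to. In a real streaming engine, a watermark decides how long a window stays open for such late events.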
