Skip to content

Latest commit

 

History

History
1404 lines (1004 loc) · 59.9 KB

18.Scala Futures和Akka Actors的并发.md

File metadata and controls

1404 lines (1004 loc) · 59.9 KB

18. Scala Futures和Akka Actors的并发

在Scala中,开发人员仍然能使用Java线程:

    val thread = new Thread {
        override def run =
            // put your long-running code here ...
            Thread.sleep(100)
            println("Hello, world")
    }
    thread.start

然而,FutureActor模型是实现并发的更好方式:

Futures

    适合于处理一次性问题:“处理相对缓慢,且可能长时间运行的计算问题,并且完成后通知结果”。

Actors

    适合于处理运行在并行中、存活时间长,且在生命周期内可能对多次请求进行响应。

FuturesActors都可以在比线程更高抽象级别上编码。你一旦适应了,便可以专注于解决手头问题,而不必担心低级别问题,如线程、锁和共享数据。

Akka和Scala 3 TODO(鸽子栏)

     正文在编写本文时,Akka还没有移植到Scala 3。因此,本章所有示例均采用最新版本的Scala 2。

Futures

Future Scaladoc( https://oreil.ly/Acilt )表示:“ Future 代表一个当前不确定是否可用的值,该值将在未来的某个时候可用,但如果发生异常则不可用”。

Scala Future在以下几个方面做了比Java Thread更好的改进:

  • Thread的典型用法一样,当想要创建一些“口袋并发”且并行运行一个相对较短的任务可以使用Future
  • 当一个 Future 完成时,被称为completed,当完成时,你可以使用许多不同的回调和转换方法处理其结果,包括onCompleteandThenforeachmaprecoverWith

Akka和Actor模型

Akka是一个面向Scala和Java程序员的Actor模型库。这本书第一版涵盖了现在被称为Akka Classic actor。Classic使用Untyped actors。我在2013年刚开始使用时就发现它们很容易上手。而Akka至今还提供Classic actor的支持。

如今一种新的方式被称为Akka Typed,顾名思义,这些actor比Classic actor要更加类型安全,有助于消除编译时错误 。本书这版涵盖了Akka Typed

Actor模型

在本章深入研究Akka技巧之前,先了解一下Actor模型。首先要了解的是actor的概念:

  • actor是构建基于Actor系统的最小单元,如同面向对象系统中的类。
  • 如同一个类一样,一个actor封装了状态和行为(behavior)。
  • 不能通过进入一个actor内部来获取其状态,但可以向actor发送消息获取状态。就像问一个人感觉如何,但是你无法进入并执行其中一个方法,或访问其字段。
  • actor拥有信箱 —— 称之为收件箱 —— 它在生命周期中的目的是处理信箱中的消息。
  • 可以通过向actor发送一个不可变消息来与它通信。这就像你给某人发送一份邮件,然后邮件会进入他的邮箱,而actor发送的消息会进入到接收者actor的信箱。
  • 当actor收到一条消息时,就像你从信箱里取出一封信。actor会打开信并处理这条消息,然后继续处理信箱中的下一封信(消息)。当没有要处理的消息时,actor会等待直到再次收到消息。

在应用程序中,actor形成层次结构,类似家庭或公司。lightbend公司是Akka的创造者和主要维护者。建议将actor视为一个个体,例如公司中的一个个体:

  • 一个actor有一个父级(监督): 用于创建actor。
  • 一个actor可能有子级。就像一个公司,董事长下面会有一些副总裁。那些副总裁也会有很多下属一样。
  • 每个actor当然可能有同级的actor。比如说,公司董事长可能直接带着10个副总裁。

开发actor系统的最佳实践是委派,委派,委派。尤其当行为会阻塞时候。在公司里面,董事长可能想完成一些事情,所以他将工作委派给副总裁,副总裁委派给管理者等等。直到这项工作最终由一个或多个员工完成。

委派很重要。设想这项工作需要几个人/年的时间,如果董事长必须自己处理这项工作,他就不能处理其他工作,副总裁和其他员工都会无所事事。

除了这些关于actor的概述之外,对Akka实现的Actor模型还有一些重要的事情需要知道:

  • 你不能直接进入到Akka actor去获取当前状态的信息。当代码在实例化一个Actor的同时, Akka会提供相对应的ActorRef,这本质上是你和actor之间唯一沟通渠道。
  • 而事实上,Akka actor会运行在真实线程上,许多actor甚至有可能共享同一个线程。
  • 可选择不同的信箱实现( https://oreil.ly/Ez66Z ),也可以选择创建自己的信箱类型。
  • 当actor(有意或无意)终止时,其信箱中的消息进入系统的“死信信箱”,如18.8小节所述。

Actor的好处

时至今日,Actor模型已经由其他语言实现过了,如Erlanghttps://www.erlang.org/ )和Darthttps://dart.dev/ ),并且能提供长时间运行的并发实现和高级抽象的并行。除此之外,还有Akka actor库还增加了以下好处:

轻量级,事件驱动的进程

    文档表明内存中每GB大约有250万个actor,每秒可以处理5000万条消息。

容错

    Akka actor可用来创建“自愈系统”。

位置透明性

    Akka actor可以跨越多个jvm和服务器;它们被设计在分布式环境使用纯消息传递的。

“高级别的抽象”也可以认为“易于使用”。不需要花很长时间来理解Actor模型,一旦你使用了,能够编写复杂的并发应用程序,这比使用Java基本库要容易得多。编写actor如同在真实世界建模,所以你可以为披萨店编写actor做披萨,另一个接受订单,另一个配送订单等等。

而我通常认为actor的使用,就像人类的活动一样相对独立,我喜欢把它们看作我无法控制的 —— 他人编写的web服务。我可以向那个web服务发送请求信息,但我不能直接进入web服务来修改它的状态或访问它的资源; 我只能通过API工作,就好像向actor发送不可变消息一样。

希望这些关于Actor模型的说明,特别是关于Akka实现的,能有助于你理解这小节。

还有一件事:并行集合类

Scala并行集合类( https://oreil.ly/JVN7v )曾经是Scala发行版中比较重要的组成部分,但现在可作为单独的项目使用。

这个来自本书第一版的例子让你了解并行集合类是如何工作的:

    import scala.collection.parallel.immutable.ParVector
    val v = ParVector.range(0, 10)                // ParVector(0,1,2,3,4,5,6,7,8,9)
    v.foreach { e => Thread.sleep(10); print(e) }  // 0516273849

foreach输出所示,因为像ParVector这样的集合是真正以并行的方式实现的。即使是像foreach这样简单的方法,它们的方法的输出也是不确定的。要了解更多信息,可参考我的博客“如何在Scala中使用并行集合的例子( https://oreil.ly/FdGxW )”。

并行与并发 -- TODO(鸽子栏)

       关于并发和并行术语的含义,有着不少有趣的争论。我个人倾向于使用界线不那么严谨。有一个关于它们明显差异点的有趣讨论如下:并发是一台自动售货机有两条生产线,而并行是两台自动售货机有两条生产线,参见Yossi Kreinin发布的博客“并行和并发需要不同的工具( https://oreil.ly/A9mij )”。

18.1 创建Future

问题

你希望使用简单的方式来用Future执行并行任务,并且阻塞应用程序线程,直到任务完成。

解决方案

Future提供了一种并发运行算法的简单方法。当Future创建时开始并行运行并且在将来某个点返回结果。在Scala中,一个Future最终会返回结果。

下面的例子展示了如何创建一个Future,然后阻塞等待结果。在编写并行算法时阻塞不是一件好事 —— 仅当在万不得已的时候使用阻塞。但这作为一个初学例子很有用,因为它更容易思考,其次是因为它把不好的东西提前解决了。

这段代码在Future的某个时候执行1+1的计算。当计算完成时,返回结果:

    // 1 - the necessary imports
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.*
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.Random
    import Thread.sleep
    
    @main def futures1 =
        // 1: create a Future that runs in a separate thread and
        // returns “eventually”
        val f = Future {
            // this could be any long-running algorithmsleep(Random.nextInt(500))
            1 + 1
        }
        
        // 2: this is blocking, i.e., pausing the current thread to wait for a
        // result from another thread
        val result = Await.result(f, 1.second)
        println(result)
        sleep(1_000)

下面是这段代码的工作过程:

  • import语句将代码导入所需的作用域。
  • ExecutionContext.Implicits.global import语句导入“默认全局执行上下文”,可以将execution context看作是线程池,这是访问线程池的一种简单方法。
  • 在第一个注释之后创建一个Future。可见构造Future很简单,只需要传入你想要运行的代码块。代码将并行执行且在未来的某个时刻返回结果。
  • Await.result方法调用声明在Future返回前将等待1秒。如果在指定时间没有返回,将抛出java.util.concurrent.TimeoutException异常。
  • 代码最后使用sleep语句,因此当Future结束计算时候程序将会继续运行。在实际程序中不需要这样,但是这样的小例子中,必须保持JVM main线程运行。

值得说明的是阻塞是不好的。万不得已不要写这样的代码。下面小节中的例子展示了更好的方式。

如果Future的等待时间超过指定时间,将得到像这样的异常:

    java.util.concurrent.TimeoutException: Future timed out after [1 second]

你可以自己尝试将例子中的Random.nextInt(500)修改为2_000 之类的值。

讨论

虽然Future使用很简单,但背后有很多概念。下面描述了Future的基本概念以及Future所依赖的 ExecutionContext

  • Scala官方网站上的FuturePromises页面( https://oreil.ly/fueUh )将Future定义为“一个为不存在的结果创建的只读占位符对象的类型”。
  • 与保存 Some[A]None 的容器 Option[A] 类似,Future[A] 是并行执行计算并且在将来某些时间返回结果的容器。如返回(a) a类型的结果或(b)异常。
  • Future创建后算法在未来之后的某个不确定时间开始运行,在执行上下文中分配的线程上运行。
  • 一旦Future完成,计算的结果就可用了。
  • 当返回一个结果时,一个Future被称为completed。也可能是successfully completedfailed
  • 如下面几个小节所示,Future提供了一个 API 用于读取已计算的值,包括回调和转换方法如foreachonCompletemap等。for也在18.4小节中所使用。
  • ExecutionContext执行给定的任务,你可以当作成线程池一样。

在我的代码中使用了ExecutionContext导入语句提供了默认的全局执行上下文:

    import scala.concurrent.ExecutionContext.Implicits.global

在审查这本书的时候,Hermann Hueck指出,有很多方法可以导入和使用一个ExecutionContext。例如,给这个初始import语句:

    import scala.concurrent.ExecutionContext

在代码中有多种不同的方式提供ExecutionContext

    // define a given with name 'ec'
    given ec: ExecutionContext = ExecutionContext.global

    // for this example we just need the type; the name is not relevant
    given ExecutionContext = ExecutionContext.global

    // imports all givens in Implicits
    import ExecutionContext.Implicits.given

    // imports the given of the type ExecutionContext
    import ExecutionContext.Implicits.{given ExecutionContext}

了解所有这些方法对灵活使用ExecutionContext很有帮助。

另见

  • Scala官方网站上的Futures和Promises页面( https://oreil.ly/fueUh )包括Futures(我没有涉及Promises)有许多例子我没有深入。
  • Scala文档:scala.concurrent.ExecutionContexthttps://oreil.ly/X9bpJ )。

18.2 使用Future的回调和转换方法

问题

你希望并行运行一个任务,当任务完成时需要有不同的方式来处理结果。

解决方案

前面的小节展示了使用Future的简单方法,但是因为这个示例是阻塞的,所以尽量少直接使用刚才展示的技巧。这个小节中会展示更好的方法。

通用代码

为了简化后面代码,请注意以下所有例子都依赖于这些导入语句:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success, Random}
    import Thread.sleep

解决方案1:使用onComplete

Future拥有三个回调方法:onComplete, andThen, 和 foreach. 下面的例子展示了onComplete

    def getMeaningOfLife() =
        sleep(Random.nextInt(500))
        42

    @main def callbacks1 =

        println("starting calculation ...")
        val f = Future {
            getMeaningOfLife()
        }
        println("before onComplete")
        f.onComplete {
            case Success(value) => println(s"Got the callback, meaning = $value")
            case Failure(e) => e.printStackTrace
        }

        // do the rest of your work
        println("A ..."); sleep(100)
        println("B ..."); sleep(100)
        println("C ..."); sleep(100)
        println("D ..."); sleep(100)
        println("E ..."); sleep(100)
        println("F ..."); sleep(100)

        sleep(2_000)

这个例子在随机延迟之后返回getMeaningOfLife值42。重要的部分是f.oncomplete方法调用及其后面的代码。下面是代码的工作原理:

  • Future f被创建后会尽可能快的运行(尽管开始运行的实际时间是不确定的)。
  • f的类型是Future[Int]
  • f.oncomplete方法设置了回调,当Future完成时候将会发生调用。
  • onComplete的类型签名表明它接受一个函数来转换Try输入参数到Unit结果。因此在 f.onComplete 代码块中,代码处理Future的结果是SuccessFailure
  • 稍有延迟的println语句代表代码的其余工作可能在主线程上执行,而Future在并行的线程上关闭或者运行。

因为Future在其他线程上并发运行,但是无法精确知道何时计算出结果,所以代码的输出是不确定的,看起来像这样:

    starting calculation ...
    before onComplete
    A ...
    B ...
    C ...
    D ...
    E ...
    Got the callback, meaning = 42
    F ...

因为Future最终会返回 —— 在某个不确定的时间 —— “得到回调”消息可能出现在输出的任何位置。

如前所述,onComplete类型签名表明它接受一个函数转换Try参数:

    def onComplete[U](f: (Try[T]) => U)(implicit executor: ExecutionContext): Unit

因此,这里可以使用另一种方式是在Try参数上调用fold来替换前面f.onComplete的代码:

    f.onComplete (_.fold(
    		_.printStackTrace,
            value => println(s"Got the callback, meaning = $value")
        )
    )

在这个例子中,fold接受两个参数:

  • 第一个参数是在Try为失败时应用的函数。
  • 第二个参数是在TrySuccess时应用的函数。

有关这种方法的更多细节,请参考Scaladoc的Try类( https://oreil.ly/3SKN3 )。

解决方案2:使用andThen或foreach

有时候,onComplete并不是你想要的,在这种情况下可以使用回调方法andThenforeach。下面是如何使用andThen的例子:

    @main def callbacks2 =
        println("Creating the future")
        val f: Future[Int] = Future {
            // sleep for a random time before returning 42
            val sleepTime = Random.nextInt(500)
            sleep(sleepTime)
            println("Leaving the future")
            if sleepTime > 250 then throw new Exception("Ka-boom")
            42
        }
        
        // handle the result of f with andThen
        println("Before andThen")
        f andThen {
            case Success(x) =>
                val y = x * 2
                println(s"andThen: $y")
            case Failure(t) =>
                println(s"andThen: ${t.getMessage}")
        }
        println("After andThen")
        sleep(1_000)

这段代码类似于onComplete的例子,但有以下变化:

  • Future代码块大约有一半的时间被会抛出异常。如果不抛出异常,最终会得到值42
  • andThen代码块在Future完成时候运行。
  • andThen代码块用偏函数实现的。如果不想实现case语句的Failure部分,则不必这样做。

当一个异常抛出时,应用程序的输出是:

    Creating the future
    Before andThen
    After andThen
    Leaving the future
    andThen: Ka-boom

当没有异常抛出时,输出是:

    Creating the future
    Before andThen
    After andThen
    Leaving the future
    andThen: 84 

接下来,这是前一个例子的简短版本,使用foreach而不是andThen

    @main def callbacks3 =
    
        val f: Future[Int] = Future {
        val sleepTime = Random.nextInt(500)
        sleep(sleepTime)
        if sleepTime > 250 then throw new Exception("Ka-boom")
        42
    }
    
    f.foreach(println)

    sleep(1_000)

在本例中,没抛出异常时输出42,抛出异常时候什么也不输出。Future类的Scaladoc告诉我们为什么什么也没有输出:“警告:(foreach)将不会被调用,如果这个Future永不完成或者以失败完成。因为这个方法是异步执行且不产生返回值,抛出任何非致命异常将报告给ExecutionContext”。

Future还有更多属于转换类的回调方法,包括transformcollectfallbackTomaprecoverrecoverWith。下面是使用fallbackTo的一个简短例子:

    def getMeaningOfLife() = Future {
        sleep(Random.nextInt(500))
        42
    }
    
    val meaning = getMeaningOfLife() fallbackTo Future(0)
    meaning.foreach(println)

Future类( https://oreil.ly/Acilt )的Scaladoc为其它转换方法提供了很好的例子。

讨论

下面的声明描述了Future的使用可以和回调和转换方法一起使用:

  • Future完成时回调和转换方法被异步调用。
  • 本小节展示了onCompleteandThenforeach、和fallbackTo
  • Future完成一段时间后由某个线程执行回调方法。在Scala官方网站的Futures和Promises页面( https://oreil.ly/fueUh )上写道:“没有保证回调函数被完成Future的线程调用或创建回调的线程所调用”。
  • 回调的执行顺序不能保证。
  • onComplete接收类型为Try[a] =>B的回调函数。
  • andThen接收偏函数,只需要处理所需的情况。(见10.7小节,”创建偏函数“,以获取更多信息)
  • onCompleteforeach的返回类型是Unit,所以它们不能被链接在一起。

18.3 编写返回Future的方法

问题

你想要写一个返回Future的方法或函数。

解决方案

在实际应用中,需要创建返回Future的方法。以下例子定义了名为longRunningComputation的方法,该方法返回Future(Int)

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success, Random}
    import Thread.sleep
    
    @main def futuresFunction =
    
        // a function that returns a Future
        def longRunningComputation(i: Int): Future[Int] = Future {
            sleep(100)
            i + 1
        }
        
        // this does not block
        longRunningComputation(11).onComplete {
            case Success(result) => println(s"result = $result")
            case Failure(e) => e.printStackTrace
        }
        
        // keep the jvm from shutting down
        sleep(1_000)

在这个例子中,将longRunningComputation的方法体创建在Future包装的代码块中。代码块传递给Future对象的apply方法。这会启动异步计算并返回Future[A],这个例子将计算结果保存到Future[Int] 中。这是定义方法返回Future的常用方法。

讨论

在类似的技术中,因为Future将代码块作为其输入参数,可以将已有方法中未并行执行的包装在Future内部:

    // some existing function that does not run concurrently
    def getMeaningOfLife() = ???
    
    // wrap that existing function in a Future
    val meaning = Future { getMeaningOfLife() }

同样,这种方式生效是因为:

  • 该方法传递给scala.concurrent.Future中的apply方法(Future对象,而不是Future类)。

  • apply方法接受传名调用代码作为参数,可以从Future对象的Scaladoc页面( https://oreil.ly/VmJE7 )上的签名可以看出:

    final def apply[T](body: => T)(implicit executor: ExecutionContext): Future[A]
  • 因为方法(或函数)等价于传名调用,如上所示,我们可以用Future包装这个非并发方法。

18.4 并行运行多个Future

问题

Future通常在创建后就开始运行,你想看看如何并行运行多个Future,并当它们全部完成时,将结果合并在一起。

解决方案

如果想创建多个Scala future并在for表达式中将它们的结果合并在一起获得结果,正确的方式是:

  1. 创建多个Future
  2. for表达式中合并结果。
  3. 使用onComplete或类似的技术获取合并的结果。

正确的方式(简化)

我将在下面代码展示for表达式中使用多个Future的正确方式。最重要的是按照步骤A创建Future,在步骤B的for表达式中使用它们:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import Thread.sleep
    
    @main def multipleFutures1 =
    
        // (a) create the futures
        val f1 = Future { sleep(800); 1 }
        val f2 = Future { sleep(200); 2 }
        val f3 = Future { sleep(400); 3 }
        
        // (b) run them simultaneously in a for-comprehension
        val result = for
            r1 <- f1
            r2 <- f2
            r3 <- f3
        yield (r1 + r2 + r3)
        
        // (c) process the result
        result.onComplete {
            case Success(x) => println(s"result = $x")
            case Failure(e) => e.printStackTrace
        }
        
        // important for a little parallel demo: keep the jvm alive
        sleep(3_000)

一个用于验证的完整示例

无法通过阅读代码来判断这种方式是否正确,所以创建了下面例子来展示它是如何工作的:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import Thread.sleep

    def slowlyDouble(
        x: Int,
        startTime: Long,
        delay: Int,
        name: String
    ): Future[Int] = Future {
        println(s"entered $name: ${delta(startTime)}")
        sleep(delay)
        println(s"leaving $name: ${delta(startTime)}")
        x * 2
    }
    
    // time-related functions that are used in the code
    def delta(t: Long) = System.currentTimeMillis - t
    def time() = System.currentTimeMillis

    @main def multipleFutures2 =
    
        val t0 = System.currentTimeMillis
    
        // Future #1
        println(s"creating f1: ${delta(t0)}")
        val f1 = slowlyDouble(x=1, t0, delay=1500, name="f1")
    
        // Future #2
        sleep(100)
        println(s"\ncreating f2: ${delta(t0)}")
        val f2 = slowlyDouble(x=2, t0, delay=250, name="f2")
    
        // Future #3
        sleep(100)
        println(s"\ncreating f3: ${delta(t0)}")
        val f3 = slowlyDouble(x=3, t0, delay=500, name="f3")
        println(s"\nentering `for`: ${delta(t0)}")
    
        val result = for
            r1 <- f1
            r2 <- f2
            r3 <- f3
        yield (r1 + r2 + r3)
    
        println("\nBEFORE onComplete")
        result.onComplete {
            case Success(x) => {
                println(s"\nresult = $x (delta = ${delta(t0)})")
                println("note: you don’t get the result until the last future completes")
            }
            case Failure(e) => e.printStackTrace
        }
        println("AFTER onComplete\n")
    
        // important for a little parallel demo: keep the jvm alive
        sleep(3_000)

如果运行这段代码,会看到这样的输出:

    creating f1: 0
    entered f1: 0

    creating f2: 105
    entered f2: 105

    creating f3: 210
    entering `for`: 211
    entered f3: 211

    BEFORE onComplete
    AFTER onComplete

    leaving f2: 359
    leaving f3: 713
    leaving f1: 1501

    result = 12 (delta = 1502)
    note: you don’t get the result until the last future completes

输出显示了几个有趣的点:

  • f1f2f3立即开始运行。虽然从这段代码看不出来,但是会立即在新线程上开始运行。
  • 输出快速传递到onComplete语句。
  • 在短暂的停顿之后,leaving语句被打印,紧接着是结果。
  • 注意到f1打印结果的delta仅比delta多一点,那是因为f1的睡眠时间最长。 这是有道理的,三个future并行运行的时间只是运行最长的future时间久一点。

我鼓励你多使用这段代码,甚至当做自己的日常代码来使用,直到完全理解Scala的Future是如何使用for表达式达成这里的效果为止。

讨论

也可以通过执行错误的操作来确认之前的方法是正确的。给定和前面例子同样的包导入以及slowDoubledeltatime 方法。以下代码展示了在 for 表达式中用错误的方式使用Future

    @main def multipleFuturesWrong =

        val t0 = System.currentTimeMillis

		// WARNING: THIS IS THE INTENTIONALLY WRONG APPROACH
        println(s"\nentering `for`: ${delta(t0)}")
        val result = for
            r1 <- slowlyDouble(x=1, t0, delay=1500, name="f1")
            r2 <- slowlyDouble(x=2, t0, delay=250, name="f2")
            r3 <- slowlyDouble(x=3, t0, delay=500, name="f3")
        yield (r1 + r2 + r3)

        println("\nBEFORE onComplete")
        result.onComplete {
            case Success(x) => {
                println(s"\nresult = $x (delta = ${delta(t0)})")
                println("note that you don't get the result until the last future 						completes")
            }
            case Failure(e) => e.printStackTrace
            }
        println("AFTER onComplete\n")

        // important for a little parallel demo: keep the jvm alive
        sleep(3_000)

当运行这段代码时,你会看到类似这样的输出:

    entering `for`: 0
    entered f1: 1

    BEFORE onComplete
    AFTER onComplete

    leaving f1: 1503
    entered f2: 1503
    leaving f2: 1758
    entered f3: 1758
    leaving f3: 2260

    result = 12 (delta = 2261)

输出显示了:

  • f1 future快速进入(在delta=1时)。
  • f1 future在1500毫秒后退出。
  • f2是在f1完成之后那进入,并且在超过250ms的时间内没有退出。
  • f2完成后,f3进入,并且代码又暂停了500多毫秒。

这表明 f1f2f3 不是并行运行,而是一个在另一个之后的串行运行。需要明确的是,这是错误的,并不是想要的。

警告:Future不是引用透明的

如果对函数式编程感兴趣,关键要知道Scala的Future不是引用透明的,因此不适合 FP。因为Future 是eager的并且立即开始运行,因此不能重构代码来用结果替换表达式(反之亦然)。

如果对编写类似Future代码的函数方式感兴趣,请参阅 ZIO 库( https://zio.dev/ )和 Cats Effect IO monad —— 在 Typelevel Scala 博客文章“Concurrency in Cats Effect 3”( https://oreil.ly/mD0Jv )有讨论 —— 以及 Monix task,因为他们编写函数式、惰性、异步代码。

18.5 创建面向对象风格的Actor

问题

从本章节的介绍中,你已经知道可以创建OOP(面向对象)或FP(函数式)风格的Akka Typed actor,现在你想学习并理解OOP风格。

解决方案

OOP解决方案是类和伴生对象的组合:

  • 在对象中定义 (a) actor可以处理的消息和 (b) apply工厂方法。
  • 该类继承了Akka AbstractBehavior 类并实现了onMessage 方法。
  • 该类被定义为私有的,因此无法访问其构造函数,调用者必须使用对象的 apply 方法。

这是一个名为Tom的对象示例,它基于电影50 First Dates中的 Tom角色。定义了一个 Message trait和以及扩展它的Hello样例对象。 基于这些定义和类中展示的match表达式,Hello 是这个actor可以唯一处理的消息。

sealed trait的说明 -- TODO(鸽子栏)

        对于这个简单的例子,sealed trait不是必需的;可以只使用一个样例对象作为消息传递。但在实际应用中一般会遵循本例子中的模式,所以我在例子中还是用上了sealed trait。

该对象还定义了一个apply方法,其它地方用这个方法来构造actor。正如我在5.4小节中提到的,“为类定义辅助构造函数”,对象中的apply方法的工作方式类似于工厂方法,允许构造实例类。

注意,apply方法使用Akka Behaviors.setup方法创建了一个新的Tom类的实例:

    object Tom {
        // “messages” that Tom can handle
        sealed trait Message
        case object Hello extends Message
        
        // the factory/constructor method
        def apply(): Behavior[Message] =
        Behaviors.setup(context => new Tom(context))
    }

根据Scaladoc,Behaviors.setup “是行为工厂。 行为实例的创建被推迟到actor启动……工厂函数通过传递ActorContext 作为参数,可用于生成子actor。 setup是通常在生成actor时用作最外层的行为。”

创建完该对象后,这样定义 Tom 类:

    import Tom.{Message, Hello}
    
    private class Tom(context: ActorContext[Message]) extends AbstractBehavior[Message](context) {
        override def onMessage(message: Message): Behavior[Message] = {
            message match {
                case Hello => println("Hi, I’m Tom.")
                this // return the current behavior
                // Behaviors.same
            }
        }
    }

关于这个类的注意事项:

  • 首先从Tom对象导入消息使得这个类的其余部分易读。
  • 将类的构造函数设为私有,因此只有伴生对象可以访问它。
  • 构造函数接收一个ActorContext实例,类型为Message且伴生对象中定义。
  • 该类继承了Akka AbstractBehavior类,并实现了抽象方法onMessage
  • onMessage接收Message实例。
  • onMessage的返回类型为Behavior[Message],所以返回了Behavior.same。虽然在这个例子中使用有些过度了,但是Behavior Scala文档表明“建议系统复用以前的行为”。当处理拥有多种行为的复杂actor时会更有意义,并且可以根据所需使用返回值来更改行为。 18.8小节有关于更改状态的详细信息。

在这个例子中,使用match表达式来处理Message。虽然在这个例子中并不是必需的,但它是处理消息的常用方式,所以我展示它。

现在我们拥有了Tom类和伴生对象,这个App例子展示了是如何工作的:

    import akka.actor.typed.Behavior
    import akka.actor.typed.ActorSystem
    import akka.actor.typed.scaladsl.ActorContext
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl.AbstractBehavior
    
    object HiImTomOop extends App {
        val actorSystem: ActorSystem[Tom.Message] = ActorSystem.create(Tom(), 						"50FirstDatesSystem")
        
        actorSystem ! Tom.Hello
        actorSystem ! Tom.Hello
        actorSystem ! Tom.Hello
        
        Thread.sleep(500)
        actorSystem.terminate()
	}

关于这段代码的几点说明:

  • HiImTomOop是一个普通的Scala 2应用程序,使用App对象创建。
  • ActorSystem是用Tom.Message类型创建的。我可以和之前一样导入Tom.MessageTom.Hello,但是我想展示没有这些导入,代码看起来是什么样的。
  • 代码Tom()调用Tom伴生对象中的apply方法,构造Tom类的实例。
  • 50FirstDatesSystem”是给ActorSystem实例起的名字。这个名字可以随意,但是如果看过电影50 First Dates,就会明白为什么为actor选择了Tom这个名字。

App结束时,将这行代码调用了3次:

    actorSystem ! Tom.Hello

这会在 sbt 提示符处产生以下输出:

    Hi, I’m Tom.
    Hi, I’m Tom.
    Hi, I’m Tom.

同时创建Actor和ActorSystem -- TODO 鸽子栏

        在这段代码中,同时创建了Tom actor和actor system:

    val actorSystem: ActorSystem[Tom.Message] =
    ActorSystem.create(Tom(), "50FirstDatesSystem")

这有些特别,但这里发生的情况是因为Actor System继承了ActorRefactorSystem变量既是Actor System又是ActorRef。 也就是说,actorSystem本质上是Tom的一个实例,将变量命名为tom而不是actorSystem也同样有效。

在这小节中采用了这种方法来尽可能少得编写代码,在后面小节中,当把ActorSystem创建为SupervisorGuardian时,会展示更常用的技术。

讨论

在编写本书时,Akka还没有移植到Scala 3,所以这章中的例子使用的是Scala 2的最新版本。

Behaviors

如果熟悉Akka Classic,可能会注意到这个例子中缺少Actor类。在Akka Typed中,actor的概念被behaviors所取代。一开始我觉得很困惑,直到我在Learn Scala Programming Slava Schmidt(Packt)一书中看到这句话:

    “任何定义良好的行为都是一个可计算的实体,因此也可以是一个actor。”

因此,需要了解的主要概念是,在Akka Typed中,将关注behaviors而不是actor。

重复的模式

关于Akka Typed中面向对象风格的行为需要知道的第二件事是,创建它们遵循一致的模式。面向对象风格的行为有如下模板:

    object OopActor {
        // “messages” that OopActor can handle
        sealed trait Message
        final case object Message1 extends Message
        final case class Message2(param: SomeType) extends Message
        
        // the factory/constructor method
        def apply(): Behavior[Message] =
            Behaviors.setup(context => new OopActor(context))
    }

    private class OopActor(context: ActorContext[OopActor.Message]) extends AbstractBehavior[OopActor.Message](context) {
        override def onMessage(msg: OopActor.Message): Behavior[OopActor.Message] =
            msg match
                case Message1 =>
                    // handle this message here
                    Behaviors.same
                case Message2(param) =>
                    // handle this message here
                    Behaviors.same
}

Behaviors.same

因为onMessage是一个具有Behavior返回类型的覆盖方法,所以对于这样的简单例子,match表达式的每个case语句都返回Behaviors.same。这表明下一个行为与当前行为相同。因为actor可能有很多行为,你会在18.8小节中看到替代方案。此外,在面向对象风格中,可以返回this而不是Behaviors.same。因此可以看到两种风格。

18.6 创建函数式风格的Actor

问题

从本章的介绍中,你已经知道可以创建OOP或FP风格的Akka Typed actor,现在你想学习并理解FP风格。

解决方案

函数式风格的解决方案比前面小节面向对象风格展示的解决方案更简短,只需要使用Behavior.setup实现的apply方法创建对象,并且在面向对象风格中使用相同的消息:

    object Tom {
        // the “messages” this actor can respond to
        sealed trait Message
        final case object Hello extends Message
        
        // the factory method
        def apply(): Behavior[Message] = Behaviors.setup {
            context: ActorContext[Message] =>
                Behaviors.receiveMessage { message: Message =>
                    message match {
                        case Hello =>
                            println("Hi, I’m Tom.")
                            Behaviors.same
                   }
              }
        }
    }

关于代码的注意事项:

  • 与前面的小节一样,这个对象定义了Tom可以处理的消息,在本例中是Hello消息。
  • apply的方法体是在Akka Behavior.setup方法的帮助下实现的。
  • 该方法将Message类型的ActorContext传递给Behaviors.receiveMessage辅助方法。
  • receiveMessage代码块中,接收的消息在match表达式中处理。
  • 与上一个小节中的面向对象风格一样,本例中的match表达式使用有点过度了,但是因为它是在真实世界中经常使用的,所以展示了它。
  • 正如在“同时创建ActorActorSystem”中解释的那样,ActorSystem是一个ActorRef,因此actorSystem本质上是Actor,将其命名为tom也同样有效。

sealed trait的说明 -- TODO(鸽子栏)

        对于这个简单的例子,sealed trait不是必需的;可以只使用一个样例对象作为消息传递。但在实际应用中一般会遵循本例子中的模式,所以我在例子中还是用上了sealed trait。

我将讨论Behaviors.setupBehaviors.receiveMessage,但现在,这个App可以测试Tom函数式风格的actor:

    import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
    import akka.actor.typed.scaladsl.{ActorContext, Behaviors}

    object HiImTomFp extends App {
        import Tom.{Message, Hello}
        val actorSystem: ActorSystem[Message] = ActorSystem(
            Tom(),
            "50FirstDatesSystem"
        )
        actorSystem ! Hello
        actorSystem ! Hello
        actorSystem ! Hello
        
        Thread.sleep(500)
        actorSystem.terminate()
    }

在这个代码中:

  • Tom对象导入MessageHello消息,使其余代码更易于阅读。
  • 创建了一个带有Message类型的ActorSystem(即Tom.Message)。
  • Tom() 引用调用Tom对象中的apply方法来创建Tom的新实例。
  • 50FirstDatesSystem”可以是任何合法的字符串,但由于这个例子是根据电影50 First Dates中的 Tom actor建模的,所以使用了这个名字。
  • ! 语法向 Tom actor 发送了三个 Hello 消息。

最后三行代码在sbt提示符处产生下面输出:

    Hi, I’m Tom.
    Hi, I’m Tom.
    Hi, I’m Tom.

讨论

正如在上一个小节中所讨论的,有一个重要概念是Akka Typed不使用“actor”这个词,而是使用BehaviorBehaviors。因此,需要再次了解一个重要概念:在Akka Typed中,将专注于behaviors而不是actors。

Behaviors Scala文档中声明setup是行为的工厂,即行为的定义。同时用于函数式和面向对象风格。

Behaviors.receiveMessage是定义actor/behavior主体的几种方法之一。receiveMessage用在已经有ActorContext的地方 —— 可以通过Behaviors.setup接收它 —— 并且只希望通过访问actor接收到的消息来实现actor的主体。

在此使用到的常用方法如表18-1所示。

表18-1 用于创建Akka Typed Actors的行为方法

方法 描述
Behaviors.receiveMessage 在本节中,通常使用match表达式接收并处理消息。
Behaviors.receiveMessagePartial receiveMessage类似,但使用PartialFunction实现的方法体,通常只实现actor可以处理消息的子集。
Behaviors.receive 接收ActorContext和消息。
Behaviors.receivePartial receive类似,但使用PartialFunction实现方法体,通常只实现actor可以处理消息的子集。

关于可使用的不同方法的详细描述,参考Behaviors对象的Scaladoc( https://oreil.ly/7H2XG )。

Akka Typed的一个优点是:允许开发人员以面向对象或函数式风格创建actor(behaviors),并且可以在同一个应用中使用这两种风格。可以选择你喜欢的风格。

18.7 向Actor发消息

问题

你想了解如何向Akka Typed actor发送消息。

解决方案

向Akka Typed actor发送消息的解决方案与Akka Classic actor的解决方案相同。首先,需要获取其ActorRef的引用。一旦获取到,使用 ! 方法发送异步消息。

    anActorRef ! "Hello"

你也可以使用 ask 方法向它发送消息获得一个 Future. 但要尽量少使用,因为 Future 的使用会让你想使用阻塞,直到得到回复,而阻塞在异步代码中是很不好的。

import语句

下面的代码需要这些import语句:

    import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
    import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}

消息建模

下面的例子展示了如何向Akka Typed actor发送消息,以及如何在actor中接收消息。 不仅使用了一个智能恒温器的例子,还使用了18.6小节中的函数式风格的actor。

在设计Akka actor时通常会有两点思考:

  • actor是做什么的?
  • actor应该接收什么消息?

对于智能恒温器,它所做的三件事是:查询当前温度,升高温度和降低温度。分别对应于它能够接收和处理的消息:

  • 当前温度设置是多少?(CurrentTemperature
  • 温度升高X度。(IncreaseTemperature
  • 温度降低X度。(DecreaseTemperature

假设智能恒温器从 ActorRef[SystemMessage] 类型的actor那里接收到这些消息 —— 稍后会详细介绍 —— 使用Akka Typed,可以像这样建模消息:

    object ThermostatActor {
        import ThermostatSupervisor.{SystemMessage, StringMessage}
        
        // our API, i.e., the messages we can respond to
        sealed trait MessageToThermostat {
            def sender: ActorRef[SystemMessage]
        }
        final case class CurrentTemperature(sender: ActorRef[SystemMessage])
        extends MessageToThermostat

        final case class IncreaseTemperature(
            sender: ActorRef[SystemMessage],
            amount: Int
        ) extends MessageToThermostat

        final case class DecreaseTemperature(
            sender: ActorRef[SystemMessage],
            amount: Int
        ) extends MessageToThermostat

        // more code here ...

如果你从Akka Classic来到Akka Typed,这段代码最特别的地方在于,ThermostatActor可以接收的消息还包括关于sender的信息:

    sender: ActorRef[SystemMessage]

对于Akka Typed的消息,通常包含给你发送消息的ActorRef的引用。在这种情况下,我认为自己是ThermostatActor,和我联系的actor应该具有类型ActorRef[SystemMessage]

开发人员通常会为这个字段命名,例如replyTosenderSystemMessage类型指的是当把消息发送回发送方时,发送方能够接收的消息类型,稍后将在Supervisor代码部分中看到。

ThermostatActor代码的其余部分与18.6小节中展示的类似:使用Behaviors.setupBehaviors.receiveMessagematch表达式实现apply方法:

    object ThermostatActor {
        // our API, i.e., the messages we can respond to are
        // enumerated here ...
        
        var currentTemp = 72

        // we respond to `MessageToThermostat` queries
        def apply(): Behavior[MessageToThermostat] = Behaviors.setup {
            context: ActorContext[MessageToThermostat] =>
            Behaviors.receiveMessage { message =>
                message match {
                    case CurrentTemperature(sender) =>
  						sendReply(sender)
                    	Behaviors.same
                    case IncreaseTemperature(sender, amount) =>
                    	currentTemp += amount
                    	sendReply(sender)
                    	Behaviors.same
                    case DecreaseTemperature(sender, amount) =>
                        currentTemp -= amount
                        sendReply(sender)
                        Behaviors.same
                }
            }
        } // Behaviors.setup/apply

      private def sendReply(sender: ActorRef[SystemMessage]) = {
          val msg = s"Thermostat: Temperature is $currentTemp degrees"
          println(msg)
          sender ! StringMessage(msg)
      }
    }

这是在18.6小节中解释过的标准Scala和Akka Typed代码,所以不会在这里详细介绍。重要的部分是match表达式,使得ThermostatActor能够响应所有的消息。

除非可以连接到真实的恒温器,否则这就是实现ThermostatActor需要做的所有事情。 现在只需要编写一些代码来测试它。

创建监督者

接下来,将创建一个supervisor actor。这个actor会创建ThermostatActor,并向它发送消息和从它那接收消息。源代码中添加了注释,用来解释它是如何工作的:

    object ThermostatSupervisor {
        // these are the messages we can receive. some will be sent to us from the
        // App, which you’ll see shortly. others are sent to us by the
        // ThermostatActor.
        sealed trait SystemMessage

        case object StartSendingMessages extends SystemMessage

        case object StopSendingMessages extends SystemMessage

        case class StringMessage(msg: String) extends SystemMessage

          // this is the usual `apply` template.
          def apply(): Behavior[SystemMessage] = Behaviors.setup[SystemMessage] {
              actorContext =>
              
                  // when we’re created, the first thing we do is create a
                  // ThermostatActor. technically, it is a “child” to us.
                  val thermostat = actorContext.spawn(
                    ThermostatActor(),
                    "ThermostatActor"
                  )
                  
              // this is where we set up the handling of messages that can be
              // sent to us.
              Behaviors.receiveMessage {
                  // when we receive the message StartSendingMessages,
                  // send three messages to the ThermostatActor.
                  case StartSendingMessages =>
                      thermostat ! CurrentTemperature(actorContext.self)
                      thermostat ! IncreaseTemperature(actorContext.self, 1)
                      thermostat ! DecreaseTemperature(actorContext.self, 2)
                      Behaviors.same
                  case StopSendingMessages =>
                  	  Behaviors.stopped
                  case StringMessage(msg) =>
                      println(s"MSG: $msg")
                      Behaviors.same
              }
          }
    }

一个测试的应用程序

有了这两个部分,现在只需要一个App来测试我们的系统:

    object ThermostatApp extends App {
        import ThermostatSupervisor.{
            SystemMessage, StartSendingMessages, StopSendingMessages
        }
        val actorSystem: ActorSystem[SystemMessage] = ActorSystem(
            ThermostatSupervisor(),
            "ThermostatSupervisor"
        )
        actorSystem ! StartSendingMessages
        Thread.sleep(1_000)
        actorSystem ! StopSendingMessages
        
        Thread.sleep(500)
        actorSystem.terminate()
    }

App做了以下事情:

  • ThermostatSupervisor导入消息,使得其余代码更易于阅读。
  • 通过创建ThermostatSupervisor的实例来创建ActorSystem,并给系统命名。
  • 发送StartSendingMessages消息给ThermostatSupervisor
  • 休眠100毫秒,然后向ThermostatSupervisor发送StopSendingMessages消息。

ThermostatSupervisor收到StartSendingMessages消息时,会向ThermostatActor发送三个消息来响应:

    case StartSendingMessages =>
        thermostat ! CurrentTemperature(actorContext.self)
        thermostat ! IncreaseTemperature(actorContext.self, 1)
        thermostat ! DecreaseTemperature(actorContext.self, 2)
        Behaviors.same

App的输出看起来是这样:

    Thermostat: Temperature is 72 degrees
    Thermostat: Temperature is 73 degrees
    Thermostat: Temperature is 71 degrees
    MSG: Thermostat: Temperature is 72 degrees
    MSG: Thermostat: Temperature is 73 degrees
    MSG: Thermostat: Temperature is 71 degrees

尽管无法保证消息在并发系统中的传递顺序,但图18-1可以了解初始的消息在App中是如何发送和回复的。

图18-1:简化版的UML时序图展示了actor间的初始消息

讨论

在这个App中,注意到ThermostatSupervisor使用actorContext.selfThermostatActor消息将自己的ActorRef发送到 ThermostatActor

    thermostat ! CurrentTemperature(actorContext.self)

然后,这些消息在match表达式中接收,ThermostatActor作为sender变量,如下所示:

    case CurrentTemperature(sender) =>
        sendReply(sender)

发送 ActorRefs

ActorRef作为消息的一部分发送是Akka Typed的常见模式。如我的博客“Akka Typed: Finding Actors with the Receptionist( https://oreil.ly/Fvrae )”中所说,你还可以查找actor引用,但如果与你联系的actor可以提供回复方式,这将变得更加容易。这就像给某人发送一封信时将回信地址放在信封上,或者向某人发送电子邮件时将你的电子邮件地址放在信封上。

消息作为Actor的API

将这些消息视为actor的API很有用。你可以通过它们来声明一个actor可以接收的消息类型。在面向对象的世界中,相当于在类中声明以下方法:

    def currentTemperature(): Int = ???
    def increaseTemperature(amount: Int) = ???
    def decreaseTemperature(amount: Int) = ???

或者,更准确地说:

    def currentTemperature(sender: ActorRef[SystemMessage]): Int = ???
    def increaseTemperature(sender: ActorRef[SystemMessage], amount: Int) = ???
    def decreaseTemperature(sender: ActorRef[SystemMessage], amount: Int) = ???

另见

有时无法将ActorRef传递给另一个actor。这种情况下,需要通过Akka Receptionist查找其它actor。我写了篇关于这个的长篇博客:“Akka Typed: Finding Actors with the Receptionist”( https://oreil.ly/Fvrae )。

18.8 创建多种状态的Actor(FSM)

问题

你想要创建一个随着时间的推移可以具有不同行为(或状态)的actor,也就是使用Akka Typed实现一个有限状态机(FSM)。

解决方案

在Akka Classic中,使用become方法来实现一个拥有多个状态的actor,但是在Akka Typed中,方法是不同的。 解决方案是:

  • 为actor所在的每个状态定义一个函数。
  • 定义一组唯一消息, 由每个状态处理。

想象为Clark KentSuperman建模,他们是同一个actor,但是他们在不同的时间处于两种不同的状态,并且每种状态处理不同的消息。使用Akka Typed,可以像这样对消息进行建模:

    sealed trait BaseBehaviors

    // clark kent actions
    sealed trait ClarkKentBehaviors extends BaseBehaviors
    final case object WorkAtNewspaper extends ClarkKentBehaviors
    final case object PutOnGlasses extends ClarkKentBehaviors
    final case object BecomeSuperman extends ClarkKentBehaviors

    // superman actions
    sealed trait SupermanBehaviors extends BaseBehaviors
    final case object Fly extends SupermanBehaviors
    final case object SavePeople extends SupermanBehaviors
    final case object BecomeClarkKent extends SupermanBehaviors

注意这些行为是不同的:Clark Kent在报社工作,当戴上眼镜时没人会认出他,并且也可以成为超人。但他不会FlySavePeople或成为BecomeClarkKent,每个状态都有自己的行为。

给定这些消息和这些import语句:

    import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
    import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}

解决方案的其余部分是定义一个拥有两个函数的actor,每个状态一个:

  • clarkKentState
  • supermanState

下面是源码:

    object ClarkKent {
    
      	// initial state
    	def apply(): Behavior[BaseBehaviors] = clarkKentState()

        private def clarkKentState(): Behavior[BaseBehaviors] =
        Behaviors.receiveMessagePartial[BaseBehaviors] { message: BaseBehaviors =>
            message match {
                case WorkAtNewspaper =>
        	    println("normalState: WorkAtNewspaper")
            	    Behaviors.same
                case PutOnGlasses =>
                    println("normalState: PutOnGlasses")
                    Behaviors.same
                case BecomeSuperman =>
                    println("normalState: BecomeSuperman")
                    supermanState()
            }
        }

        /**
         * `Behaviors.receiveMessagePartial`: Construct an actor `Behavior` from a
         * partial message handler which treats undefined messages as unhandled.
         */
        private def supermanState(): Behavior[BaseBehaviors] =
        Behaviors.receiveMessagePartial[BaseBehaviors] { message: BaseBehaviors =>
            message match {
                case Fly =>
                    println("angryState: Fly")
                    // supermanState()
                    Behaviors.same
                case SavePeople =>
                    println("angryState: SavePeople")
                    // supermanState()
                    Behaviors.same
                case BecomeClarkKent =>
                    println("normalState: BecomeClarkKent")
                    clarkKentState()
              }
          }
      }

这个代码的模式应该在前面的小节中很熟悉。 第一个变化是,没有将所有行为都放在apply方法中,而是简单地声明了初始行为:

    def apply(): Behavior[BaseBehaviors] = clarkKentState()

然后,当clarkKentState收到一条BecomeSuperman的消息时,会切换状态到supermanState

    case BecomeSuperman =>
        println("normalState: BecomeSuperman")
        supermanState()

一段时间后,supermanState可能会收到条BecomeClarkKent消息,并通过切换到clarkKentState来响应:

    case BecomeClarkKent =>
        println("normalState: BecomeClarkKent")
        clarkKentState()

第二个变化是使用receiveMessagePartial方法而不是receiveMessage来实现每个功能。 会在讨论部分详细说明。

一个测试的应用程序

到这里,还需要一个测试App来展示系统。 如果读过之前的小节,这个模式看起来很熟悉:

    object ClarkKentApp extends App {
    
        val actorSystem: ActorSystem[BaseBehaviors] = ActorSystem(
            ClarkKent(),
            "SupermanSystem"
        )
        actorSystem ! WorkAtNewspaper

        // these will fail because the system is in the wrong state
        actorSystem ! Fly
        actorSystem ! SavePeople
        actorSystem ! BecomeClarkKent

        // this will work
        actorSystem ! WorkAtNewspaper

        // now these will work
        actorSystem ! BecomeSuperman
        actorSystem ! Fly
        actorSystem ! SavePeople
        actorSystem ! BecomeClarkKent
        Thread.sleep(500)
        actorSystem.terminate()
    }

在代码中添加了一些注释,用来解释发送消息给拥有多个状态的actor时会发生什么。用这种方式编写代码为了展示:向actor发送SupermanBehaviors消息时,clarkKentState响应消息,会产生错误。向actor发送clarkkentbehavior消息,supermanState处理消息时,也会产生错误。

这个App的结果输出如下:

    normalState: WorkAtNewspaper
    normalState: WorkAtNewspaper
    normalState: BecomeSuperman
    angryState: Fly
    angryState: SavePeople
    normalState: BecomeClarkKent
    
    [date time] [INFO] [akka.actor.LocalActorRef]
    [SupermanSystem-akka.actor.default-dispatcher-3] [akka://SupermanSystem/user]
    - Message [clark_kent.Fly$] to Actor[akka://SupermanSystem/user] was
    unhandled. [1] dead letters encountered. This logging can be turned off or
    adjusted with configuration settings 'akka.log-dead-letters' and
    'akka.log-dead-letters-during-shutdown'.
    
    more “dead letter” logging here ...

讨论

正如18.6小节中提到的,通过Behaviors.receiveMessagePartial,可以使用PartialFunction实现方法体,方法体可以实现actor能够处理的消息子集。未在match表达式中处理的消息视为unhandled消息,如果它们发送给actor,最终会进入死信系统。

在这段代码中,采用了一种方法,将BaseBehaviors定义为actor可以处理的所有行为的根。然后在clarkKentState方法中处理ClarkKentBehaviors,在supermanState方法中处理SupermanBehaviors。由于使用了 sealed trait 和 final case object,Scala编译器能够确定每个方法中处理所有消息的子集,并将match表达式当作偏函数。

死信

在实际应用的邮政系统中,死信是由于各种原因无法投递的信件,例如地址错误,因此信被发送到某个处理中心。(我不知道那里的信发生了什么)

类似地,如Akka指南( https://oreil.ly/WZCGu )中所说:“无法投递(并且可以确定)的消息将投递给名字为 /deadLetters 的合成actor。这种投递是在尽最大努力的基础上发生的,即使在本地JVM中也可能会失败……该工具的主要用途是调试。”

在我的例子中,没有尝试监视这些死信,只是让日志记录工具将它们打印到控制台。如果要监视本地系统的死信,可以创建一个订阅akka.actor.DeadLetter类的actor。这个过程在Akka event bus ( https://oreil.ly/Yhkbj ) 页上进行了描述,订阅的过程类似于查找actor并监听它们,在我的博客文章“Akka Typed: Finding Actors with the Receptionist( https://oreil.ly/Fvrae )”中展示了这个例子。

请注意,根据文档,“死信不会通过网络传播”,因此如果拥有一个分布式系统并希望将死信全部收集到一个地方,“必须为每个网络节点订阅一个actor并手动转发它们。”