Skip to content
crazyjohn edited this page Jun 2, 2015 · 1 revision
  1. 开场白

首先为了让你有足够的阅读欲望,我负责任的告诉你akka是这个宇宙最NB的框架。无论是从api接口设计,框架整体设计,actor模型设计,remoting远程支持,let it crash容错支持,cluster集群支持,eventbus事件驱动,都是设计的教科书般无可挑剔。

  1. Actor类型

Actor分为UntypedActor和TypedActor。其中UntypedActor为通用的Actor模型实现,要实现一个Actor只需继承UntypedActor类即可,然后在实现的onReceive(msg:Object)的方法中去写各种消息的处理逻辑。TyepedActor作为中间层用来连接已有的业务服务层和Actor模型,这样已有的业务服务层几乎不用怎么修改,内部其实是一个实现了Active Object活动对象模式,具体实现用到了动态代理的技术。

  1. 创建Actor

从整体层次体系来说,最顶层的Actor称为RootActor,底下有两个子Actor,UserActor和SystemActor,UserActor底下的这个体系就是用户自定义的所有Actor。用户自定义的Actor分为两个级别,第一个是TopLevel,第二个是Child。TopLevel级别的Actor使用ActorSystem.actorOf方法来创建。child级别的Actor使用getContext().actorOf来创建。上面说到的actorOf方法创建出来的对象是一个ActorRef,这其实类似一个代理接口,除了暴露的特定的通信接口以外,不暴露任何可供外界操作的接口。

一个ActorSystem创建以后,至少会启动3个actor: root guardian; user guardian; system guardian。

  1. /user: The Guardian Actor。这个Actor是所有用户创建的actor的父actor,这个guardian监护人命名为"/user"。所有通过system.actorOf创建的actor都是它的孩子,这也就意味着当这个监护人终止,actorSystem里的所有普通actor都会终止,同时也意味着这个guardian的supervisor strategy监护策略将决定所有top-level的普通的actor是如何被监护的。从akka2.1开始,这个策略都可以通过设置akka.actor.guardian-supervisor-strategy去配置。当这个guardian向上传递(escalate)一个错误,这时root guardian的响应将会是终止这个guardian,那也意味着将会终止整个ActorSystem。
  2. /system: The System Guardian。这个特殊的guardian被引入用来在普通的actor终止的时候,去执行一个有序的终止序列,用日志记录剩余的活动记录。而且它的日志记录同样也使用actor实现。它的实现方式是用system guardian来watch监控user guardian,然后根据收到的Terminated消息来开始它的终止流程。top-level级别的system actor的监控策略如下:ActorInitializationException, ActorKilledException将会终止子Actor,;其余的Exception将会重启子actor;其它的Throwable将会escalate向上传递,这将会终止整个Actor system。
  3. /: The Root Guardian。这货是万王之王,所有top-level级别actor的伟大的爹。待续。。。

Top Level Scopes for Actor Paths。

  • /user 是所有用户创建的top-level的actor的guardian actor。
  • /system 是所有system-created系统创建的top-level的actor的guardian actor。
  • /deadLetters 是dead letter actor。所有发送给已经终止的actor或者不存在的actor的消息都会被路由给它处理。
  • /temp 是所有system-created的short-lived的actor的guardian。
  • /remote 。
  1. Actor通信方式

首先Actor之间的通信方式是通过中间介质:不可变消息。发送消息有两种方式,其一是fire and forget也就是只管发送没有返回值,实现方法是ActorRef的tell方法;其二是ask and return也就是发送并且等待返回值,实现方法是ActorRef的ask方法,这个方法会返回一个Future欠条对象,然后可以调用Await的result(future, timeout)方法去做超时等待返回结果。这其中tell是建议使用的通信方式,这样更符合高并发的异步模型,容易获得更高的吞吐量和更低的延时。

  1. 停止Actor

如何正确的停止Actor呢。这里采用的思想也类似并发设计模式two phase termination,也就是分为两步,先是Stop状态,这时候已经不处理新来的消息了,但是要处理完之前位处理完的消息,等所有消息处理完了就进入了Terminate状态然后做后续的终止处理。 当Actor收到了Stop信号,就会进行下面的步奏:

  1. Actor停止处理邮箱的消息。
  2. Actor发送Stop信号给它所有的child actor。
  3. Actor等待来自所有child actor的termination消息。
  4. Actor开始自己本身的终止步骤:调用postStop方法;Dumping关联的邮箱;发布terminated消息给DeathWatch;通知自己的Supervisor自身的终止情况。

具体调用的api接口如下。当调用ActorSystem的shutdown方法时候,它会连带关闭所有的这个系统关联的actor;可以通过给指定的actor发送PoisonPill消息来终止它;可以通过调用context.stop方法来终止指定的actor。

  1. fault tolarence容错处理

akka引用了erlang的容错理念,也就是let it crash。引入了Supervisor监控者,抽象出了Supervisor Strategy监控策略,主要分为两种:One For One和All For One。第一种策略就是说当有一个actor发生错误或者异常的时候,监控者会对出错的这个actor进行单独的处理。第二种策略是说当有一个actor发生错误或者异常时候,监控者会对下面监控的所有actor执行指定的处理。具体的处理方式有如下:

  1. Resume继续,这种处理会保持原有的内部状态。

  2. Restart重启,这种处理会清理内部状态。

  3. Stop终止。

  4. Escalate向上传递。

默认的监控策略。Escalate向上传递是默认的处理策略当收到没有覆盖到的异常的时候。当Actor没有定义监控策略的时候,下面的是默认的异常处理:

  1. ActorInitializationException会停止错误的子Actor。

  2. ActorKilledException会停止失败的子Actor。

  3. Exception会从其失败的子Actor。

  4. 其它类型的Throwable将会向上传递给父级的Actor。

  5. Monitoring监控


Monitoring监控和上头说道的Supervisor密切相关。Lifecycle Monitoring在akka中也叫做DeathWatch。实现方式是通过monitoring actor接受到Terminated消息,默认的处理行为是抛出一个特殊的DeathPactException如果没有其它处理逻辑。如果要开始监听Terminated消息,需要调用context.watch(targetActorRef)。如果要停止监听Terminated消息,可以调用context.unwatch(targetActorRef)。

Monitoring非常有用在supervisor无法简单restart重新启动子actor然后需要终止子actor的时候。例如子actor在初始化的时候发生了错误,这种情况下需要monitor监控这些子actor,然后重新创建它们或者尝试在一段时间内重新尝试。(尼玛我自己都没看懂)

总结一下就是Supervisor用来监控actor在运行时候发生异常的情况,然后对异常做出处理,monitoring用来对actor进行DeathWatch,通过context.watch和unwatch,这样可以接收到子actor的Ternimated消息然后进行处理。工程环境中两者结合使用。

  1. DeadLetters

DeadLetter这个词用的真好,这也是我为什么爱akka,代码读起来太舒服了。当一个actor进入终止状态或者是已经挂掉了,外界这时候如果还继续给此actor发消息,那么这些消息就会变成DeadLetter,然后进入了DeadLetter mailbox,等待处理。ActorSystem默认的处理方式是打印出这条消息,不过这个日志可以在配置文件中进行配置。同时也可以通过system.eventStream(subscriber,classfier)的方式来监听DeadLetter的方式来进行指定的处理。

  1. Dipatcher分发器

首先从底层到高层是这样的一个层次:CPU - threads - dispatcher - actor and mailbox。分发器的作用就是把消息分发到指定的线程进行消息的处理。

Dispatcher封装了底层的线程处理逻辑。akka提供了以下几种分发器类型:

Dispatcher。这是默认的分发器实现,基于事件驱动的模型,把一个actor set绑定到指定的线程池,每个actor都有自己的mailbox;分发器可以被任意的actor共享;分发器底层可以使用ThreadPool或者ForkJoinPool来驱动;这种分发器在用于非阻塞场景和代码的时候效率是最佳的。

Pinned dispatcher。每个actor都附加一个单一线程,这种分发器在处理io操作或者长时间运算的时候非常有用,分发器会在actor停止活动一段时间后(时间可配置)解除线程和actor的绑定附加关系。所有的actor都有自己的邮箱;actor不和其它actor共享分发器因为每个actor都有自己专用的线程;这种分发器在阻塞环境下会有最佳效果,例如代码在执行io操作或者数据库操作,这时候actor会等待工作结束,对于这样的阻塞操作pinned dispatcher要比default dispatcher表现的更好。

Balancing dispatcher。这种分发器会尝试把工作从busy忙的actor分布到idle空闲的actor,而且这种分配工作只有在所有的actor是同种类型的时候才可以进行。所有的actor共享一个mailbox;分发器只有在actor是同种类型的时候才可以被共享;这种分发器底层可以被threadpool或者fork join pool驱动。

Calling thread dispatcher

  1. mailboxes邮箱

邮箱使用java并发包中的queue队列作为驱动,队列分为以下两种:

  1. BlockingQueue。阻塞队列意味着在put元素的时候需要等待队列有空间也就是非满,在take元素的时候需要等待队列非空。

  2. Bounded queue。有界队列意味着队列的大小是有上限的,意味着你不用put超过size的元素到队列。

Akka中的邮箱类型。

  1. Unbounded mailbox。使用ConcurrentLinkedQueue实现。无阻塞,无界限。

  2. Bounded mailbox。使用LinkedBlockingQueue实现。阻塞,有界限。

  3. Unbounded priority mailbox。使用PriorityBlockingQueue实现,阻塞,无界限。

  4. Bounded priority mailbox。使用被akka的BoundedBlockingQueue包装好的PriorityBlockingQueue实现,阻塞,有界限。

  5. Router路由器


Router路由器的作用就是负责把到来的消息定向路由给指定的目标actor。Router大概分为以下几种类型:

Round robin router。这种路由器会把消息会把消息按照环装路由给指定的routees。

Random router。随机选取一个routee把消息路由给它。

Smallest mailbox router。选取邮箱中消息最少的routee。

Broadcast router。把消息广播给所有的routees。

Scatter gather first completed router。把消息广播给所有的routees,并且等待第一个完成的的响应,并且把响应发送给调用者。

  1. EventBus事件驱动

事件总线设计用来给一组Actor对象发送消息。首先看看EventBus的接口: subscribe(subscriber:Subscriber, to: Classifier)用来注册指定的订阅者订阅指定的分类器。unscribe(subscriber:Subscriber, from:Classifier)。publish(event:Event)发布指定的事件到事件总线。一个事件总线的实现需要定义以下几种类型参数:

Event(E)。 在事件总线中发布的事件类型。

Subscriber(S)。在事件总线中注册的订阅者。

Classifier(C)。分类器用来选择订阅者来分发事件。

EventStream是每个ActorSystem的事件总线的实现,被用来处理日志和DeadLetter。

system.eventStream().subscribe(actor, AllDeadLetters.class)

  1. 网络IO以及序列化

Serialization序列化。说到网络,首先要说它的基础设施也就是序列化,序列化的作用是把一个需要序列化的对象Serializable通过序列化器Serializer转化成byte[]字节数组,然后通过网络从一端传输到另一端。与之相对的还有反序列化deserialize,也就是把byte[]字节数组通过deserializer反序列化器转换成对应的对象。

akka自身直接支持基于java自己的序列化体系Serializable + ObjectStream的readObject和writeObject。以及基于google protobuf实体Message + Builder的序列化体系。你也可以根据需要编写自己的序列化器,只需继承自JSerializer就可以,这个类留有4个接口要你自己去实现:

  1. identifier():int。用来唯一标识你自己的Serializer,你可以选择除了0-16以外的任何数字, 0-16被akka自己占用了。

  2. includeManifest():boolean。这里用来表示fromBinary是否需要一个class标识。

  3. toBinary(object:Object):byte[]。序列化。

  4. fromBinaryJava(bytes:byte[], clazz:Class):Object。反序列化。

网络IO。akka的IO层设计是完全actor化的,也就是说所有的操作都通过消息传递来代替直接的方法调用。所有的IO驱动例如TCP和UDP都有自己特有的actor,这种actor被称作manager,要访问特有的manager可以通过query一个ActorSystem,例如TCP的可以通过如下方式: final ActorRef tcpManager = TCP.get(getContext().system()).manager();可以通过发送Connected消息去建立连接,所有的操作都可以通过发送消息给这个已经建立连接的Actor。

Write models(Ack, Nack)。IO设备都有自己最大的吞吐量,这个极限值又会限制写操作的频率和大小。当一个应用尝试推送超过设备可处理的数据的量的时候,设备会把数据缓存起来直到设备有能力去写它。通过缓存的方式,设备可以处理这些短暂的密集型写操作,但是缓存是有限的,所以要通过"Flow control"洪水控制用来避免发生无限的缓存扩张。akka支持两种方式的流量控制:

  1. Ack-based。也就是设备会通知writer写者当写成功的时候。

  2. Nack-based。 也就是设备会通知writer写者当写失败的时候。

ByteString。为了保持隔离性,actors需要只通过不可变对象通信。ByteString是一个不可以变的bytes字节容器。

  1. Remote远程Actor

akka的特性之一就是Location Transparency。它被设计成可以很好的工作于分布式环境,所有的actor交互都是纯消息交互而且所有的操作都是异步的。这样的设计是为了保证所有的功能都可以在单个jvm或者成百上千个集群机器上完整的运行。

akka的remote几乎没有什么api,它是纯配置化的。要开始使用remote你的第一步是把akka的actor provider从LocalActorRefProvider修改成RemoteActorRefProvider。下面的代码用来从远程节点获取一个Actor: ActorSelection selection = context.actorSelection("akka.tcp://actorSystemName@hostName:port/actorPath"),当你获取到这个selection你就可以用下面的方法使用它了,selection.tell来使用它了。

code:

public class RemoteClient {

	public static void main(String[] args) {
		Config config = ConfigFactory.load().getConfig("LOCAL");
		ActorSystem system = ActorSystem.create("ClientActor", config);
		String remotePath = "akka.tcp://RemoteActorSystem@127.0.0.1:2552/user/remoteMaster";
		ActorRef remoteProxy = system.actorOf(Props.create(RemoteActorProxy.class, remotePath), "remoteProxy");
		remoteProxy.tell("hi, remote actor!", ActorRef.noSender());
		// tell message
		remoteProxy.tell(Role.newBuilder().setName("crazyjohn").setRoleId(8888).build(), ActorRef.noSender());
		// shutdown
		// system.shutdown();
	}

}

Proxy:

/**
 * The remote actor proxy, it's means the local actor;
 * 
 * @author crazyjohn
 *
 */
public class RemoteActorProxy extends UntypedActor {
	private ActorSelection remote;

	public RemoteActorProxy(String remotePath) {
		remote = this.getContext().actorSelection(remotePath);
	}

	@Override
	public void onReceive(Object msg) throws Exception {
		// send to remote
		remote.forward(msg, this.getContext());
	}

}

Server:

public class RemoteServer {

	public static void main(String[] args) {
		new RemoteActorSystem();
	}

}

RemoteActor:

/**
 * The remote actor;
 * 
 * @author crazyjohn
 *
 */
public class RemoteActor extends UntypedActor {
	/** loggers */
	protected static Logger logger = LoggerFactory.getLogger(RemoteActor.class);

	@Override
	public void onReceive(Object msg) throws Exception {
		if (msg instanceof String) {
			logger.info(msg.toString());
		} else if (msg instanceof Message) {
			logger.info(JsonFormat.printToString((Message) msg));
		}
	}

}

RemoteActorSystem:

/**
 * The remote actor system;
 * 
 * @author crazyjohn
 *
 */
public class RemoteActorSystem implements IActorHostSystem {
	/** loggers */
	protected static Logger logger = LoggerFactory.getLogger(RemoteActorSystem.class);
	/** ActorSystem */
	private ActorSystem system;
	/** game master */
	private ActorRef remoteMaster;

	public RemoteActorSystem() {
		// load remote config
		Config config = ConfigFactory.load().getConfig("REMOTE");
		system = ActorSystem.create(this.getClass().getSimpleName(), config);
		remoteMaster = system.actorOf(Props.create(RemoteActor.class), "remoteMaster");
		logger.info(remoteMaster.toString());
	}

	@Override
	public void shutdown() {
		this.system.shutdown();
	}

	@Override
	public ActorSystem getSystem() {
		return system;
	}

	@Override
	public ActorRef getMasterActor() {
		return remoteMaster;
	}

}
  1. Cluster集群

Clone this wiki locally