We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
之前的「聊聊 IO 多路复用」中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)
本文以 Reactor 为例带大家入门响应式编程
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.4.6</version> </dependency>
Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。 在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。
上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。
回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。
从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?
// 以下案例来自 Reactor 官网 userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() { public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?
// 以下案例来自 Reactor 官网 userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思 😂
响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,它也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。
Reactor 实现了 org.reactivestreams 提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。
org.reactivestreams
下图展示了 reactivestreams 中的核心接口
Publisher:发布者
Subscriber:订阅者
Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介
Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)
在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的
在 Reactor 中大部分实现都是按照上图的逻辑来执行的
调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布
其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解
import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; /** * @author tianwen.yin */ public class SimpleReactiveStream { /** * 实现一个简单的响应式编程发布者 * 逻辑:当订阅者发起订阅时,像下游发送一个 HelloWorld,发布逻辑由 SimpleSubscription 完成 */ static class SimplePublisher implements Publisher { @Override public void subscribe(Subscriber s) { // 2. Publisher 发布数据之前,调用 Subscriber 的 onSubscribe s.onSubscribe(new SimpleSubscription(data(), s)); } private String data() { return "Hello World"; } } static class SimpleSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { // 3. Subscriber 通过 Subscription#request 来请求数据 // 或者 Subscription#cancel 来取消数据发布 s.request(Long.MAX_VALUE); } @Override public void onNext(Object o) { System.out.println(o); } @Override public void onError(Throwable t) { System.out.println("error"); } @Override public void onComplete() { System.out.println("complete"); } } static class SimpleSubscription implements Subscription { String data; Subscriber actual; boolean isCanceled; public SimpleSubscription(String data, Subscriber actual) { this.data = data; this.actual = actual; } @Override public void request(long n) { if (!isCanceled) { try { // 4. Subscription 在接收到订阅者的调用后 // 通过 Subscriber#onNext 向下游订阅者传递数据 actual.onNext(data); // 5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流 actual.onComplete(); } catch (Exception e) { // 如果数据发布或者处理遇到错误会调用 Subscriber#onError actual.onError(e); } } } @Override public void cancel() { isCanceled = true; } } public static void main(String[] args) { // 1. Subscriber ”订阅“ Publisher new SimplePublisher().subscribe(new SimpleSubscriber()); } }
响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。
这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。
Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)
Mono.just("hello") .map(a -> a + "world") .subscribe(System.out::println);
举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码
// 注意,这是我基于 Reactor API 实现的伪代码! public static class MonoMap implements Publisher { // 我们自定义的转换逻辑 private Function mapper; // source 代表当前操作符的上游发布者 private Publisher source; public MonoMap(Publisher source, Function mapper) { this.source = source; this.mapper = mapper; } @Override public void subscribe(Subscriber actual) { source.subscribe(new MonoMapSubscriber(mapper, actual)); } } public static class MonoMapSubscriber implements Subscriber { // 我们自定义的转换逻辑 private Function mapper; // 真正的下游 private Subscriber actual; public MonoMapSubscriber(Function mapper, Subscriber actual) { this.mapper = mapper; this.actual = actual; } @Override public void onSubscribe(Subscription s) { actual.onSubscribe(s); } @Override public void onNext(Object o) { // 当上游数据发送过来时,先进行转换再发送给下游 Object result = mapper.apply(o); actual.onNext(result); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } }
上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨
我们首先来分析一下 Mono.just("hello").map(a -> a + "world") 这句话
Mono.just("hello").map(a -> a + "world")
当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"
当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)
当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)
当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...
最后通过一张图来总结一下
本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的
如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。
如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助
The text was updated successfully, but these errors were encountered:
No branches or pull requests
本文目标
前言
之前的「聊聊 IO 多路复用」中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)
本文以 Reactor 为例带大家入门响应式编程
版本
什么是 Reactor
上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。
回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。
从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?
这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?
可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思 😂
响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,它也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。
响应式编程内幕
Reactor 实现了
org.reactivestreams
提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。下图展示了 reactivestreams 中的核心接口
Publisher:发布者
Subscriber:订阅者
Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介
Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)
在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的
在 Reactor 中大部分实现都是按照上图的逻辑来执行的
其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解
响应式编程思想
响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。
这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。
Reactor 中的 Operator
Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)
举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码
上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨
我们首先来分析一下
Mono.just("hello").map(a -> a + "world")
这句话当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"
当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)
当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)
当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...
最后通过一张图来总结一下
Reactor 该如何学习
本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的
如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。
最后
The text was updated successfully, but these errors were encountered: