- 在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。
- 使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。
- 在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。
- RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。
- Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。
- Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。
-
Flux 类的静态方法
-
just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
-
fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
-
empty():创建一个不包含任何元素,只发布结束消息的序列。
-
error(Throwable error):创建一个只包含错误消息的序列。
-
never():创建一个不包含任何消息通知的序列。
-
range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
-
interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
-
intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
-
栗子
-
-
generate() 方法
-
create()方法
-
fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
-
delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
-
ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
-
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
-
栗子
-
buffer 和 bufferTimeout
-
把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。
-
在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。
-
方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。
-
指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。
-
bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;
-
bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。
-
栗子
-
-
filter
-
window
-
zipWith
-
take
-
take 系列操作符用来从当前流中提取元素。
-
take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。
-
takeLast(long n):提取流中的最后 N 个元素。
-
takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
-
takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。
-
takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。
-
栗子
-
-
reduce 和 reduceWith
-
merge 和 mergeSequential
-
flatMap 和 flatMapSequential
-
concatMap
-
combineLatest
-
当需要处理 Flux 或 Mono 中的消息时,可以通过 subscribe 方法来添加相应的订阅逻辑。
-
在调用 subscribe 方法时可以指定需要处理的消息类型,可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。
-
栗子
-
不同调度器实现
- 当前线程,通过 Schedulers.immediate()方法来创建。
- 单一的可复用的线程,通过 Schedulers.single()方法来创建。
- 使用弹性的线程池,通过 Schedulers.elastic()方法来创建。
- 线程池中的线程是可以复用的。当所需要时,新的线程会被创建。
- 如果一个线程闲置太长时间,则会被销毁。
- 该调度器适用于 I/O 操作相关的流的处理。
- 使用对并行操作优化的线程池,通过 Schedulers.parallel()方法来创建。
- 其中的线程数量取决于 CPU 的核的数量。该调度器适用于计算密集型的流的处理。
- 使用支持任务调度的调度器,通过 Schedulers.timer()方法来创建。
- 从已有的 ExecutorService 对象中创建调度器,通过 Schedulers.fromExecutorService()方法来创建。
-
栗子
-
StepVerifier 的作用是可以对序列中包含的元素进行逐一验证。
-
通过 StepVerifier.create()方法对一个流进行包装之后再进行验证。
-
expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而 verifyComplete()方法则验证流是否正常结束。
-
还有 verifyError()来验证流由于错误而终止。
-
栗子
-
通过 StepVerifier.withVirtualTime()方法可以创建出使用虚拟时钟的 StepVerifier。通过 thenAwait(Duration)方法可以让虚拟时钟前进。
-
栗子
需要验证的流中包含两个产生间隔为一天的元素,并且第一个元素的产生延迟是 4 个小时。在通过 StepVerifier.withVirtualTime()方法包装流之后,expectNoEvent()方法用来验证在 4 个小时之内没有任何消息产生,然后验证第一个元素 0 产生;接着 thenAwait()方法来让虚拟时钟前进一天,然后验证第二个元素 1 产生;最后验证流正常结束。
-
启动调试模式
-
使用检查点