在前一章中,我们介绍了 RxCpp 库及其编程模型。我们编写了一些程序来了解图书馆的运作。我们还介绍了 RxCpp 库的基本元素。在这一章中,我们将深入讨论 RxCpp 库的关键元素以及反应式编程模型,包括以下内容:
- 看得见的
- 观察者及其变体(订阅者)
- 学科
- 调度程序
- 经营者
实际上,反应式编程的关键方面如下:
- 观察点是观察者可以订阅通知的流
- 主体是可观察物和观察者的组合
schedulers
执行与操作员相关联的操作,并帮助数据从观察点流向观察者- 运算符是接受一个可观测值并发出另一个可观测值的函数
在前一章中,我们从头开始创建了 Observables,并为这些 Observables 编写了订阅者。在我们所有的例子中,Observables 创建了一个Producer
类的实例。Producer
类产生一个事件流。换句话说,可观察对象是连接用户和生产者的功能。在我们继续之前,让我们剖析一个可观察的对象和与之相关的核心活动:
- 可观测值是以观察者为参数并返回一个函数的函数
- 可观察对象将观察者连接到生产者(生产者对于观察者是不透明的)
- 生产者是可观察的价值来源
- 观察者是具有
on_next
、on_error
和on_completed
方法的物体
生产者是可观察的价值来源。生产者可以是窗口、定时器、网络套接字、DOM 树、集合/容器上的迭代器等等。它们可以是任何可以传递给观察者的数据源。下一个(值) (在RxCpp
、observer.on_next(value)
中。)
在前一章的大多数例子中,我们看到生产者是在可观察函数中创建的。生产者可以在可观察函数外部创建,对生产者的引用可以放在可观察函数内部。引用其内部生产者实例的可观察对象称为热可观察对象。我们在里面创造了一个生产者的任何可观察的东西被称为冷的可观察的。为了说明问题,让我们编写一个程序来演示一个冷的可观察的:
//---------- ColdObservable.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[])
{
//----------- Get a Coordination
auto eventloop = rxcpp::observe_on_event_loop();
//----- Create a Cold Observable
auto values = rxcpp::observable<>::interval(
std::chrono::seconds(2)).take(2);
间隔创建了一个冷的可观察值,因为事件流的生产者是由interval
函数实例化的。当订阅或观察者附加到可观察对象时,冷可观察对象将发出数据。即使订阅有延迟,结果也是一致的。这意味着我们将获得所有可观测到的数据:
//----- Subscribe Twice
values.subscribe_on(eventloop).
subscribe([](int v){printf("[1] onNext: %dn", v);},
[](){printf("[1] onCompletedn");});
values.subscribe_on(eventloop).
subscribe([](int v){printf("[2] onNext: %dn", v);},
[](){printf("[2] onCompletedn");});
//---- make a blocking subscription to see the results
values.as_blocking().subscribe();
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(std::chrono::milliseconds(2000)).
subscribe([&](long){ });
}
程序发出的输出如下。对于每次运行,控制台中内容的顺序可能会改变,因为我们在同一线程中调度观察者方法的执行。不会因为订阅延迟而导致任何数据丢失:
[1] onNext: 1
[2] onNext: 1
[2] onNext: 2
[1] onNext: 2
[2] onCompleted
[1] onCompleted
我们可以通过调用可观测的publish
方法,将冷的可观测转换为热的可观测。将冷的可观测值转换为热的可观测值的后果是数据会被以后的订阅遗漏。无论是否有订阅,热点可观察都会发出数据。以下程序演示了这一点:
//---------- HotObservable.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
auto eventloop = rxcpp::observe_on_event_loop();
//----- Create a Cold Observable
//----- Convert Cold Observable to Hot Observable
//----- using .Publish();
auto values = rxcpp::observable<>::interval(
std::chrono::seconds(2)).take(2).publish();
//----- Subscribe Twice
values.
subscribe_on(eventloop).
subscribe(
[](int v){printf("[1] onNext: %dn", v);},
[](){printf("[1] onCompletedn");});
values.
subscribe_on(eventloop).
subscribe(
[](int v){printf("[2] onNext: %dn", v);},
[](){printf("[2] onCompletedn");});
//------ Connect to Start Emitting Values
values.connect();
//---- make a blocking subscription to see the results
values.as_blocking().subscribe();
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long){ });
}
在下一个例子中,我们将看看RxCpp
支持的publish_synchronized
机制。从编程接口的角度来看,这只是一个小小的改变。看看这个程序:
//---------- HotObservable2.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
auto eventloop = rxcpp::observe_on_event_loop();
//----- Create a Cold Observable
//----- Convert Cold Observable to Hot Observable
//----- using .publish_synchronized();
auto values = rxcpp::observable<>::interval(
std::chrono::seconds(2)).
take(5).publish_synchronized(eventloop);
//----- Subscribe Twice
values.
subscribe(
[](int v){printf("[1] onNext: %dn", v);},
[](){printf("[1] onCompletedn");});
values.
subscribe(
[](int v){printf("[2] onNext: %dn", v);},
[](){printf("[2] onCompletedn");});
//------ Start Emitting Values
values.connect();
//---- make a blocking subscription to see the results
values.as_blocking().subscribe();
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long){ });
}
程序的输出如下。我们可以看到输出是很好地同步的,也就是说,输出是以正确的顺序打印的:
[1] onNext: 1
[2] onNext: 1
[1] onNext: 2
[2] onNext: 2
[1] onNext: 3
[2] onNext: 3
[1] onNext: 4
[2] onNext: 4
[1] onNext: 5
[2] onNext: 5
[1] onCompleted
[2] onCompleted
无论是否有用户,一个热的可观察对象都会发出数据。这有时会成为一个问题。反应式编程中有一种缓存数据的机制,这样以后的用户就可以通过一个可观察的。我们可以使用.replay()
方法来创建这样一个可观察的。让我们编写一个程序来演示重放机制,这在编写热 Observables 时非常有用:
//---------- ReplayAll.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
auto values = rxcpp::observable<>::interval(
std::chrono::milliseconds(50),
rxcpp::observe_on_new_thread()).
take(5).replay();
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ldn", v);},
[](){printf("[1] OnCompletedn");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(
std::chrono::milliseconds(125)).subscribe([&](long){
values.as_blocking().subscribe(
[](long v){printf("[2] OnNext: %ldn", v);},
[](){printf("[2] OnCompletedn");});
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long){ });
}
在编写反应式程序时,您真的需要理解热观察值和冷观察值之间的语义差异。我们只谈到了其中的一些方面。请参考 RxCpp 文档和 ReactiveX 文档了解更多信息。网上有无数关于这个话题的文章。
观察者订阅一个可观察对象,等待事件被通知。上一章已经谈到了观察员。因此,我们将重点关注订阅者,它是观察者和订阅者的组合。订户可以取消订阅。用普通的观察者,你只能订阅。以下程序将很好地解释这些概念:
//---- Subscriber.cpp
#include "rxcpp/rx.hpp"
int main() {
//----- create a subscription object
auto subscription = rxcpp::composite_subscription();
//----- Create a Subscription
auto subscriber = rxcpp::make_subscriber<int>(
subscription,
[&](int v){
printf("OnNext: --%dn", v);
if (v == 3)
subscription.unsubscribe(); // Demonstrates Un Subscribes
},
[](){ printf("OnCompletedn");});
rxcpp::observable<>::create<int>(
[](rxcpp::subscriber<int> s){
for (int i = 0; i < 5; ++ i) {
if (!s.is_subscribed())
break;
s.on_next(i);
}
s.on_completed();
}).subscribe(subscriber);
return 0;
}
对于编写具有并发性和动态性的非平凡程序,订阅和取消订阅的能力非常方便。通过查阅 RxCpp 文档,深入了解该主题。
主体是同时既是观察者又是可观察的实体。它有助于将通知从一个可观察对象传递给一组观察对象。我们可以实现复杂的技术,比如数据的缓存和缓冲。我们也可以用一个主语把热的可观察转换成冷的可观察。RxCpp.
中实现了四种主体变体,如下所示:
SimpleSubject
BehaviorSubject
ReplaySubject
SynchronizeSubject
让我们编写一个简单的程序,它将作为观察者订阅数据,并作为一对订阅者的可观察对象:
//------- SimpleSubject.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
//----- Create an instance of Subject
rxcpp::subjects::subject<int> subject;
//----- Retreive the Observable
//----- attached to the Subject
auto observable = subject.get_observable();
//------ Subscribe Twice
observable.subscribe( [] ( int v ) { printf("1------%dn",v ); });
observable.subscribe( [] ( int v ) { printf("2------%dn",v );});
//--------- Get the Subscriber Interface
//--------- Attached to the Subject
auto subscriber = subject.get_subscriber();
//----------------- Emit Series of Values
subscriber.on_next(1);
subscriber.on_next(4);
subscriber.on_next(9);
subscriber.on_next(16);
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(std::chrono::milliseconds(2000)).
subscribe([&](long){ });
}
BehaviorSubject
是 subject 的一个变体,作为实现的一部分存储最后发出的(当前)值。任何新用户将立即获得当前值。否则,它的行为就像一个正常的主体。BehaviorSubject
也称为属性或单元格。在我们用一系列数据更新特定单元或内存的情况下,例如在事务中,它非常有用。让我们编写一个程序来演示BehaviorSubject
的工作原理:
//-------- BehaviorSubject.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
rxcpp::subjects::behavior<int> behsubject(0);
auto observable = behsubject.get_observable();
observable.subscribe( [] ( int v ) {
printf("1------%dn",v );
});
observable.subscribe( [] ( int v ) {
printf("2------%dn",v );
});
auto subscriber = behsubject.get_subscriber();
subscriber.on_next(1);
subscriber.on_next(2);
int n = behsubject.get_value();
printf ("Last Value ....%dn",n);
}
ReplaySubject
是存储已经发出的数据的主体的变体。我们可以指定参数来指示主题必须保留多少值。这在处理热门的 Observables 时非常方便。各种重放重载的原型如下:
replay (Coordination cn,[optional] composite_subscription cs)
replay (std::size_t count, Coordination cn, [optional]composite_subscription cs)
replay (duration period, Coordination cn, [optional] composite_subscription cs)
replay (std::size_t count, duration period, Coordination cn,[optional] composite_subscription cs).
我们写个程序看看ReplaySubject
的语义:
//------------- ReplaySubject.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
//----------- instantiate a ReplaySubject
rxcpp::subjects::replay<int,rxcpp::observe_on_one_worker>
replay_subject(10,rxcpp::observe_on_new_thread());
//---------- get the observable interface
auto observable = replay_subject.get_observable();
//---------- Subscribe!
observable.subscribe( [] ( int v ) {printf("1------%dn",v );});
//--------- get the subscriber interface
auto subscriber = replay_subject.get_subscriber();
//---------- Emit data
subscriber.on_next(1);
subscriber.on_next(2);
//-------- Add a new subscriber
//-------- A normal subject will drop data
//-------- Replay subject will not
observable.subscribe( [] ( int v ) { printf("2------%dn",v );});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long){ });
}
在这一节中,我们已经讨论了一个主题的三种变体。主要的用例是通过使用可观察的接口来利用来自不同来源的事件和数据,并允许一组订阅者使用利用的数据。SimpleSubject
既可以作为可观测者,也可以作为观察者来处理数据流。BehaviorSubject
用于监控一段时间内属性或变量的变化。ReplaySubject
将帮助您避免因订阅延迟而导致的数据丢失。SynchronizeSubject
是一个在其实现中内置同步逻辑的主题。
RxCpp 库提供了一个声明式线程机制,这要归功于与之打包的健壮的调度子系统。从可观察的角度来看,数据可以沿着变更传播图通过不同的路径流动。通过向流处理管道给出提示,我们可以在不同的线程、同一个线程或一个后台线程中调度执行。这有助于更好地抓住程序员的意图。
RxCpp 中的声明性调度模型是可能的,因为在操作符的实现中流是不变性的。流操作符将一个可观测值作为参数,并返回一个新的可观测值作为结果。输入参数未被修改。这有助于无序执行。RxCpp 的调度子系统包含以下结构:
- 调度程序
- 工人
- 协调
- 协调者
- 可调度的
- 时间表
RxCpp 的版本 2 借鉴了RxJava
系统的调度架构。它依赖于RxJava
使用的调度器和工作器习惯用法。以下是关于调度程序的一些重要事实:
- 调度程序有一个时间表。
- 调度程序可以在时间线中创建许多工作人员。
- 工作人员在时间线中拥有一个可调度的队列。
schedulable
拥有一个功能(称为Action
),并且有寿命。- 一个
Coordination
作为协调器的工厂,并有一个调度器。 - 每个协调员都有一名工人,是以下工作的工厂:
- 协调
schedulable
- 协调的观察点和用户
- 协调
我们一直在我们的程序中使用 Rx schedulers
,没有考虑它们是如何在引擎盖下工作的。让我们写一个玩具程序,这将有助于我们理解调度是如何工作的:
//------------- SchedulerOne.cpp
#include "rxcpp/rx.hpp"
int main(){
//---------- Get a Coordination
auto Coordination function= rxcpp::serialize_new_thread();
//------- Create a Worker instance through a factory method
auto worker = coordination.create_coordinator().get_worker();
//--------- Create a action object
auto sub_action = rxcpp::schedulers::make_action(
[] (const rxcpp::schedulers::schedulable&) {
printf("Action Executed in Thread # : %dn",
std::this_thread::get_id());
} );
//------------- Create a schedulable and schedule the action
auto scheduled = rxcpp::schedulers::make_schedulable(worker,sub_action);
scheduled.schedule();
return 0;
}
在RxCpp
中,所有以多个流为输入,或处理与时间有关系的任务的操作员,都以一个Coordination
函数为参数。使用特定调度程序的一些Coordination
功能如下:
identity_immediate()
identity_current_thread()
identity_same_worker(worker w)
serialize_event_loop()
serialize_new_thread()
serialize_same_worker(worker w)
observe_on_event_loop()
observe_on_new_thread()
在前面的程序中,我们手动安排了一个动作(事实上,它只是一个 Lambda)。让我们继续讨论调度程序的声明性方面。我们将编写一个程序,使用Coordination
函数安排任务:
//----------- SchedulerTwo.cpp
#include "rxcpp/rx.hpp"
int main(){
//-------- Create a Coordination function
auto Coordination function= rxcpp::identity_current_thread();
//-------- Instantiate a coordinator and create a worker
auto worker = coordination.create_coordinator().get_worker();
//--------- start and the period
auto start = coordination.now() + std::chrono::milliseconds(1);
auto period = std::chrono::milliseconds(1);
//----------- Create an Observable (Replay )
auto values = rxcpp::observable<>::interval(start,period).
take(5).replay(2, coordination);
//--------------- Subscribe first time using a Worker
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe( [](long v){ printf("#1 -- %d : %ldn",
std::this_thread::get_id(),v); },
[](){ printf("#1 --- OnCompletedn");});
});
worker.schedule([&](const rxcpp::schedulers::schedulable&){
values.subscribe( [](long v){printf("#2 -- %d : %ldn",
std::this_thread::get_id(),v); },
[](){printf("#2 --- OnCompletedn");});
});
//----- Start the emission of values
worker.schedule([&](const rxcpp::schedulers::schedulable&)
{ values.connect();});
//------- Add blocking subscription to see results
values.as_blocking().subscribe(); return 0;
}
我们使用重放机制创建了一个热观察器来处理一些观察器的延迟订阅。我们还创建了一个 Worker 来调度订阅,并将观察者与可观察对象连接起来。上一个程序演示了调度程序如何在RxCpp
中工作。
ObserveOn
和SubscribeOn
操作符的行为方式不同,这一直是反应式编程新手的困惑之源。ObserveOn
操作符改变其下方的操作符和观察者的线程。在SubscribeOn
的情况下,它也影响上面和下面的操作者和方法。下面的程序演示了由SubscribeOn
和ObserveOn
操作员行为方式引起的行为的细微变化。让我们编写一个使用ObserveOn
运算符的程序:
//-------- ObservableOnScheduler.cpp
#include "rxcpp/rx.hpp"
int main(){
//------- Print the main thread id
printf("Main Thread Id is %dn",
std::this_thread::get_id());
//-------- We are using observe_on here
//-------- The Map will use the main thread
//-------- Subscribed Lambda will use a new thread
rxcpp::observable<>::range(0,15).
map([](int i){
printf("Map %d : %dn", std::this_thread::get_id(),i);
return i; }).
take(5).observe_on(rxcpp::synchronize_new_thread()).
subscribe([&](int i){
printf("Subs %d : %dn", std::this_thread::get_id(),i);
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long){ });
return 0;
}
前面程序的输出如下:
Main Thread Id is 1
Map 1 : 0
Map 1 : 1
Subs 2 : 0
Map 1 : 2
Subs 2 : 1
Map 1 : 3
Subs 2 : 2
Map 1 : 4
Subs 2 : 3
Subs 2 : 4
前面程序的输出清楚地显示 map 在主线程中工作,而subscribe
方法在辅助线程中被调度。这清楚地表明ObserveOn
只对其下的运营商和用户起作用。让我们编写一个或多或少相同的程序,使用SubscribeOn
运算符而不是ObserveOn
运算符。看看这个:
//-------- SubscribeOnScheduler.cpp
#include "rxcpp/rx.hpp"
int main(){
//------- Print the main thread id
printf("Main Thread Id is %dn",
std::this_thread::get_id());
//-------- We are using subscribe_on here
//-------- The Map and subscribed Lambda will
//--------- use the secondary thread
rxcpp::observable<>::range(0,15).
map([](int i){
printf("Map %d : %dn", std::this_thread::get_id(),i);
return i;
}).
take(5).subscribe_on(rxcpp::synchronize_new_thread()).
subscribe([&](int i){
printf("Subs %d : %dn", std::this_thread::get_id(),i);
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long){ });
return 0;
}
前面程序的输出如下:
Main Thread Id is 1
Map 2 : 0
Subs 2 : 0
Map 2 : 1
Subs 2 : 1
Map 2 : 2
Subs 2 : 2
Map 2 : 3
Subs 2 : 3
Map 2 : 4
Subs 2 : 4
前面程序的输出显示 map 和 subscription 方法都在辅助线程中工作。这清楚地表明SubscribeOn
改变了前后项的线程行为。
RxCpp 库没有内置的主线程调度器。你能做的最接近的就是利用run_loop
类来模拟主线程中的调度。在下面的程序中,可观察对象在后台线程中执行,订阅方法在主线程中运行。我们使用subscribe_on
和observe_on
来实现这一目标:
//------------- RunLoop.cpp
#include "rxcpp/rx.hpp"
int main(){
//------------ Print the Main Thread Id
printf("Main Thread Id is %dn",
std::this_thread::get_id());
//------- Instantiate a run_loop object
//------- which will loop in the main thread
rxcpp::schedulers::run_loop rlp;
//------ Create a Coordination functionfor run loop
auto main_thread = rxcpp::observe_on_run_loop(rlp);
auto worker_thread = rxcpp::synchronize_new_thread();
rxcpp::composite_subscription scr;
rxcpp::observable<>::range(0,15).
map([](int i){
//----- This will get executed in worker
printf("Map %d : %dn", std::this_thread::get_id(),i);
return i;
}).take(5).subscribe_on(worker_thread).
observe_on(main_thread).
subscribe(scr, [&](int i){
//--- This will get executed in main thread
printf("Sub %d : %dn", std::this_thread::get_id(),i); });
//------------ Execute the Run Loop
while (scr.is_subscribed() || !rlp.empty()) {
while (!rlp.empty() && rlp.peek().when < rlp.now())
{ rlp.dispatch();}
}
return 0;
}
前面程序的输出如下:
Main Thread Id is 1
Map 2 : 0
Map 2 : 1
Sub 1 : 0
Sub 1 : 1
Map 2 : 2
Map 2 : 3
Sub 1 : 2
Map 2 : 4
Sub 1 : 3
Sub 1 : 4
我们可以看到,映射是在工作线程中调度的,订阅方法是在主线程中执行的。这是因为Subscribe_on
和Observe_on
方法的合理放置,我们在前面的章节中已经介绍过了。
算子是作用于可观测值以产生新的可观测值的函数。在这个过程中,最初的可观察对象不是突变的,而是一个纯粹的函数。在我们编写的示例程序中,我们已经介绍了许多运算符。在第 9 章、使用 Qt/C++ 的反应式 GUI 编程中,我们将学习如何创建自定义操作符来处理 Observables。一个操作符不变异一个可观察的事实是声明式调度在 Rx 编程模型中工作的一个原因。Rx 运营商可分为以下几类:
- 创建运算符
- 转换运算符
- 过滤运算符
- 组合运算符
- 错误处理运算符
- 公用事业运营商
- 布尔运算符
- 数学运算符
有一些操作符不属于这些类别。我们将在一个表格中概述上述类别中的一些关键运算符,以供快速参考。
这些操作符将帮助人们从输入数据中创建各种各样的可观测值。我们已经在示例代码中演示了 create、from、interval 和 range 的用法。参考这些示例和 RxCpp 文档,了解更多相关信息。包含如下一些运算符的表:
| 观察到 | 描述 |
| create
| 通过以编程方式调用观察者方法来创建可观察对象 |
| defer
| 为每个观察者/订阅者创建一个新的可观测值 |
| empty
| 创建一个不发射任何东西(只发射完成的)的可观测值 |
| from
| 基于参数创建可观察值(多态) |
| interval
| 创建一个在时间间隔内发出一系列值的可观测值 |
| just
| 创建发出单个值的可观测值 |
| range
| 创建发出一系列值的可观察值 |
| never
| 创造一个永远不会发射任何东西的可观察物体 |
| repeat
| 创建重复值流的可观察值 |
| timer
| 创建一个在指定为参数的延迟后发出值的可观测值 |
| throw
| 创建发出错误的可观测值 |
这些操作符帮助用户创建一个新的可观测值,而无需修改源可观测值。它们通过应用一个 Lambda 对源可观察中的单个项目进行操作。包含一些最有用的转换运算符的表如下:
| 观察到 | 描述 |
| buffer
| 收集过去的值并在发出信号时发出的可观察值 |
| flat_map
| 发出将函数应用于由源可观测值和集合可观测值发出的一对值的结果的可观测值 |
| group_by
| 有助于对可观察值进行分组的可观察值 |
| map
| 可观察的,从源可观察的发出项目,由指定的函数转换 |
| scan
| 发出对累加器函数的每次调用结果的可观察值 |
| window
| 发出连接的非重叠窗口的可观察对象,每个窗口最多包含来自源可观察对象的计数项 |
过滤流的能力是流处理中的常见活动。Rx 编程模型定义了很多这样的运算符,这并不罕见。过滤运算符主要是谓词函数或 Lambdas。下表包含筛选运算符列表:
| 观察到 | 描述 |
| debounce
| 如果特定的时间跨度已经过去,而没有从源发出另一个项目,则发出一个项目的可观察 |
| distinct
| 从源发出那些项目的可观察的 |
| element_at
| 发出位于指定索引位置的项的可观察对象 |
| filter
| 仅发出源发出的项目的可观察值过滤器评估为真的可观察值 |
| first
| 仅发出源发出的第一个项目的可观察 |
| ignore_eleements
| 从源发出终止通知的可观察 |
| last
| 仅发出源发出的最后一个项目的可观察 |
| sample
| 可观测的,发出由源发出的最近的项目可观测的在周期时间间隔内 |
| skip
| 与源可观测值相同的可观测值,除了它不发出源可观测值发出的第一个 t 项 |
| skip_last
| 与源可观测值相同的可观测值,除了它不发出源可观测值发出的最后 t 个项目 |
| take
| 仅发出源可观察对象发出的前 t 个项目的可观察对象,或者如果该可观察对象发出的项目少于 t 个,则发出源可观察对象的所有项目 |
| take_last
| 仅发出源发出的最后 t 个项目的可观测值 |
Rx 编程模型的主要目标之一是将事件源与事件接收器分离。显然,需要能够组合来自各种来源的流的运营商。RxCpp 库实现了一组这样的操作符。下表概述了一组常用的组合运算符:
| 观察到 | 描述 |
| combine_latest
| 当一个项目由两个可观察对象中的任何一个发出时,通过指定的函数组合每个可观察对象发出的最新项目,并基于该函数的结果发出项目 |
| merge
| 这通过合并它们的排放将多个可观测值合并成一个 |
| start_with
| 这将在开始从源可观察对象发出项目之前发出指定的项目序列 |
| switch_on_next
| 这将发出可观测值的可观测值转换为发出最近发出的可观测值的单个可观测值 |
| zip
| 这通过一个指定的函数将多个可观测值的发射组合在一起,并基于该函数的结果为每个组合发射单个项目 |
这些操作符有助于从可观察到的错误通知中恢复。看看这张桌子:
| 观察到 | 描述 |
| Catch
| 不支持RxCpp
|
| retry
| 一个反映源可观测值的可观测值,如果调用on_error
达到指定的重试次数,则重新订阅该可观测值 |
下面是一个有用的操作工具箱,用于操作可观察对象:
| 观察到 | 描述 |
| finally
| 发出与源可观察对象相同项目的可观察对象,然后调用给定的操作 |
| observe_on
| 指定观察者将在其上观察该可观察对象的调度程序 |
| subscribe
| 对可观察到的排放和通知进行操作 |
| subscribe_on
| 指定订阅时可观察对象应该使用的调度程序 |
| scope
| 创建与可观察资源具有相同生命周期的可支配资源 |
以下是评估一个或多个可观测值或由可观测值发出的项目的运算符:
| 观察到 | 描述 |
| all
| 如果源可观测值发出的每个项目都满足指定条件,则发出 true 的可观测值;否则,它会发出 false |
| amb
| 发出与源 Observables 中最先发出项目或发送终止通知的源 Observables 相同序列的 Observables |
| contains
| 如果源可观测值发出指定的项目,则发出 true 的可观测值;否则它会发出假的 |
| default_if_empty
| 如果源可观测值发出指定的项目,则发出 true 的可观测值;否则它会发出假的 |
| sequence_equal
| 仅当两个序列在以相同顺序发出相同的项目序列后正常终止时才发出 true 的可观察值;否则,它会发出假的 |
| skip_until
| 丢弃可观察对象发出的项目,直到第二个可观察对象发出项目 |
| skip_while
| 丢弃由可观察对象发出的项目,直到指定的条件变为假 |
| take_until
| 在第二个可观察对象发出项目或终止后,丢弃可观察对象发出的项目 |
| take_while
| 在指定条件变为假后,丢弃由可观察对象发出的项目 |
这些运算符对可观察到的发出的整个项目序列进行操作:
| 观察到 | 描述 |
| average
| 计算可观测值发出的数字的平均值,并发出该平均值 |
| concat
| 从两个或两个以上的观察点发射辐射,而不交错它们 |
| count
| 计算源可观测值发出的项目数,并只发出该值 |
| max
| 确定并发出可观测值发出的最大值项 |
| Min
| 确定并发出可观测值发出的最小值项目 |
| reduce
| 对可观察对象发出的每个项目应用一个函数,依次发出最终值 |
| sum
| 计算可观测值发出的数字之和,并发出这个和 |
可连接的可观察操作符是特殊的可观察操作符,具有更精确控制的订阅动态。下表列出了其中的一些:
| 观察到 | 描述 |
| connect
| 指示可连接的可观察对象开始向其订户发射项目 |
| publish
| 将普通可观测值转换为可连接可观测值 |
| ref_count
| 让一个可连接的可观察对象像普通的可观察对象一样工作 |
| replay
| 确保所有观察者看到相同的发射项目序列,即使他们是在可观察对象开始发射项目后订阅的 |
在本章中,我们了解了 Rx 编程模型的各个部分是如何结合在一起的。我们从可观察对象开始,很快就进入了冷热可观察对象的话题。然后,我们介绍了订阅机制及其使用。然后,我们进入主题的重要主题,了解主题调度器实现的许多变体。最后,我们对 RxCpp 系统中可用的各种运营商进行了分类。在下一章中,我们将学习如何使用 Qt 框架以被动的方式使用这些知识编写图形用户界面程序。