一个基于 RxJava 设计理念的 C++20 ReactiveX 库,提供链式调用、丰富操作符和多种调度器,面向异步编程与事件流处理。
rx 以 Observable/Observer/Disposable/Scheduler 为核心抽象,强调可组合的操作符与可控的线程调度,适合构建复杂的异步数据流。
通过 CMake 自动拉取:
要求:CMake 3.20+,C++20 编译器,Git
mkdir build
cd build
cmake -DBUILD_RX_EXAMPLES=ON ..
cmake --build .#define USE_GANY_CORE
#include <gx/gany.h>
#include <rx/rx.h>
using namespace rx;
int main() {
initGAnyCore();
Observable::just(1, 2, 3, 4, 5)
->map([](const GAny &x) {
return x.toInt32() * 2;
})
->filter([](const GAny &x) {
return x.toInt32() % 4 == 0;
})
->subscribe([](const GAny &v) {
std::cout << "Value: " << v.toString() << std::endl;
});
return 0;
}MainThreadScheduler 依赖全局 GTimerScheduler,使用前必须先创建并启动:
auto mainScheduler = GTimerScheduler::create("MainScheduler");
mainScheduler->start();
GTimerScheduler::makeGlobal(mainScheduler);
auto timerScheduler = MainThreadScheduler::create();
Observable::timer(1000)
->observeOn(timerScheduler)
->subscribe([](const GAny &) {
std::cout << "Timer fired" << std::endl;
});
mainScheduler->run();Observable: 数据流源头,发射数据并完成或失败。Observer: 数据消费者,响应 onNext/onError/onComplete。Disposable: 订阅生命周期管理,可随时取消。Scheduler: 控制任务执行线程与时机。
- 创建:
createjustfromArrayrangeintervaltimeremptynevererrordefermergeconcatzip - 转换:
mapflatMapconcatMapswitchMaptoArraygroupBywindow - 过滤:
filterdistinctdistinctUntilChangedelementAtfirstlastignoreElementsskipskipLastskipWhiletaketakeLasttakeUntiltakeWhile - 组合:
combineLateststartWithbufferamb - 聚合:
scanreduce - 时间:
delaydebouncesampletimeout - 辅助:
repeatretrydoOnNextdoOnErrordoOnCompletedoOnSubscribedoFinallydoOnEach - 错误处理:
onErrorReturnonErrorResumeNextcatchError - 布尔:
allanycontainsisEmptydefaultIfEmptysequenceEqual - 调度:
subscribeOnobserveOn
- 操作符声明:
rx/include/rx/observable.h - 操作符实现:
rx/include/rx/operators/ - 功能示例:
examples/test_rx.cpp
rx/
├── examples/ # 示例代码
├── rx/
│ ├── include/rx/ # 头文件
│ └── src/ # 源文件
├── CMakeLists.txt
└── README.md
欢迎提交 Issue 和 Pull Request。
Gxin