From 4c338cad81a6b3523492620719909439e1011e48 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Tue, 24 Jun 2014 22:42:04 -0700 Subject: [PATCH] upgraded to use rxcpp 2.0.0 --- ticker/packages.config | 2 +- ticker/ticker.cpp | 45 ++++++++++++++++++++++++------------------ ticker/ticker.vcxproj | 6 +++--- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/ticker/packages.config b/ticker/packages.config index d15150b..b5cfd3d 100644 --- a/ticker/packages.config +++ b/ticker/packages.config @@ -1,4 +1,4 @@  - + \ No newline at end of file diff --git a/ticker/ticker.cpp b/ticker/ticker.cpp index 0653d01..36112b0 100644 --- a/ticker/ticker.cpp +++ b/ticker/ticker.cpp @@ -2,7 +2,7 @@ #include "frequency_meter.h" #include "active_ticker.h" -#include +#include #include namespace { @@ -37,36 +37,43 @@ namespace { void rxcpp_example() { FrequencyMeter FM; - auto scheduler = std::make_shared(); - auto measure = rxcpp::Interval(std::chrono::milliseconds(250),scheduler); - auto sleep = [&scheduler](int milliseconds) { - //rxcpp::from(rxcpp::Interval(std::chrono::milliseconds(milliseconds), scheduler)) - // .take(1) - // .for_each([](int){}); - //concurrency::wait(milliseconds); + std::atomic pending(2); + + // schedule everything on the same event loop thread. + auto scheduler = rxcpp::schedulers::make_same_worker(rxcpp::schedulers::make_event_loop().create_worker()); + auto coordination = rxcpp::identity_one_worker(scheduler); + auto measure = rxcpp::observable<>::interval(scheduler.now() + std::chrono::milliseconds(250), std::chrono::milliseconds(250), coordination); + auto sleep = [&scheduler](int milliseconds) { std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); }; - auto measure_subscription = rxcpp::from(measure) + auto measure_subscription = measure .subscribe([&FM](int val) { std::cout << FM.Hz() << std::endl; }); - auto ticker = rxcpp::Interval(std::chrono::milliseconds(500), scheduler); - rxcpp::from(ticker) + auto ticker = rxcpp::observable<>::interval(scheduler.now() + std::chrono::milliseconds(500), std::chrono::milliseconds(500), coordination); + ticker .take(10) .subscribe([](int val) { std::cout << "tick " << val << std::endl; - }); - - - sleep(2000); - std::cout << "Canceling measurement ..." << std::endl; - measure_subscription.Dispose(); // cancel measurement - - sleep(6000); // wait for ticker to finish + },[&](){ + --pending; // take completed the ticker + }); + + // schedule the cout on the same worker to keep it from merging with the other cout calls. + scheduler.create_worker().schedule(scheduler.now() + std::chrono::seconds(2), + [&](const rxcpp::schedulers::schedulable&) { + std::cout << "Canceling measurement ..." << std::endl; + measure_subscription.unsubscribe(); // cancel measurement + --pending; // signal measurement canceled + }); + + while (pending > 0) { + sleep(1000); // wait for ticker and measure to finish + } } } diff --git a/ticker/ticker.vcxproj b/ticker/ticker.vcxproj index 4410ac6..160025b 100644 --- a/ticker/ticker.vcxproj +++ b/ticker/ticker.vcxproj @@ -39,7 +39,7 @@ - 5fc5653d + a8c8fb0b true @@ -95,12 +95,12 @@ - + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. - + \ No newline at end of file