From c2b85967c960aedd16c34c4aca0c31b3e176d600 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Mon, 23 Jan 2017 14:14:16 +0100 Subject: [PATCH 1/2] added finally operator --- demo/finally/finally-error.php | 15 + demo/finally/finally-error.php.expect | 3 + demo/finally/finally.php | 9 + demo/finally/finally.php.expect | 5 + lib/Rx/Operator/FinallyCallOperator.php | 37 ++ .../Functional/Operator/FinallyCallTest.php | 327 ++++++++++++++++++ 6 files changed, 396 insertions(+) create mode 100644 demo/finally/finally-error.php create mode 100644 demo/finally/finally-error.php.expect create mode 100644 demo/finally/finally.php create mode 100644 demo/finally/finally.php.expect create mode 100644 lib/Rx/Operator/FinallyCallOperator.php create mode 100644 test/Rx/Functional/Operator/FinallyCallTest.php diff --git a/demo/finally/finally-error.php b/demo/finally/finally-error.php new file mode 100644 index 00000000..a29a1f42 --- /dev/null +++ b/demo/finally/finally-error.php @@ -0,0 +1,15 @@ +map(function($value) { + if ($value == 2) { + throw new \Exception(); + } + return $value; + }) + ->finallyCall(function() { + echo "Finally\n"; + }) + ->subscribe($stdoutObserver); diff --git a/demo/finally/finally-error.php.expect b/demo/finally/finally-error.php.expect new file mode 100644 index 00000000..7baf193f --- /dev/null +++ b/demo/finally/finally-error.php.expect @@ -0,0 +1,3 @@ +Next value: 1 +Exception: +Finally diff --git a/demo/finally/finally.php b/demo/finally/finally.php new file mode 100644 index 00000000..8bc1c4b8 --- /dev/null +++ b/demo/finally/finally.php @@ -0,0 +1,9 @@ +finallyCall(function() { + echo "Finally\n"; + }) + ->subscribe($stdoutObserver); diff --git a/demo/finally/finally.php.expect b/demo/finally/finally.php.expect new file mode 100644 index 00000000..26c084f1 --- /dev/null +++ b/demo/finally/finally.php.expect @@ -0,0 +1,5 @@ +Next value: 1 +Next value: 2 +Next value: 3 +Complete! +Finally diff --git a/lib/Rx/Operator/FinallyCallOperator.php b/lib/Rx/Operator/FinallyCallOperator.php new file mode 100644 index 00000000..f1366eeb --- /dev/null +++ b/lib/Rx/Operator/FinallyCallOperator.php @@ -0,0 +1,37 @@ +callback = $callback; + } + + /** + * @param \Rx\ObservableInterface $observable + * @param \Rx\ObserverInterface $observer + * @param \Rx\SchedulerInterface $scheduler + * @return \Rx\DisposableInterface + */ + public function __invoke(ObservableInterface $observable, ObserverInterface $observer, SchedulerInterface $scheduler = null) + { + return new BinaryDisposable( + $observable->subscribe($observer, $scheduler), + new CallbackDisposable($this->callback) + ); + } +} \ No newline at end of file diff --git a/test/Rx/Functional/Operator/FinallyCallTest.php b/test/Rx/Functional/Operator/FinallyCallTest.php new file mode 100644 index 00000000..96e4d546 --- /dev/null +++ b/test/Rx/Functional/Operator/FinallyCallTest.php @@ -0,0 +1,327 @@ +finallyCall(function() use (&$completed) { + $completed = true; + }) + ->subscribe(new CallbackObserver()); + + $this->assertTrue($completed); + } + + /** + * @test + */ + public function should_call_finally_after_error() + { + $thrown = false; + + Observable::fromArray([1, 2, 3]) + ->map(function($value) { + if ($value == 3) { + throw new \Exception(); + } + return $value; + }) + ->finallyCall(function() use (&$thrown) { + $thrown = true; + }) + ->subscribe(new CallbackObserver(null, function() {})); // Ignore the default error handler + + $this->assertTrue($thrown); + } + + /** + * @test + */ + public function should_call_finally_upon_disposal() + { + $disposed = false; + + Observable::create(function(ObserverInterface $obs) { + $obs->onNext(1); + }) + ->finallyCall(function() use (&$disposed) { + $disposed = true; + }) + ->subscribe(new CallbackObserver()) + ->dispose(); + + $this->assertTrue($disposed); + } + + /** + * @test + */ + public function should_call_finally_when_synchronously_subscribing_to_and_unsubscribing_from_a_shared_observable() + { + $disposed = false; + + Observable::create(function(ObserverInterface $obs) { + $obs->onNext(1); + }) + ->finallyCall(function() use (&$disposed) { + $disposed = true; + }) + ->share() + ->subscribe(new CallbackObserver()) + ->dispose(); + + $this->assertTrue($disposed); + } + + /** + * @test + */ + public function should_call_two_finally_instances_in_succession_on_a_shared_observable() + { + $invoked = 0; + + Observable::fromArray([1, 2, 3]) + ->finallyCall(function() use (&$invoked) { + $invoked++; + }) + ->finallyCall(function() use (&$invoked) { + $invoked++; + }) + ->share() + ->subscribe(new CallbackObserver()); + + $this->assertEquals(2, $invoked); + } + + /** + * @test + */ + public function should_handle_empty() + { + $executed = false; + + $xs = $this->createHotObservable([ + onCompleted(300) + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onCompleted(300), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 300), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_never() + { + $executed = false; + + $xs = $this->createHotObservable([ ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 1000), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_throw() + { + $executed = false; + + $e = new \Exception(); + + $xs = $this->createHotObservable([ + onError(300, $e) + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onError(300, $e) + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 300), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_hot_observable() + { + $executed = false; + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 600), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_cold_observable() + { + $executed = false; + + $xs = $this->createColdObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(500, 'a'), + onNext(600, 'b'), + onNext(700, 'c'), + onCompleted(800), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 800), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_error() + { + $executed = false; + + $e = new \Exception(); + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onError(600, $e), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onError(600, $e), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 600), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_unsubscription() + { + $executed = false; + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + ]); + + $results = $this->scheduler->startWithDispose(function() use ($xs, &$executed) { + return $xs->finallyCall(function() use (&$executed) { + $executed = true; + }); + }, 450); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 450), + ], $xs->getSubscriptions()); + } + +} \ No newline at end of file From 9c7745dbe64e6eb9eb247a902a0b6af2d52074cf Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Wed, 1 Feb 2017 09:54:31 +0100 Subject: [PATCH 2/2] finally() operator for 2.x --- demo/finally/finally-error.php | 4 +-- demo/finally/finally-error.php.expect | 2 +- demo/finally/finally.php | 2 +- src/Observable.php | 19 ++++++++++++++ .../Operator/FinallyOperator.php | 9 +++---- .../{FinallyCallTest.php => FinallyTest.php} | 26 +++++++++---------- 6 files changed, 40 insertions(+), 22 deletions(-) rename lib/Rx/Operator/FinallyCallOperator.php => src/Operator/FinallyOperator.php (73%) rename test/Rx/Functional/Operator/{FinallyCallTest.php => FinallyTest.php} (90%) diff --git a/demo/finally/finally-error.php b/demo/finally/finally-error.php index a29a1f42..f1567ae0 100644 --- a/demo/finally/finally-error.php +++ b/demo/finally/finally-error.php @@ -5,11 +5,11 @@ Rx\Observable::range(1, 3) ->map(function($value) { if ($value == 2) { - throw new \Exception(); + throw new \Exception('error'); } return $value; }) - ->finallyCall(function() { + ->finally(function() { echo "Finally\n"; }) ->subscribe($stdoutObserver); diff --git a/demo/finally/finally-error.php.expect b/demo/finally/finally-error.php.expect index 7baf193f..5c2abf4e 100644 --- a/demo/finally/finally-error.php.expect +++ b/demo/finally/finally-error.php.expect @@ -1,3 +1,3 @@ Next value: 1 -Exception: +Exception: error Finally diff --git a/demo/finally/finally.php b/demo/finally/finally.php index 8bc1c4b8..c41ce58d 100644 --- a/demo/finally/finally.php +++ b/demo/finally/finally.php @@ -3,7 +3,7 @@ require_once __DIR__ . '/../bootstrap.php'; Rx\Observable::range(1, 3) - ->finallyCall(function() { + ->finally(function() { echo "Finally\n"; }) ->subscribe($stdoutObserver); diff --git a/src/Observable.php b/src/Observable.php index f78eed12..9892a59f 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -35,6 +35,7 @@ use Rx\Operator\DistinctOperator; use Rx\Operator\DistinctUntilChangedOperator; use Rx\Operator\DoOnEachOperator; +use Rx\Operator\FinallyOperator; use Rx\Operator\GroupByUntilOperator; use Rx\Operator\IsEmptyOperator; use Rx\Operator\MapOperator; @@ -1940,6 +1941,24 @@ public function isEmpty(): Observable }); } + /** + * Will call a specified function when the source terminates on complete or error. + * + * @param callable $callback + * @return Observable + * + * @demo finally/finally.php + * @demo finally/finally-error.php + * @operator + * @reactivex do + */ + public function finally(callable $callback): Observable + { + return $this->lift(function() use ($callback) { + return new FinallyOperator($callback); + }); + } + /** * @param Promise $promise * @param SchedulerInterface|null $scheduler diff --git a/lib/Rx/Operator/FinallyCallOperator.php b/src/Operator/FinallyOperator.php similarity index 73% rename from lib/Rx/Operator/FinallyCallOperator.php rename to src/Operator/FinallyOperator.php index f1366eeb..f2b53b9f 100644 --- a/lib/Rx/Operator/FinallyCallOperator.php +++ b/src/Operator/FinallyOperator.php @@ -2,13 +2,13 @@ namespace Rx\Operator; +use Rx\DisposableInterface; use Rx\Disposable\CallbackDisposable; use Rx\Disposable\BinaryDisposable; use Rx\ObservableInterface; use Rx\ObserverInterface; -use Rx\SchedulerInterface; -class FinallyCallOperator implements OperatorInterface +class FinallyOperator implements OperatorInterface { /** @var callable */ private $callback; @@ -24,13 +24,12 @@ public function __construct(callable $callback) /** * @param \Rx\ObservableInterface $observable * @param \Rx\ObserverInterface $observer - * @param \Rx\SchedulerInterface $scheduler * @return \Rx\DisposableInterface */ - public function __invoke(ObservableInterface $observable, ObserverInterface $observer, SchedulerInterface $scheduler = null) + public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface { return new BinaryDisposable( - $observable->subscribe($observer, $scheduler), + $observable->subscribe($observer), new CallbackDisposable($this->callback) ); } diff --git a/test/Rx/Functional/Operator/FinallyCallTest.php b/test/Rx/Functional/Operator/FinallyTest.php similarity index 90% rename from test/Rx/Functional/Operator/FinallyCallTest.php rename to test/Rx/Functional/Operator/FinallyTest.php index 96e4d546..627169a2 100644 --- a/test/Rx/Functional/Operator/FinallyCallTest.php +++ b/test/Rx/Functional/Operator/FinallyTest.php @@ -18,7 +18,7 @@ public function should_call_finally_after_complete() $completed = false; Observable::fromArray([1, 2, 3]) - ->finallyCall(function() use (&$completed) { + ->finally(function() use (&$completed) { $completed = true; }) ->subscribe(new CallbackObserver()); @@ -40,7 +40,7 @@ public function should_call_finally_after_error() } return $value; }) - ->finallyCall(function() use (&$thrown) { + ->finally(function() use (&$thrown) { $thrown = true; }) ->subscribe(new CallbackObserver(null, function() {})); // Ignore the default error handler @@ -58,7 +58,7 @@ public function should_call_finally_upon_disposal() Observable::create(function(ObserverInterface $obs) { $obs->onNext(1); }) - ->finallyCall(function() use (&$disposed) { + ->finally(function() use (&$disposed) { $disposed = true; }) ->subscribe(new CallbackObserver()) @@ -77,7 +77,7 @@ public function should_call_finally_when_synchronously_subscribing_to_and_unsubs Observable::create(function(ObserverInterface $obs) { $obs->onNext(1); }) - ->finallyCall(function() use (&$disposed) { + ->finally(function() use (&$disposed) { $disposed = true; }) ->share() @@ -95,10 +95,10 @@ public function should_call_two_finally_instances_in_succession_on_a_shared_obse $invoked = 0; Observable::fromArray([1, 2, 3]) - ->finallyCall(function() use (&$invoked) { + ->finally(function() use (&$invoked) { $invoked++; }) - ->finallyCall(function() use (&$invoked) { + ->finally(function() use (&$invoked) { $invoked++; }) ->share() @@ -119,7 +119,7 @@ public function should_handle_empty() ]); $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }); @@ -145,7 +145,7 @@ public function should_handle_never() $xs = $this->createHotObservable([ ]); $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }); @@ -173,7 +173,7 @@ public function should_handle_throw() ]); $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }); @@ -204,7 +204,7 @@ public function should_handle_basic_hot_observable() ]); $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }); @@ -238,7 +238,7 @@ public function should_handle_basic_cold_observable() ]); $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }); @@ -274,7 +274,7 @@ public function should_handle_basic_error() ]); $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }); @@ -307,7 +307,7 @@ public function should_handle_unsubscription() ]); $results = $this->scheduler->startWithDispose(function() use ($xs, &$executed) { - return $xs->finallyCall(function() use (&$executed) { + return $xs->finally(function() use (&$executed) { $executed = true; }); }, 450);