diff --git a/demo/do/doFinally-error.php b/demo/do/doFinally-error.php new file mode 100644 index 00000000..1ec514ac --- /dev/null +++ b/demo/do/doFinally-error.php @@ -0,0 +1,15 @@ +map(function($value) { + if ($value == 2) { + throw new \Exception('error'); + } + return $value; + }) + ->doFinally(function() { + echo "Finally\n"; + }) + ->subscribe($stdoutObserver); diff --git a/demo/do/doFinally-error.php.expect b/demo/do/doFinally-error.php.expect new file mode 100644 index 00000000..5c2abf4e --- /dev/null +++ b/demo/do/doFinally-error.php.expect @@ -0,0 +1,3 @@ +Next value: 1 +Exception: error +Finally diff --git a/demo/do/doFinally.php b/demo/do/doFinally.php new file mode 100644 index 00000000..b73ccc90 --- /dev/null +++ b/demo/do/doFinally.php @@ -0,0 +1,9 @@ +doFinally(function() { + echo "Finally\n"; + }) + ->subscribe($stdoutObserver); diff --git a/demo/do/doFinally.php.expect b/demo/do/doFinally.php.expect new file mode 100644 index 00000000..26c084f1 --- /dev/null +++ b/demo/do/doFinally.php.expect @@ -0,0 +1,5 @@ +Next value: 1 +Next value: 2 +Next value: 3 +Complete! +Finally diff --git a/lib/Rx/Observable.php b/lib/Rx/Observable.php index e7275529..68d7575d 100644 --- a/lib/Rx/Observable.php +++ b/lib/Rx/Observable.php @@ -32,6 +32,7 @@ use Rx\Operator\DistinctOperator; use Rx\Operator\DistinctUntilChangedOperator; use Rx\Operator\DoOnEachOperator; +use Rx\Operator\DoFinallyOperator; use Rx\Operator\GroupByUntilOperator; use Rx\Operator\IsEmptyOperator; use Rx\Operator\MapOperator; @@ -1867,4 +1868,23 @@ public function isEmpty() return new IsEmptyOperator(); }); } + + + /** + * Will call a specified function when the source terminates on complete or error. + * + * @param callable $callback + * @return AnonymousObservable + * + * @demo do/doFinally.php + * @demo do/doFinally-error.php + * @operator + * @reactivex do + */ + public function doFinally(callable $callback) + { + return $this->lift(function() use ($callback) { + return new DoFinallyOperator($callback); + }); + } } diff --git a/lib/Rx/Operator/DoFinallyOperator.php b/lib/Rx/Operator/DoFinallyOperator.php new file mode 100644 index 00000000..797ae588 --- /dev/null +++ b/lib/Rx/Operator/DoFinallyOperator.php @@ -0,0 +1,37 @@ +callback = $callback; + } + + /** + * @param \Rx\ObservableInterface $observable + * @param \Rx\ObserverInterface $observer + * @param \Rx\SchedulerInterface $scheduler + * @return \Rx\DisposableInterface + */ + public function __invoke(ObservableInterface $observable, ObserverInterface $observer, SchedulerInterface $scheduler = null) + { + return new BinaryDisposable( + $observable->subscribe($observer, $scheduler), + new CallbackDisposable($this->callback) + ); + } +} \ No newline at end of file diff --git a/test/Rx/Functional/Operator/DoFinallyTest.php b/test/Rx/Functional/Operator/DoFinallyTest.php new file mode 100644 index 00000000..c64ec552 --- /dev/null +++ b/test/Rx/Functional/Operator/DoFinallyTest.php @@ -0,0 +1,327 @@ +doFinally(function() use (&$completed) { + $completed = true; + }) + ->subscribe(new CallbackObserver()); + + $this->assertTrue($completed); + } + + /** + * @test + */ + public function should_call_finally_after_error() + { + $thrown = false; + + Observable::fromArray([1, 2, 3]) + ->map(function($value) { + if ($value == 3) { + throw new \Exception(); + } + return $value; + }) + ->doFinally(function() use (&$thrown) { + $thrown = true; + }) + ->subscribe(new CallbackObserver(null, function() {})); // Ignore the default error handler + + $this->assertTrue($thrown); + } + + /** + * @test + */ + public function should_call_finally_upon_disposal() + { + $disposed = false; + + Observable::create(function(ObserverInterface $obs) { + $obs->onNext(1); + }) + ->doFinally(function() use (&$disposed) { + $disposed = true; + }) + ->subscribe(new CallbackObserver()) + ->dispose(); + + $this->assertTrue($disposed); + } + + /** + * @test + */ + public function should_call_finally_when_synchronously_subscribing_to_and_unsubscribing_from_a_shared_observable() + { + $disposed = false; + + Observable::create(function(ObserverInterface $obs) { + $obs->onNext(1); + }) + ->doFinally(function() use (&$disposed) { + $disposed = true; + }) + ->share() + ->subscribe(new CallbackObserver()) + ->dispose(); + + $this->assertTrue($disposed); + } + + /** + * @test + */ + public function should_call_two_finally_instances_in_succession_on_a_shared_observable() + { + $invoked = 0; + + Observable::fromArray([1, 2, 3]) + ->doFinally(function() use (&$invoked) { + $invoked++; + }) + ->doFinally(function() use (&$invoked) { + $invoked++; + }) + ->share() + ->subscribe(new CallbackObserver()); + + $this->assertEquals(2, $invoked); + } + + /** + * @test + */ + public function should_handle_empty() + { + $executed = false; + + $xs = $this->createHotObservable([ + onCompleted(300) + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onCompleted(300), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 300), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_never() + { + $executed = false; + + $xs = $this->createHotObservable([ ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 1000), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_throw() + { + $executed = false; + + $e = new \Exception(); + + $xs = $this->createHotObservable([ + onError(300, $e) + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onError(300, $e) + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 300), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_hot_observable() + { + $executed = false; + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 600), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_cold_observable() + { + $executed = false; + + $xs = $this->createColdObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(500, 'a'), + onNext(600, 'b'), + onNext(700, 'c'), + onCompleted(800), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 800), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_error() + { + $executed = false; + + $e = new \Exception(); + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onError(600, $e), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onError(600, $e), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 600), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_unsubscription() + { + $executed = false; + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + ]); + + $results = $this->scheduler->startWithDispose(function() use ($xs, &$executed) { + return $xs->doFinally(function() use (&$executed) { + $executed = true; + }); + }, 450); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 450), + ], $xs->getSubscriptions()); + } + +} \ No newline at end of file