diff --git a/demo/finally/finally-error.php b/demo/finally/finally-error.php new file mode 100644 index 00000000..f1567ae0 --- /dev/null +++ b/demo/finally/finally-error.php @@ -0,0 +1,15 @@ +map(function($value) { + if ($value == 2) { + throw new \Exception('error'); + } + return $value; + }) + ->finally(function() { + echo "Finally\n"; + }) + ->subscribe($stdoutObserver); diff --git a/demo/finally/finally-error.php.expect b/demo/finally/finally-error.php.expect new file mode 100644 index 00000000..5c2abf4e --- /dev/null +++ b/demo/finally/finally-error.php.expect @@ -0,0 +1,3 @@ +Next value: 1 +Exception: error +Finally diff --git a/demo/finally/finally.php b/demo/finally/finally.php new file mode 100644 index 00000000..c41ce58d --- /dev/null +++ b/demo/finally/finally.php @@ -0,0 +1,9 @@ +finally(function() { + echo "Finally\n"; + }) + ->subscribe($stdoutObserver); diff --git a/demo/finally/finally.php.expect b/demo/finally/finally.php.expect new file mode 100644 index 00000000..26c084f1 --- /dev/null +++ b/demo/finally/finally.php.expect @@ -0,0 +1,5 @@ +Next value: 1 +Next value: 2 +Next value: 3 +Complete! +Finally diff --git a/src/Observable.php b/src/Observable.php index f78eed12..9892a59f 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -35,6 +35,7 @@ use Rx\Operator\DistinctOperator; use Rx\Operator\DistinctUntilChangedOperator; use Rx\Operator\DoOnEachOperator; +use Rx\Operator\FinallyOperator; use Rx\Operator\GroupByUntilOperator; use Rx\Operator\IsEmptyOperator; use Rx\Operator\MapOperator; @@ -1940,6 +1941,24 @@ public function isEmpty(): Observable }); } + /** + * Will call a specified function when the source terminates on complete or error. + * + * @param callable $callback + * @return Observable + * + * @demo finally/finally.php + * @demo finally/finally-error.php + * @operator + * @reactivex do + */ + public function finally(callable $callback): Observable + { + return $this->lift(function() use ($callback) { + return new FinallyOperator($callback); + }); + } + /** * @param Promise $promise * @param SchedulerInterface|null $scheduler diff --git a/src/Operator/FinallyOperator.php b/src/Operator/FinallyOperator.php new file mode 100644 index 00000000..f2b53b9f --- /dev/null +++ b/src/Operator/FinallyOperator.php @@ -0,0 +1,36 @@ +callback = $callback; + } + + /** + * @param \Rx\ObservableInterface $observable + * @param \Rx\ObserverInterface $observer + * @return \Rx\DisposableInterface + */ + public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface + { + return new BinaryDisposable( + $observable->subscribe($observer), + new CallbackDisposable($this->callback) + ); + } +} \ No newline at end of file diff --git a/test/Rx/Functional/Operator/FinallyTest.php b/test/Rx/Functional/Operator/FinallyTest.php new file mode 100644 index 00000000..627169a2 --- /dev/null +++ b/test/Rx/Functional/Operator/FinallyTest.php @@ -0,0 +1,327 @@ +finally(function() use (&$completed) { + $completed = true; + }) + ->subscribe(new CallbackObserver()); + + $this->assertTrue($completed); + } + + /** + * @test + */ + public function should_call_finally_after_error() + { + $thrown = false; + + Observable::fromArray([1, 2, 3]) + ->map(function($value) { + if ($value == 3) { + throw new \Exception(); + } + return $value; + }) + ->finally(function() use (&$thrown) { + $thrown = true; + }) + ->subscribe(new CallbackObserver(null, function() {})); // Ignore the default error handler + + $this->assertTrue($thrown); + } + + /** + * @test + */ + public function should_call_finally_upon_disposal() + { + $disposed = false; + + Observable::create(function(ObserverInterface $obs) { + $obs->onNext(1); + }) + ->finally(function() use (&$disposed) { + $disposed = true; + }) + ->subscribe(new CallbackObserver()) + ->dispose(); + + $this->assertTrue($disposed); + } + + /** + * @test + */ + public function should_call_finally_when_synchronously_subscribing_to_and_unsubscribing_from_a_shared_observable() + { + $disposed = false; + + Observable::create(function(ObserverInterface $obs) { + $obs->onNext(1); + }) + ->finally(function() use (&$disposed) { + $disposed = true; + }) + ->share() + ->subscribe(new CallbackObserver()) + ->dispose(); + + $this->assertTrue($disposed); + } + + /** + * @test + */ + public function should_call_two_finally_instances_in_succession_on_a_shared_observable() + { + $invoked = 0; + + Observable::fromArray([1, 2, 3]) + ->finally(function() use (&$invoked) { + $invoked++; + }) + ->finally(function() use (&$invoked) { + $invoked++; + }) + ->share() + ->subscribe(new CallbackObserver()); + + $this->assertEquals(2, $invoked); + } + + /** + * @test + */ + public function should_handle_empty() + { + $executed = false; + + $xs = $this->createHotObservable([ + onCompleted(300) + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onCompleted(300), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 300), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_never() + { + $executed = false; + + $xs = $this->createHotObservable([ ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 1000), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_throw() + { + $executed = false; + + $e = new \Exception(); + + $xs = $this->createHotObservable([ + onError(300, $e) + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onError(300, $e) + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 300), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_hot_observable() + { + $executed = false; + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 600), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_cold_observable() + { + $executed = false; + + $xs = $this->createColdObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onCompleted(600), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(500, 'a'), + onNext(600, 'b'), + onNext(700, 'c'), + onCompleted(800), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 800), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_basic_error() + { + $executed = false; + + $e = new \Exception(); + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onError(600, $e), + ]); + + $results = $this->scheduler->startWithCreate(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + onError(600, $e), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 600), + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function should_handle_unsubscription() + { + $executed = false; + + $xs = $this->createHotObservable([ + onNext(300, 'a'), + onNext(400, 'b'), + onNext(500, 'c'), + ]); + + $results = $this->scheduler->startWithDispose(function() use ($xs, &$executed) { + return $xs->finally(function() use (&$executed) { + $executed = true; + }); + }, 450); + + $this->assertTrue($executed); + + $this->assertMessages([ + onNext(300, 'a'), + onNext(400, 'b'), + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 450), + ], $xs->getSubscriptions()); + } + +} \ No newline at end of file