Skip to content

Commit

Permalink
Make do have the same call signature as subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Jan 7, 2017
1 parent 730e8f1 commit 746c372
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 211 deletions.
4 changes: 2 additions & 2 deletions demo/do/do.php
Expand Up @@ -3,7 +3,7 @@
require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::range(0, 3)
->do(new \Rx\Observer\CallbackObserver(
->do(
function ($x) {
echo 'Do Next:', $x, PHP_EOL;
},
Expand All @@ -13,6 +13,6 @@ function (Throwable $err) {
function () {
echo 'Do Completed', PHP_EOL;
}
));
);

$subscription = $source->subscribe($stdoutObserver);
23 changes: 16 additions & 7 deletions src/Observable.php
Expand Up @@ -899,25 +899,34 @@ public function distinctUntilKeyChanged(callable $keySelector = null, callable $
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to
* run arbitrary actions for messages on the pipeline.
*
* When using doOnEach, it is important to note that the Observer may receive additional
* events after a stream has completed or errored (such as when useing a repeat or resubscribing).
* When using do, it is important to note that the Observer may receive additional
* events after a stream has completed or errored (such as when using a repeat or resubscribing).
* If you are using an Observable that extends the AbstractObservable, you will not receive these
* events. For this special case, use the DoObserver.
*
* doOnNext, doOnError, and doOnCompleted uses the DoObserver internally and will receive these
* additional events.
*
* @param ObserverInterface $observer
*
* @return \Rx\Observable\AnonymousObservable
* @param callable|ObserverInterface $onNextOrObserver
* @param callable $onError
* @param callable $onCompleted
* @return AnonymousObservable
* @throws \InvalidArgumentException
*
* @demo do/do.php
* @operator
* @reactivex do
*
*/
public function do(ObserverInterface $observer): AnonymousObservable
public function do($onNextOrObserver = null, callable $onError = null, callable $onCompleted = null): AnonymousObservable
{
if ($onNextOrObserver instanceof ObserverInterface) {
$observer = $onNextOrObserver;
} elseif (is_callable($onNextOrObserver)) {
$observer = new DoObserver($onNextOrObserver, $onError, $onCompleted);
} else {
throw new \InvalidArgumentException('The first argument needs to be a "callable" or "Observer"');
}

return $this->lift(function () use ($observer) {
return new DoOnEachOperator($observer);
});
Expand Down

0 comments on commit 746c372

Please sign in to comment.