Skip to content

Commit

Permalink
Merge 5da769a into 4807ab1
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Sep 11, 2016
2 parents 4807ab1 + 5da769a commit 7d35fcb
Show file tree
Hide file tree
Showing 5 changed files with 523 additions and 0 deletions.
18 changes: 18 additions & 0 deletions demo/switch/switchFirst.php
@@ -0,0 +1,18 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = Rx\Observable::fromArray([
\Rx\Observable::interval(100)->mapTo('a'),
\Rx\Observable::interval(200)->mapTo('b'),
\Rx\Observable::interval(300)->mapTo('c'),
])
->switchFirst()
->take(3);

$subscription = $source->subscribe($stdoutObserver, $scheduler);

$loop->run();
4 changes: 4 additions & 0 deletions demo/switch/switchFirst.php.expect
@@ -0,0 +1,4 @@
Next value: a
Next value: a
Next value: a
Complete!
24 changes: 24 additions & 0 deletions lib/Rx/Observable.php
Expand Up @@ -51,6 +51,7 @@
use Rx\Operator\StartWithArrayOperator;
use Rx\Operator\SkipWhileOperator;
use Rx\Operator\SubscribeOnOperator;
use Rx\Operator\SwitchFirstOperator;
use Rx\Operator\SwitchLatestOperator;
use Rx\Operator\TakeLastOperator;
use Rx\Operator\TakeOperator;
Expand Down Expand Up @@ -1635,6 +1636,29 @@ public function switchLatest(SchedulerInterface $scheduler = null)
return new SwitchLatestOperator($scheduler);
});
}


/**
* Receives an Observable of Observables and propagates the first Observable exclusively until it completes before
* it begins subscribes to the next Observable. Observables that come before the current Observable completes will
* be dropped and will not propagate.
*
* This operator is similar to concatAll() except that it will not hold onto Observables that come in before the
* current one is finished completed.
*
* @return AnonymousObservable - An Observable sequence that is the result of concatenating non-overlapping items
* emitted by an Observable of Observables.
*
* @demo switch/switchFirst.php
* @operator
* @reactivex switch
*/
public function switchFirst()
{
return $this->lift(function () {
return new SwitchFirstOperator();
});
}

/**
* Returns two observables which partition the observations of the source by the given function.
Expand Down
65 changes: 65 additions & 0 deletions lib/Rx/Operator/SwitchFirstOperator.php
@@ -0,0 +1,65 @@
<?php

namespace Rx\Operator;

use Rx\Disposable\CompositeDisposable;
use Rx\Disposable\SingleAssignmentDisposable;
use Rx\Observable;
use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
use Rx\Operator\OperatorInterface;
use Rx\SchedulerInterface;

final class SwitchFirstOperator implements OperatorInterface
{
private $isStopped = false;
private $hasCurrent = false;

public function __invoke(ObservableInterface $observable, ObserverInterface $observer, SchedulerInterface $scheduler = null)
{
$singleDisposable = new SingleAssignmentDisposable();
$disposable = new CompositeDisposable();

$disposable->add($singleDisposable);

$callbackObserver = new CallbackObserver(
function (Observable $x) use ($disposable, $scheduler, $observer) {
if ($this->hasCurrent) {
return;
}
$this->hasCurrent = true;

$inner = new SingleAssignmentDisposable();
$disposable->add($inner);

$innerSub = $x->subscribe(new CallbackObserver(
[$observer, "onNext"],
[$observer, "onError"],
function () use ($disposable, $inner, $observer) {
$disposable->remove($inner);
$this->hasCurrent = false;

if ($this->isStopped && $disposable->count() === 1) {
$observer->onCompleted();
}
}
), $scheduler);

$inner->setDisposable($innerSub);
},
[$observer, 'onError'],
function () use ($disposable, $observer) {
$this->isStopped = true;
if (!$this->hasCurrent && $disposable->count() === 1) {
$observer->onCompleted();
}
}
);

$singleDisposable->setDisposable($observable->subscribe($callbackObserver, $scheduler));

return $disposable;

}
}

0 comments on commit 7d35fcb

Please sign in to comment.