Skip to content

Commit

Permalink
Added singleInstance operator (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan authored and mbonneau committed Sep 5, 2017
1 parent c915843 commit cc6421c
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 1 deletion.
24 changes: 24 additions & 0 deletions demo/share/singleInstance.php
@@ -0,0 +1,24 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

$interval = Rx\Observable::interval(1000);

$source = $interval
->take(2)
->do(function () {
echo 'Side effect', PHP_EOL;
});

$single = $source->singleInstance();

// two simultaneous subscriptions, lasting 2 seconds
$single->subscribe($createStdoutObserver('SourceA '));
$single->subscribe($createStdoutObserver('SourceB '));

\Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) {
// resubscribe two times again, more than 5 seconds later,
// long after the original two subscriptions have ended
$single->subscribe($createStdoutObserver('SourceC '));
$single->subscribe($createStdoutObserver('SourceD '));
});
16 changes: 16 additions & 0 deletions demo/share/singleInstance.php.expect
@@ -0,0 +1,16 @@
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
38 changes: 37 additions & 1 deletion src/Observable.php
Expand Up @@ -11,7 +11,6 @@
use Rx\Observable\ConnectableObservable;
use Rx\Observable\EmptyObservable;
use Rx\Observable\ErrorObservable;
use Rx\Observable\FromPromiseObservable;
use Rx\Observable\ForkJoinObservable;
use Rx\Observable\IntervalObservable;
use Rx\Observable\IteratorObservable;
Expand Down Expand Up @@ -1312,6 +1311,43 @@ public function share(): RefCountObservable
return $this->publish()->refCount();
}

/**
* Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
* can be resubscribed to, even if all prior subscriptions have ended.
*
* This operator behaves like share() in RxJS 5
*
* @return \Rx\Observable An observable sequence that contains the elements of a sequence
* produced by multicasting the source sequence.
*
* @demo share/singleInstance.php
* @operator
* @reactivex refcount
*/
public function singleInstance(): Observable
{
$hasObservable = false;
$observable = null;
$source = $this;

$getObservable = function () use (&$hasObservable, &$observable, $source): Observable {
if (!$hasObservable) {
$hasObservable = true;
$observable = $source
->finally(function () use (&$hasObservable) {
$hasObservable = false;
})
->publish()
->refCount();
}
return $observable;
};

return new Observable\AnonymousObservable(function (ObserverInterface $o) use ($getObservable) {
return $getObservable()->subscribe($o);
});
}

/**
* Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an
* initialValue.
Expand Down
120 changes: 120 additions & 0 deletions test/Rx/Functional/Operator/SingleInstanceTest.php
@@ -0,0 +1,120 @@
<?php

declare(strict_types=1);

namespace Rx\Functional\Operator;

use Rx\Disposable\CompositeDisposable;
use Rx\Disposable\SerialDisposable;
use Rx\Functional\FunctionalTestCase;

class SingleInstanceTest extends FunctionalTestCase
{
/**
* @test
*/
public function singleInstance_basic()
{
$xs = $this->createColdObservable([
onNext(100, 1),
onNext(150, 2),
onNext(200, 3),
onCompleted(250)
]);

$ys = null;
$results1 = $this->scheduler->createObserver();
$results2 = $this->scheduler->createObserver();
$disposable = null;

$this->scheduler->scheduleAbsolute($this->scheduler::CREATED, function () use (&$ys, $xs) {
$ys = $xs->singleInstance();
});

$this->scheduler->scheduleAbsolute($this->scheduler::SUBSCRIBED, function () use (&$ys, &$disposable, $results1, $results2) {
$disposable = new CompositeDisposable([
$ys->subscribe($results1),
$ys->subscribe($results2)
]);
});

$this->scheduler->scheduleAbsolute($this->scheduler::DISPOSED, function () use (&$disposable) {
$disposable->dispose();
});

$this->scheduler->start();

$this->assertMessages([
onNext(300, 1),
onNext(350, 2),
onNext(400, 3),
onCompleted(450)
], $results1->getMessages());

$this->assertMessages([
onNext(300, 1),
onNext(350, 2),
onNext(400, 3),
onCompleted(450)
], $results2->getMessages());

$this->assertSubscriptions([
subscribe(200, 450)
], $xs->getSubscriptions());
}

/**
* @test
*/
public function singleInstance_subscribe_after_stopped()
{
$xs = $this->createColdObservable([
onNext(100, 1),
onNext(150, 2),
onNext(200, 3),
onCompleted(250)
]);

$ys = null;
$results1 = $this->scheduler->createObserver();
$results2 = $this->scheduler->createObserver();
$disposable = new SerialDisposable();

$this->scheduler->scheduleAbsolute(100, function () use (&$ys, $xs) {
$ys = $xs->singleInstance();
});

$this->scheduler->scheduleAbsolute(200, function () use (&$ys, $disposable, $results1) {
$disposable->setDisposable($ys->subscribe($results1));
});

$this->scheduler->scheduleAbsolute(600, function () use (&$ys, $disposable, $results2) {
$disposable->setDisposable($ys->subscribe($results2));
});

$this->scheduler->scheduleAbsolute(900, function () use (&$disposable) {
$disposable->dispose();
});

$this->scheduler->start();

$this->assertMessages([
onNext(300, 1),
onNext(350, 2),
onNext(400, 3),
onCompleted(450)
], $results1->getMessages());

$this->assertMessages([
onNext(700, 1),
onNext(750, 2),
onNext(800, 3),
onCompleted(850)
], $results2->getMessages());

$this->assertSubscriptions([
subscribe(200, 450),
subscribe(600, 850)
], $xs->getSubscriptions());
}
}

0 comments on commit cc6421c

Please sign in to comment.