Skip to content

Commit

Permalink
Fix delay. Pass scheduler through, initialize disposable in case onCo…
Browse files Browse the repository at this point in the history
…mpleted is called prior to subscribe completing. With tests.
  • Loading branch information
mbonneau committed Feb 7, 2016
1 parent 10781d9 commit 2613f36
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
8 changes: 6 additions & 2 deletions lib/Rx/Operator/DelayOperator.php
Expand Up @@ -3,6 +3,7 @@
namespace Rx\Operator;

use Rx\Disposable\CompositeDisposable;
use Rx\Disposable\EmptyDisposable;
use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
Expand Down Expand Up @@ -45,6 +46,8 @@ public function __invoke(

$disposable = new CompositeDisposable();

$sourceDisposable = new EmptyDisposable();

$sourceDisposable = $observable->subscribe(new CallbackObserver(
function ($x) use ($scheduler, $observer, &$lastScheduledTime, $disposable) {
$schedDisp = $scheduler->schedule(function () use ($x, $observer, &$schedDisp, $disposable) {
Expand All @@ -68,8 +71,9 @@ function () use ($scheduler, $observer, $disposable, &$sourceDisposable) {
}, $this->delay);

$disposable->add($schedDisp);
}
));
}),
$scheduler
);

$disposable->add($sourceDisposable);

Expand Down
41 changes: 41 additions & 0 deletions test/Rx/Functional/Operator/DelayTest.php
Expand Up @@ -3,6 +3,7 @@
namespace Rx\Functional\Operator;

use Rx\Functional\FunctionalTestCase;
use Rx\Observable;

class DelayTest extends FunctionalTestCase
{
Expand Down Expand Up @@ -270,4 +271,44 @@ public function delay_never()
$xs->getSubscriptions()
);
}

/**
* @test
*/
public function delay_scheduler_is_passed_to_source()
{
$schedulerPassedIn = null;

Observable::create(
function ($observer, $scheduler) use (&$schedulerPassedIn) {
$schedulerPassedIn = $scheduler;
$observer->onCompleted();
})
->delay(1, $this->scheduler)
->subscribeCallback();

$this->assertSame($schedulerPassedIn, $this->scheduler);
}

/**
* @test
*/
public function delay_completes_during_subscribe_without_throwing()
{
$completes = false;

Observable::create(function ($observer, $scheduler) {
$observer->onCompleted();
})->delay(1, $this->scheduler)->subscribeCallback(
null,
null,
function () use (&$completes) {
$completes = true;
}
);

$this->scheduler->start();

$this->assertTrue($completes);
}
}

0 comments on commit 2613f36

Please sign in to comment.