diff --git a/lib/Rx/Operator/DelayOperator.php b/lib/Rx/Operator/DelayOperator.php index 35ed780d..f94784f4 100644 --- a/lib/Rx/Operator/DelayOperator.php +++ b/lib/Rx/Operator/DelayOperator.php @@ -3,6 +3,7 @@ namespace Rx\Operator; use Rx\Disposable\CompositeDisposable; +use Rx\Disposable\EmptyDisposable; use Rx\ObservableInterface; use Rx\Observer\CallbackObserver; use Rx\ObserverInterface; @@ -45,6 +46,8 @@ public function __invoke( $disposable = new CompositeDisposable(); + $sourceDisposable = new EmptyDisposable(); + $sourceDisposable = $observable->subscribe(new CallbackObserver( function ($x) use ($scheduler, $observer, &$lastScheduledTime, $disposable) { $schedDisp = $scheduler->schedule(function () use ($x, $observer, &$schedDisp, $disposable) { @@ -68,8 +71,9 @@ function () use ($scheduler, $observer, $disposable, &$sourceDisposable) { }, $this->delay); $disposable->add($schedDisp); - } - )); + }), + $scheduler + ); $disposable->add($sourceDisposable); diff --git a/test/Rx/Functional/Operator/DelayTest.php b/test/Rx/Functional/Operator/DelayTest.php index 89da8450..5a19d253 100644 --- a/test/Rx/Functional/Operator/DelayTest.php +++ b/test/Rx/Functional/Operator/DelayTest.php @@ -3,6 +3,7 @@ namespace Rx\Functional\Operator; use Rx\Functional\FunctionalTestCase; +use Rx\Observable; class DelayTest extends FunctionalTestCase { @@ -270,4 +271,44 @@ public function delay_never() $xs->getSubscriptions() ); } + + /** + * @test + */ + public function delay_scheduler_is_passed_to_source() + { + $schedulerPassedIn = null; + + Observable::create( + function ($observer, $scheduler) use (&$schedulerPassedIn) { + $schedulerPassedIn = $scheduler; + $observer->onCompleted(); + }) + ->delay(1, $this->scheduler) + ->subscribeCallback(); + + $this->assertSame($schedulerPassedIn, $this->scheduler); + } + + /** + * @test + */ + public function delay_completes_during_subscribe_without_throwing() + { + $completes = false; + + Observable::create(function ($observer, $scheduler) { + $observer->onCompleted(); + })->delay(1, $this->scheduler)->subscribeCallback( + null, + null, + function () use (&$completes) { + $completes = true; + } + ); + + $this->scheduler->start(); + + $this->assertTrue($completes); + } } \ No newline at end of file