From af590f410348a6fb7c272d35762db59f62be3d7a Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 1 Dec 2015 19:08:24 -0600 Subject: [PATCH] Better merge() implementation --- src/Observable/functions.php | 51 ++++++++++++---------------------- tests/Observable/MergeTest.php | 29 ++++++++----------- 2 files changed, 29 insertions(+), 51 deletions(-) diff --git a/src/Observable/functions.php b/src/Observable/functions.php index 2ffbaf2..9dbb846 100644 --- a/src/Observable/functions.php +++ b/src/Observable/functions.php @@ -51,47 +51,32 @@ function merge(array $observables) /** @var \Icicle\Observable\Observable[] $observables */ $observables = array_map(__NAMESPACE__ . '\from', $observables); - /** @var \Icicle\Observable\ObservableIterator[] $iterators */ - $iterators = array_map(function (Observable $observable) { - return $observable->getIterator(); - }, $observables); + $callback = function ($value) use (&$emitting, $emit) { + while (null !== $emitting) { + yield $emitting; // Prevents simultaneous calls to $emit. + } + + $emitting = new Delayed(); + + yield $emit($value); + + $emitting->resolve(); + $emitting = null; + }; /** @var \Icicle\Coroutine\Coroutine[] $coroutines */ - $coroutines = array_map(function (ObservableIterator $iterator) { - return new Coroutine($iterator->wait()); - }, $iterators); + $coroutines = array_map(function (Observable $observable) use ($callback) { + return new Coroutine($observable->each($callback)); + }, $observables); try { - while (count($coroutines)) { - yield Awaitable\choose($coroutines); - - /** - * @var int $key - * @var \Icicle\Coroutine\Coroutine $coroutine - */ - foreach (array_filter($coroutines, function (Coroutine $coroutine) { - return !$coroutine->isPending(); - }) as $key => $coroutine) { - $iterator = $iterators[$key]; - - if (!(yield $coroutine)) { - unset($coroutines[$key], $iterators[$key], $observables[$key]); - continue; - } - - yield $emit($iterator->getCurrent()); - - $coroutines[$key] = new Coroutine($iterator->wait()); - } - } + yield Awaitable\all($coroutines); } catch (\Exception $exception) { - foreach ($observables as $observable) { - $observable->dispose($exception); + foreach ($coroutines as $coroutine) { + $coroutine->cancel($exception); } throw $exception; } - - yield null; // Yield null so last emitted value is not the return value (not needed in PHP 7). }); } diff --git a/tests/Observable/MergeTest.php b/tests/Observable/MergeTest.php index d60443b..603b392 100644 --- a/tests/Observable/MergeTest.php +++ b/tests/Observable/MergeTest.php @@ -34,6 +34,7 @@ protected function createEmitter($low, $high) return new Observable\Emitter(function (callable $emit) use ($low, $high) { foreach (range($low, $high) as $value) { yield $emit($value); + yield $high - $low + 1; } }); } @@ -41,10 +42,10 @@ protected function createEmitter($low, $high) public function getObservables() { return [ - [[range(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6]], - [[$this->createEmitter(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6]], - [[range(1, 5), $this->createEmitter(6, 8)], [1, 6, 2, 7, 3, 8, 4, 5]], - [[new \ArrayObject(range(1, 4)), new \ArrayIterator(range(5, 8))], [1, 5, 2, 6, 3, 7, 4, 8]], + [[range(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6], [null, null]], + [[$this->createEmitter(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6], [3, null]], + [[$this->createEmitter(1, 5), $this->createEmitter(6, 8)], [1, 6, 2, 7, 3, 8, 4, 5], [5, 3]], + [[new \ArrayObject(range(1, 4)), new \ArrayIterator(range(5, 8))], [1, 5, 2, 6, 3, 7, 4, 8], [null, null]], ]; } @@ -54,7 +55,7 @@ public function getObservables() * @param array $observables * @param array $expected */ - public function testMerge(array $observables, array $expected) + public function testMerge(array $observables, array $expected, array $result) { $observable = Observable\merge($observables); @@ -63,7 +64,7 @@ public function testMerge(array $observables, array $expected) $this->assertSame($expected[$i++], $value); })); - $awaitable->wait(); + $this->assertEquals($result, $awaitable->wait()); } /** @@ -79,20 +80,12 @@ public function testMergeWithFailedObservable() throw $exception; }); - $iterator = $this->getMock(Observable\ObservableIterator::class); - $iterator->expects($this->once()) - ->method('wait') - ->will($this->returnCallback(function () { - yield new Delayed(); - })); - $observable = $this->getMock(Observable\Observable::class); $observable->expects($this->once()) - ->method('getIterator') - ->will($this->returnValue($iterator)); - $observable->expects($this->once()) - ->method('dispose') - ->with($this->identicalTo($exception)); + ->method('each') + ->will($this->returnCallback(function (callable $callback) { + yield new Delayed(); + })); $observable = Observable\merge([$emitter, $observable]);