Skip to content

Commit

Permalink
Merge branch 'v1.x'
Browse files Browse the repository at this point in the history
Conflicts:
	src/Observable/functions.php
	tests/Observable/MergeTest.php
  • Loading branch information
trowski committed Dec 2, 2015
2 parents 29a7882 + af590f4 commit 5db3295
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 49 deletions.
49 changes: 18 additions & 31 deletions src/Observable/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,29 @@ function merge(array $observables): Observable
/** @var \Icicle\Observable\Observable[] $observables */
$observables = array_map(__NAMESPACE__ . '\from', $observables);

/** @var \Icicle\Observable\ObservableIterator[] $iterators */
$iterators = array_map(function (Observable $observable) {
return $observable->getIterator();
}, $observables);
$callback = function ($value) use (&$emitting, $emit) {
while (null !== $emitting) {
yield $emitting; // Prevents simultaneous calls to $emit.
}

$emitting = new Delayed();

yield $emit($value);

$emitting->resolve();
$emitting = null;
};

/** @var \Icicle\Coroutine\Coroutine[] $coroutines */
$coroutines = array_map(function (ObservableIterator $iterator) {
return new Coroutine($iterator->wait());
}, $iterators);
$coroutines = array_map(function (Observable $observable) use ($callback) {
return new Coroutine($observable->each($callback));
}, $observables);

try {
while (count($coroutines)) {
yield Awaitable\choose($coroutines);

/**
* @var int $key
* @var \Icicle\Coroutine\Coroutine $coroutine
*/
foreach (array_filter($coroutines, function (Coroutine $coroutine) {
return !$coroutine->isPending();
}) as $key => $coroutine) {
$iterator = $iterators[$key];

if (!(yield $coroutine)) {
unset($coroutines[$key], $iterators[$key], $observables[$key]);
continue;
}

yield from $emit($iterator->getCurrent());

$coroutines[$key] = new Coroutine($iterator->wait());
}
}
return yield Awaitable\all($coroutines);
} catch (\Exception $exception) {
foreach ($observables as $observable) {
$observable->dispose($exception);
foreach ($coroutines as $coroutine) {
$coroutine->cancel($exception);
}
throw $exception;
}
Expand Down
29 changes: 11 additions & 18 deletions tests/Observable/MergeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ protected function createEmitter($low, $high)
return new Observable\Emitter(function (callable $emit) use ($low, $high) {
foreach (range($low, $high) as $value) {
yield from $emit($value);
return $high - $low + 1;
}
});
}

public function getObservables()
{
return [
[[range(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6]],
[[$this->createEmitter(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6]],
[[range(1, 5), $this->createEmitter(6, 8)], [1, 6, 2, 7, 3, 8, 4, 5]],
[[new \ArrayObject(range(1, 4)), new \ArrayIterator(range(5, 8))], [1, 5, 2, 6, 3, 7, 4, 8]],
[[range(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6], [null, null]],
[[$this->createEmitter(1, 3), range(4, 6)], [1, 4, 2, 5, 3, 6], [3, null]],
[[$this->createEmitter(1, 5), $this->createEmitter(6, 8)], [1, 6, 2, 7, 3, 8, 4, 5], [5, 3]],
[[new \ArrayObject(range(1, 4)), new \ArrayIterator(range(5, 8))], [1, 5, 2, 6, 3, 7, 4, 8], [null, null]],
];
}

Expand All @@ -54,7 +55,7 @@ public function getObservables()
* @param array $observables
* @param array $expected
*/
public function testMerge(array $observables, array $expected)
public function testMerge(array $observables, array $expected, array $result)
{
$observable = Observable\merge($observables);

Expand All @@ -63,7 +64,7 @@ public function testMerge(array $observables, array $expected)
$this->assertSame($expected[$i++], $value);
}));

$awaitable->wait();
$this->assertEquals($result, $awaitable->wait());
}

/**
Expand All @@ -79,20 +80,12 @@ public function testMergeWithFailedObservable()
throw $exception;
});

$iterator = $this->getMock(Observable\ObservableIterator::class);
$iterator->expects($this->once())
->method('wait')
->will($this->returnCallback(function () {
return yield new Delayed();
}));

$observable = $this->getMock(Observable\Observable::class);
$observable->expects($this->once())
->method('getIterator')
->will($this->returnValue($iterator));
$observable->expects($this->once())
->method('dispose')
->with($this->identicalTo($exception));
->method('each')
->will($this->returnCallback(function (callable $callback) {
yield new Delayed();
}));

$observable = Observable\merge([$emitter, $observable]);

Expand Down

0 comments on commit 5db3295

Please sign in to comment.