Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions demo/bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
function asString($value) {
if (is_array($value)) {
return json_encode($value);
} elseif (is_bool($value)) {
return (string)(integer)$value;
}
return (string) $value;
}
Expand Down
8 changes: 8 additions & 0 deletions demo/isEmpty/isEmpty-false.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::just(1)
->isEmpty();

$source->subscribe($stdoutObserver);
2 changes: 2 additions & 0 deletions demo/isEmpty/isEmpty-false.php.expect
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Next value: 0
Complete!
8 changes: 8 additions & 0 deletions demo/isEmpty/isEmpty.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::emptyObservable()
->isEmpty();

$source->subscribe($stdoutObserver);
2 changes: 2 additions & 0 deletions demo/isEmpty/isEmpty.php.expect
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Next value: 1
Complete!
18 changes: 18 additions & 0 deletions src/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
use Rx\Operator\DistinctUntilChangedOperator;
use Rx\Operator\DoOnEachOperator;
use Rx\Operator\GroupByUntilOperator;
use Rx\Operator\IsEmptyOperator;
use Rx\Operator\MapOperator;
use Rx\Operator\FilterOperator;
use Rx\Operator\MinOperator;
Expand Down Expand Up @@ -1922,6 +1923,23 @@ public function throttle(int $throttleDuration, SchedulerInterface $scheduler =
});
}

/**
* If the source Observable is empty it returns an Observable that emits true, otherwise it emits false.
*
* @return Observable
*
* @demo isEmpty/isEmpty.php
* @demo isEmpty/isEmpty-false.php
* @operator
* @reactivex contains
*/
public function isEmpty(): Observable
{
return $this->lift(function () {
return new IsEmptyOperator();
});
}

/**
* @param Promise $promise
* @param SchedulerInterface|null $scheduler
Expand Down
27 changes: 27 additions & 0 deletions src/Operator/IsEmptyOperator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Rx\Operator;

use Rx\DisposableInterface;
use Rx\ObservableInterface;
use Rx\ObserverInterface;

final class IsEmptyOperator implements OperatorInterface
{

public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
{
return $observable->subscribe(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the explicit CallbackObserver because it'll get wrapped with CallbackObserver anyway.

function() use ($observer) {
$observer->onNext(false);
$observer->onCompleted();
},
[$observer, 'onError'],
function() use ($observer) {
$observer->onNext(true);
$observer->onCompleted();
}
);
}

}
194 changes: 194 additions & 0 deletions test/Rx/Functional/Operator/IsEmptyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
<?php

namespace Rx\Functional\Operator;

use Rx\Functional\FunctionalTestCase;

class IsEmptyTest extends FunctionalTestCase
{

/**
* @test
*/
public function should_return_true_if_source_is_empty()
{
$xs = $this->createHotObservable([
onCompleted(300)
]);

$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs->isEmpty();
});

$this->assertMessages([
onNext(300, true),
onCompleted(300),
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 300),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_return_false_if_source_emits_element()
{
$xs = $this->createHotObservable([
onNext(150, 'a'),
onNext(300, 'b'),
onCompleted(300)
]);

$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs->isEmpty();
});

$this->assertMessages([
onNext(300, false),
onCompleted(300),
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 300),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_return_true_if_source_emits_before_subscription()
{
$xs = $this->createHotObservable([
onNext(150, 'a'),
onCompleted(300)
]);

$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs->isEmpty();
});

$this->assertMessages([
onNext(300, true),
onCompleted(300),
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 300),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_raise_error_if_source_raise_error()
{
$e = new \Exception();

$xs = $this->createHotObservable([
onError(300, $e),
]);

$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs->isEmpty();
});

$this->assertMessages([
onError(300, $e),
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 300),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_not_complete_if_source_never_emits()
{
$xs = $this->createHotObservable([]);

$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs->isEmpty();
});

$this->assertMessages([], $results->getMessages());
$this->assertSubscriptions([
subscribe(200, 1000),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_return_true_if_source_completes_immediately()
{
$xs = $this->createHotObservable([
onCompleted(201),
]);

$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs->isEmpty();
});

$this->assertMessages([
onNext(201, true),
onCompleted(201),
], $results->getMessages());
$this->assertSubscriptions([
subscribe(200, 201),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_allow_unsubscribing_explicitly_and_early()
{
$xs = $this->createHotObservable([
onNext(600, 'a'),
onNext(700, 'b'),
]);

$results = $this->scheduler->startWithDispose(function() use ($xs) {
return $xs->isEmpty();
}, 500);

$this->assertMessages([], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 500),
], $xs->getSubscriptions());
}

/**
* @test
*/
public function should_not_break_unsubscription_chains_when_result_is_unsubscribed_explicitly()
{
$xs = $this->createHotObservable([
onNext(600, 'a'),
onNext(700, 'b'),
]);

$results = $this->scheduler->startWithDispose(function() use ($xs) {
return $xs
->flatMap(function($value) {
return \Rx\Observable::just($value);
})
->isEmpty()
->flatMap(function($value) {
return \Rx\Observable::just($value);
});
}, 500);

$this->assertMessages([], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 500),
], $xs->getSubscriptions());
}

}