Skip to content

Commit

Permalink
Merge b837abe into 9fb9197
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Jan 6, 2017
2 parents 9fb9197 + b837abe commit a6fb7eb
Show file tree
Hide file tree
Showing 249 changed files with 2,241 additions and 2,848 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
@@ -1,10 +1,14 @@
language: php

php:
- 5.6
- 7
- 7.1
- hhvm

matrix:
allow_failures:
- php: hhvm

before_script: composer install

script:
Expand Down
93 changes: 81 additions & 12 deletions README.md
@@ -1,26 +1,21 @@
RxPHP
======

## This is the development branch for RxPHP v2 and is not stable. For production, use [v1](https://github.com/reactivex/rxphp) instead.

Reactive extensions for PHP. The reactive extensions for PHP are a set of
libraries to compose asynchronous and event-based programs using observable
collections and LINQ-style query operators in PHP.

[![Build Status](https://secure.travis-ci.org/ReactiveX/RxPHP.png?branch=master)](https://travis-ci.org/ReactiveX/RxPHP)
[![Coverage Status](https://coveralls.io/repos/github/ReactiveX/RxPHP/badge.svg?branch=master)](https://coveralls.io/github/ReactiveX/RxPHP?branch=master)

## Installation
Install dependencies using [composer](https://getcomposer.org).

```bash
$ composer.phar require reactivex/rxphp
```

## Example

```php
$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$subscription = $source->subscribe(new \Rx\Observer\CallbackObserver(
$source->subscribe(
function ($x) {
echo 'Next: ', $x, PHP_EOL;
},
Expand All @@ -30,7 +25,7 @@ $subscription = $source->subscribe(new \Rx\Observer\CallbackObserver(
function () {
echo 'Completed', PHP_EOL;
}
));
);

//Next: 1
//Next: 2
Expand All @@ -40,15 +35,89 @@ $subscription = $source->subscribe(new \Rx\Observer\CallbackObserver(

```

## Quick start for demos

## Try out the demos

```bash
$ composer.phar install
$ git clone git@github.com:reactivex/RxPHP.git -b 2.x
$ cd RxPHP
$ composer install
$ php demo/interval/interval.php
```

Have fun running the demos in `/demo`.

note: The demos are automatically run within `Loop::execute`. When using RxPHP within your own project, you'll need to install a [loop implementation](https://packagist.org/providers/async-interop/event-loop-implementation).

## Installation
1. Install one [async-interop event loop](https://packagist.org/providers/async-interop/event-loop-implementation) implementation.

With ReactPHP:
```bash
$ composer require wyrihaximus/react-async-interop-loop
```
With amphp:
```bash
$ composer require amphp/loop:dev-master
```
With KoolKode:
```bash
$ composer require koolkode/async
```

2. Install RxPHP using [composer](https://getcomposer.org).

```bash
$ composer require reactivex/rxphp:2.x-dev
```

3. Write some code

```PHP
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use Interop\Async\Loop;

Loop::execute(function () {

Observable::interval(1000)
->take(5)
->flatMap(function ($i) {
return Observable::of($i + 1);
})
->subscribe(function ($e) {
echo $e, PHP_EOL;
});

});
```
## Working with Promises

Some async PHP frameworks have yet to fully embrace the awesome power of observables. To help ease the transition, RxPHP has support for promise libraries that implement the async-interop promise [specification](https://github.com/async-interop/promise).

Mixing a promise into an observable stream:

```PHP
Observable::interval(1000)
->flatMap(function ($i) {
return Observable::fromPromise(new Resolved($i));
})
->subscribe(function ($v) {
echo $v . PHP_EOL;
});
```

Converting an Observable into a promise. (This is useful for libraries that use generators and coroutines):
```PHP
$observable = Observable::interval(1000)
->take(10)
->toArray()
->map('json_encode');

$promise = $observable->toPromise();
```

## License

Expand Down
19 changes: 10 additions & 9 deletions composer.json
Expand Up @@ -20,25 +20,26 @@
}
],
"require": {
"php": "~5.5|~7.0"
"php": "~7.0",
"async-interop/promise": "^0.3",
"async-interop/event-loop": "^0.4",
"async-interop/event-loop-implementation": "^0.4"
},
"require-dev": {
"react/event-loop": "~0.4.1",
"react/promise": "~2.2",
"wyrihaximus/react-async-interop-loop": "^0.2.1",
"satooshi/php-coveralls": "~1.0",
"phpunit/phpcov": "^3.1",
"phpunit/phpunit": "^5.5"
},
"suggest": {
"react/event-loop": "For using event-loop based scheduling.",
"react/promise": "For converting promises to and from observables."
},
"autoload": {
"psr-4": { "Rx\\": "lib/Rx" }
"psr-4": { "Rx\\": "src" }
},
"autoload-dev": {
"files" : ["src/bootstrap.php"]
},
"extra": {
"branch-alias": {
"dev-master": "1.4-dev"
"dev-master": "2.0-dev"
}
}
}
12 changes: 12 additions & 0 deletions demo/catch/catch.php
@@ -0,0 +1,12 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

$obs2 = Rx\Observable::of(42);

$source = \Rx\Observable::error(new Exception('Some error'))
->catch(function (Throwable $e, \Rx\Observable $sourceObs) use ($obs2) {
return $obs2;
});

$subscription = $source->subscribe($stdoutObserver);
File renamed without changes.
12 changes: 0 additions & 12 deletions demo/catch/catchError.php

This file was deleted.

7 changes: 1 addition & 6 deletions demo/combineLatest/combineLatest.php
Expand Up @@ -2,9 +2,6 @@

require_once __DIR__ . '/../bootstrap.php';

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

/* Have staggering intervals */
$source1 = \Rx\Observable::interval(100);
$source2 = \Rx\Observable::interval(120);
Expand All @@ -13,6 +10,4 @@
return "First: {$value1}, Second: {$value2}";
})->take(4);

$subscription = $source->subscribe($stdoutObserver, $scheduler);

$loop->run();
$subscription = $source->subscribe($stdoutObserver);
6 changes: 3 additions & 3 deletions demo/concat/concat.php
Expand Up @@ -3,9 +3,9 @@
require_once __DIR__ . '/../bootstrap.php';


$source1 = \Rx\Observable::just(42);
$source2 = \Rx\Observable::just(56);
$source1 = \Rx\Observable::of(42);
$source2 = \Rx\Observable::of(56);

$source = \Rx\Observable::emptyObservable()->concat($source1)->concat($source2);
$source = \Rx\Observable::empty()->concat($source1)->concat($source2);

$subscription = $source->subscribe($stdoutObserver);
9 changes: 2 additions & 7 deletions demo/concat/concatMap.php
Expand Up @@ -2,18 +2,13 @@

require_once __DIR__ . '/../bootstrap.php';

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = Rx\Observable::range(0, 5)
->concatMap(function ($x, $i) use ($scheduler) {
return \Rx\Observable::interval(100, $scheduler)
->concatMap(function ($x, $i) {
return \Rx\Observable::interval(100)
->take($x)
->map(function () use ($i) {
return $i;
});
});

$subscription = $source->subscribe($stdoutObserver);

$loop->run();
7 changes: 1 addition & 6 deletions demo/concat/concatMapTo.php
Expand Up @@ -2,10 +2,7 @@

require_once __DIR__ . '/../bootstrap.php';

$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$obs = \Rx\Observable::interval(100, $scheduler)
$obs = \Rx\Observable::interval(100)
->take(3)
->mapWithIndex(function ($i) {
return $i;
Expand All @@ -15,5 +12,3 @@
->concatMapTo($obs);

$subscription = $source->subscribe($stdoutObserver);

$loop->run();
14 changes: 5 additions & 9 deletions demo/custom-operator/Rot13Operator.php
Expand Up @@ -2,28 +2,24 @@

namespace Vendor\Rx\Operator;

use Rx\DisposableInterface;
use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
use Rx\Operator\OperatorInterface;
use Rx\SchedulerInterface;

class Rot13Operator implements OperatorInterface
{
/**
* @inheritDoc
*/
public function __invoke(
ObservableInterface $observable,
ObserverInterface $observer,
SchedulerInterface $scheduler = null
) {
return $observable->subscribe(new CallbackObserver(
public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
{
return $observable->subscribe(
function ($json) use ($observer) {
$observer->onNext(str_rot13($json));
},
[$observer, 'onError'],
[$observer, 'onCompleted']
));
);
}
}
2 changes: 1 addition & 1 deletion demo/defaultIfEmpty/defaultIfEmpty.php
Expand Up @@ -2,6 +2,6 @@

require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::emptyObservable()->defaultIfEmpty(new \Rx\Observable\ReturnObservable("something"));
$source = \Rx\Observable::empty()->defaultIfEmpty(Rx\Observable::of('something'));

$subscription = $source->subscribe($stdoutObserver);
2 changes: 1 addition & 1 deletion demo/defer/defer.php
Expand Up @@ -4,7 +4,7 @@


$source = \Rx\Observable::defer(function () {
return \Rx\Observable::just(42);
return \Rx\Observable::of(42);
});

$subscription = $source->subscribe($stdoutObserver);
12 changes: 3 additions & 9 deletions demo/delay/delay.php
@@ -1,16 +1,10 @@
<?php
require_once __DIR__ . '/../bootstrap.php';

$loop = new \React\EventLoop\StreamSelectLoop();

$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

\Rx\Observable::interval(1000, $scheduler)
\Rx\Observable::interval(1000)
->doOnNext(function ($x) {
echo "Side effect: " . $x . "\n";
echo 'Side effect: ' . $x . "\n";
})
->delay(500)
->take(5)
->subscribe($createStdoutObserver(), $scheduler);

$loop->run();
->subscribe($createStdoutObserver());
4 changes: 2 additions & 2 deletions demo/do/doOnEach.php → demo/do/do.php
Expand Up @@ -3,11 +3,11 @@
require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::range(0, 3)
->doOnEach(new \Rx\Observer\CallbackObserver(
->do(new \Rx\Observer\CallbackObserver(
function ($x) {
echo 'Do Next:', $x, PHP_EOL;
},
function (Exception $err) {
function (Throwable $err) {
echo 'Do Error:', $err->getMessage(), PHP_EOL;
},
function () {
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion demo/do/doOnCompleted.php
Expand Up @@ -2,7 +2,7 @@

require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::emptyObservable()
$source = \Rx\Observable::empty()
->doOnCompleted(function () {
echo 'Do Completed', PHP_EOL;
});
Expand Down
2 changes: 1 addition & 1 deletion demo/do/doOnError.php
Expand Up @@ -3,7 +3,7 @@
require_once __DIR__ . '/../bootstrap.php';

$source = \Rx\Observable::error(new Exception('Oops'))
->doOnError(function (Exception $err) {
->doOnError(function (Throwable $err) {
echo 'Do Error:', $err->getMessage(), PHP_EOL;
});

Expand Down
Expand Up @@ -2,5 +2,5 @@

require_once __DIR__ . '/../bootstrap.php';

$observable = \Rx\Observable::emptyObservable();
$observable = \Rx\Observable::empty();
$observable->subscribe($stdoutObserver);
File renamed without changes.

0 comments on commit a6fb7eb

Please sign in to comment.