# Introduction to RxPY

This is an adaptation of the [RxPY Getting Started notebook](https://github.com/ReactiveX/RxPY/blob/master/notebooks/Getting%20Started.ipynb). The original no longer appears to be maintained and does not run on the latest version of RxPY, so this is an updated version.

Rx is about processing streams of events. With Rx you:

*  Tell what you want to process (an Observable)
*  Tell how you want to process it (a composition of operators)
*  Tell what you want to do with the result (an Observer)

It's important to understand that with Rx you describe what you want to do with events if and when they arrive. It's all a declarative composition of operators that will process the events when they arrive. If nothing happens, then nothing is processed.

Thus the pattern is that you `subscribe` to an `Observable` using an `Observer`:

```python
subscription = observable.subscribe(observer)
```

***NOTE:*** Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen.

## Importing the Rx module

In [None]:
import rx

print(rx.__version__)
from rx import Observable
from rx import of, operators as op



## Creating a Simple Example

Here we define a function that pushes five strings to an observer, then pass that function to `create` to build an Observable.


In [None]:
from rx import create

def push_five_strings(observer, scheduler):
    observer.on_next("Alpha")
    observer.on_next("Beta")
    observer.on_next("Gamma")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()

source = create(push_five_strings)

source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

## Reducing Verbosity

This can actually be a lot shorter with the `of()` function:

In [None]:
from rx import of

source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

source.subscribe(lambda value: print("Received {0}".format(value)))


## Generating a sequence

There are many ways to generate a sequence of events. The easiest way to get started is the `from_iterable()` function, which is also available under the shorter name `from_`. Other functions you can use to generate a sequence include `just`, `generate`, `create` and `range`.

In [None]:
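from rx.core import Observer

# An observer receives events, so it derives from Observer, not Observable
class MyObserver(Observer):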
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

xs = rx.from_iterable(range(10))
d = xs.subscribe(MyObserver())

In [None]:
xs = rx.from_(range(10))
d = xs.subscribe(print)

**NOTE:** The subscribe method takes an observer, or one to three callbacks for handling `on_next()`, `on_error()`, and `on_completed()`. This is why we can use `print` directly as the observer in the example above, since it becomes the `on_next()` handler for an anonymous observer.

## Filtering a sequence

First, let's set up a simple starting point with no filtering.

In [None]:
range(10)

of(*range(10)).subscribe(lambda value: print("Received {0}".format(value)))


## Now let's add some filtering

In [None]:
of(*range(10)).pipe(
    op.filter(lambda i: i % 2)
).subscribe(lambda value: print("Received {0}".format(value)))


## Transforming a sequence

In [None]:

of(*range(10)).pipe(
    op.map(lambda x: x * 2)
).subscribe(lambda value: print("Received {0}".format(value)))


**NOTE:** In RxPY 3 the mapper passed to `op.map` receives only the value. To receive the index as a second parameter, use `op.map_indexed` instead.

## Merge

Merging two observable sequences into a single observable sequence using the `merge` operator:

In [None]:
xs = of(*range(10))
ys = of(*"abcde")  # unpack the string so each letter is emitted as its own event
zs = xs.pipe(op.merge(ys)).subscribe(print)

## The Spacetime of Rx

In the examples above all the events happen at the same moment in time. The events are only separated by ordering. This confuses many newcomers to Rx since the result of the `merge` operation above may have several valid results such as:

    a1b2c3d4e5
    1a2b3c4d5e
    ab12cd34e5
    abcde12345
    
The only guarantee you have is that 1 will be before 2 in `xs`, but 1 in `xs` can be before or after `a` in `ys`. It's up to the sort stability of the scheduler to decide which event should go first. For real-time data streams this will not be a problem, since the events will be separated by actual time. To make sure you get the results you "expect", it's always a good idea to add some time between the events when playing with Rx.

## Marbles and Marble Diagrams

As we saw in the previous section it's nice to add some time when playing with Rx and RxPY. A great way to explore RxPY is to use the `marbles` test module that enables us to play with [marble diagrams](http://rxmarbles.com). 

In [None]:
from rx.testing import marbles

xs = rx.from_marbles("a-b-c-|")
xs.run()

In [None]:
xs = rx.from_marbles("1-2-3-4-5")
ys = rx.from_marbles("1-2-3-4-5")
xs.pipe(op.merge(ys)).subscribe(print)

## Subjects and Streams

A simple way to create an observable stream is to use a subject. It's probably called a subject after the Subject-Observer pattern described in the [Design Patterns](http://www.amazon.com/Design-Patterns-Elements-Reusable-Object-Oriented/dp/0201633612/ref=sr_1_1?s=books&ie=UTF8&qid=1431184351&sr=1-1&keywords=design+patterns) book by the Gang of Four (GoF).

Anyway, a Subject is both an `Observable` and an `Observer`, so you can both subscribe to it and push events into it with `on_next`. This makes it an obvious candidate if you need to publish values into an observable stream for processing:

In [None]:
from rx.subject import Subject

stream = Subject()
stream.on_next(41)

d = stream.subscribe(lambda x: print("Got: %s" % x))

stream.on_next(42)

d.dispose()
stream.on_next(43)

*That's all for now*