In [29]:
%run startup.py

In [30]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')

<IPython.core.display.Javascript object>

# A Decision Tree of Observable Operators

## Part 1: NEW Observables.

> source: http://reactivex.io/documentation/operators.html#tree.  
> (transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, [axiros](http://www.axiros.com))  

**This tree can help you find the ReactiveX Observable operator you’re looking for.**  

<h2 id="tocheading">Table of Contents</h2>
<div id="toc"></div>

## Usage

There are no configured behind the scenes imports or code except [`startup.py`](./edit/startup.py), which defines output helper functions, mainly:

- `rst, reset_start_time`: resets a global timer, in order to have use cases starting from 0.
- `subs(observable)`: subscribes to an observable, printing notifications with time, thread, value


All other code is explicitly given in the notebook.  
Since all initialisiation of tools is in the first cell, you always have to run the first cell after ipython kernel restarts.  
**All other cells are autonmous.**

In the use case functions, in contrast to the official examples we simply use **`rand`** quite often (mapped to `randint(0, 100)`), to demonstrate when/how often observable sequences are generated and when their result is buffered for various subscribers.  
*When in doubt then run the cell again, you might have been "lucky" and got the same random.*

### RxJS
The (bold printed) operator functions are linked to the [official documentation](http://reactivex.io/documentation/operators.html#tree) and created roughly analogous to the **RxJS** examples. The rest of the TOC lines links to anchors within the notebooks. 

### Output
When the output is not in marble format we display it like so:

```
new subscription on stream 276507289 

   3.4  M [next]    1.4: {'answer': 42}
   3.5 T1 [cmpl]    1.6: fin
   
```
where the lines are syncronously `print`ed as they happen.  "M" and "T1" would be thread names ("M" is main thread).  
For each use case in `reset_start_time()` (alias `rst`), a global timer is set to 0 and we show the offset to it, in *milliseconds* & with one decimal value and also the offset to the start of stream subscription. In the example 3.4, 3.5 are millis since global counter reset, while 1.4, 1.6 are offsets to start of subscription.


# I want to create a **NEW** Observable...

## ... that emits a particular item: **[just](http://reactivex.io/documentation/operators/just.html) **

In [31]:
reset_start_time(reactivex.just)
stream = reactivex.just({'answer': rand()})
disposable = subs(stream)
sleep(0.5)
disposable = subs(stream) # same answer
# all stream ops work, its a real stream:
disposable = subs(stream.pipe(ops.map(lambda x: x.get('answer', 0) * 2)))





module reactivex
def return_value(
    value: _T, scheduler: Optional[abc.SchedulerBase] = None
):
    Alias for :func:`reactivex.return_value`.
--------------------------------------------------------------------------------

   2.0     M New subscription (23900) on stream 276504995
   2.5     M [next]    0.5: {'answer': 595} -> 23900
   2.5     M [cmpl]    0.5: fin -> 23900

 507.8     M New subscription (05073) on stream 276504995
 508.1     M [next]    0.3: {'answer': 595} -> 05073
 508.2     M [cmpl]    0.3: fin -> 05073

 508.5     M New subscription (56710) on stream 276505049
 508.6     M [next]    0.1: 1190 -> 56710
 508.7     M [cmpl]    0.2: fin -> 56710


## ..that was returned from a function *called at subscribe-time*: **[start](http://reactivex.io/documentation/operators/start.html)**

In [32]:
print('There is a little API difference to RxJS, see Remarks:\n')
rst(reactivex.start)

def f():
    log('function called')
    return rand()

stream = reactivex.start(func=f)
d = subs(stream)
d = subs(stream)

header("Exceptions are handled correctly (an observable should never except):")

def breaking_f():    
    return 1 / 0

stream = reactivex.start(func=breaking_f)
d = subs(stream)
d = subs(stream)



# startasync: only in python3 and possibly here(?) http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.Future
#stream = O.start_async(f)
#d = subs(stream)


There is a little API difference to RxJS, see Remarks:




module reactivex
def start(
    func: Callable[[], _T], scheduler: Optional[abc.SchedulerBase] = None
):
    Invokes the specified function asynchronously on the specified
    scheduler, surfacing the result through an observable sequence.

    .. marble::
        :alt: start

        [ start(lambda i: return 4) ]
        -4-|
          -4-|

    Note:
        The function is called immediately, not during the subscription
        of the resulting sequence. Multiple subscriptions to the
        resulting sequence can observe the function's result.

    Example:
        >>> res = reactivex.start(lambda: pprint('hello'))
        >>> res = reactivex.start(lambda: pprint('hello'), rx.Scheduler.timeout)

    Args:
        func: Function to run asynchronously.
        scheduler: [Optional] Scheduler to run the function on. If
            not specified, defaults to an instance of
            :class:`TimeoutScheduler <reactivex.schedul

## ..that was returned from an Action, Callable, Runnable, or something of that sort, called at subscribe-time: **[from](http://reactivex.io/documentation/operators/from.html)**

In [33]:
rst(reactivex.from_iterable)
def f():
    log('function called')
    return rand()
# aliases: reactivex.from_, reactivex.from_list
# 1.: From a tuple:
stream = reactivex.from_iterable((1,2,rand()))
d = subs(stream)
# d = subs(stream) # same result

# 2. from a generator
gen = (rand() for j in range(3))
stream = reactivex.from_iterable(gen)
d = subs(stream)






module reactivex
def from_iterable(
    iterable: Iterable[_T], scheduler: Optional[abc.SchedulerBase] = None
):
    Converts an iterable to an observable sequence.

    .. marble::
        :alt: from_iterable

        [   from_iterable(1,2,3)    ]
        ---1--2--3--|


    Example:
        >>> reactivex.from_iterable([1,2,3])

    Args:
        iterable: An Iterable to change into an observable sequence.
        scheduler: [Optional] Scheduler instance to schedule the values on.
            If not specified, the default is to use an instance of
            :class:`CurrentThreadScheduler
            <reactivex.scheduler.CurrentThreadScheduler>`.

    Returns:
        The observable sequence whose elements are pulled from the
        given iterable sequence.
--------------------------------------------------------------------------------

   2.4     M New subscription (05085) on stream 276505043
   2.6     M [next]    0.2: 1 -> 05085
   2.6     M [next]    0.2: 2 -> 05085
   2.7   

In [34]:
rst(reactivex.from_callback)
# in my words: In the on_next of the subscriber you'll have the original arguments,
# potentially objects, e.g. user original http requests.
# i.e. you could merge those with the result stream of a backend call to
# a webservice or db and send the request.response back to the user then.

def g(f, a, b):
    f(a, b)
    log('called f')
stream = reactivex.from_callback(lambda a, b, f: g(f, a, b))('fu', 'bar')
d = subs(stream.pipe(ops.delay(200)))
# d = subs(stream.delay(200)) # does NOT work





module reactivex
def from_callback(
    func: Callable[..., Callable[..., None]],
    mapper: Optional[typing.Mapper[Any, Any]] = None,
):
    Converts a callback function to an observable sequence.

    Args:
        func: Function with a callback as the last argument to
            convert to an Observable sequence.
        mapper: [Optional] A mapper which takes the arguments
            from the callback to produce a single item to yield on
            next.

    Returns:
        A function, when executed with the required arguments minus
        the callback, produces an Observable sequence with a single
        value of the arguments to the callback as a list.
--------------------------------------------------------------------------------

   2.8     M New subscription (72519) on stream 277422508
   3.9     M called f


## ...after a specified delay: **[timer](http://reactivex.io/documentation/operators/timer.html)**

In [35]:
rst()
# start a stream of 0, 1, 2, .. after 200 ms, with a delay of 100 ms:
stream = reactivex.timer(200, 100).pipe(ops.time_interval(), ops.map(lambda x: 'val:%s dt:%s' % (x.value, x.interval)), ops.take(3))
d = subs(stream, name='observer1')
# intermix directly with another one
d = subs(stream, name='observer2')





   1.2     M New subscription (observer1) on stream 277372486

   2.8     M New subscription (observer2) on stream 277372486


## ...that emits a sequence of items repeatedly: **[repeat](http://reactivex.io/documentation/operators/repeat.html) **

In [38]:
rst(ops.repeat)
# repeat is over *values*, not function calls. Use generate or create for function calls!
subs(reactivex.repeat_value(20, 4))

header('do while:')
l = []
def condition(x):
    l.append(1)
    return True if len(l) < 2 else False
stream = reactivex.just(42).pipe(ops.do_while(condition))
d = subs(stream)






module reactivex.operators
def repeat(
    repeat_count: Optional[int] = None,
):
    Repeats the observable sequence a specified number of times.
    If the repeat count is not specified, the sequence repeats
    indefinitely.

    .. marble::
        :alt: repeat

        -1--2-|
        [    repeat(3)     ]
        -1--2--1--2--1--2-|


    Examples:
        >>> repeated = repeat()
        >>> repeated = repeat(42)
    Args:
        repeat_count: Number of times to repeat the sequence. If not
        provided, repeats the sequence indefinitely.

    Returns:
        An operator function that takes an observable sources and
        returns an observable sequence producing the elements of the
        given sequence repeatedly.
--------------------------------------------------------------------------------

   1.6     M New subscription (72266) on stream 277472509
   2.0     M [next]    0.3: 20 -> 72266
   2.1     M [next]    0.5: 20 -> 72266
   2.2     M [next]    0.6: 20 -> 72266

## ...from scratch, with custom logic and cleanup (calling a function again and again): **[create](http://reactivex.io/documentation/operators/create.html) **

In [39]:
rx = reactivex.create
rst(rx)

def f(obs):
    # this function is called for every observer
    obs.on_next(rand())
    obs.on_next(rand())
    obs.on_completed()
    def cleanup():
        log('cleaning up...')
    return cleanup
stream = reactivex.create(f).pipe(ops.delay(200)) # the delay causes the cleanup called before the subs gets the vals
d = subs(stream)
d = subs(stream)




sleep(0.5)
rst(title='Exceptions are handled nicely')
l = []
def excepting_f(obs):
    for i in range(3):
        l.append(1)
        obs.on_next('%s %s (observer hash: %s)' % (i, 1. / (3 - len(l)), hash(obs) ))
    obs.on_completed()

stream = reactivex.create(excepting_f)
d = subs(stream)
d = subs(stream)




rst(title='Feature or Bug?')
print('(where are the first two values?)')
l = []
def excepting_f(obs):
    for i in range(3):
        l.append(1)
        obs.on_next('%s %s (observer hash: %s)' % (i, 1. / (3 - len(l)), hash(obs) ))
    obs.on_completed()

stream = reactivex.create(excepting_f).pipe(ops.delay(100))
d = subs(stream)
d = subs(stream)
# I think its an (amazing) feature, preventing to process functions results of later(!) failing functions





module reactivex
def create(subscribe: typing.Subscription[_T]):
    Creates an observable sequence object from the specified
        subscription function.

    .. marble::
        :alt: create

        [     create(a)    ]
        ---1---2---3---4---|

    Args:
        subscribe: Subscription function.

    Returns:
        An observable sequence that can be subscribed to via the given
        subscription function.
--------------------------------------------------------------------------------

   1.7     M New subscription (90252) on stream 277631958
   2.0     M [err ]    0.3: f() takes 1 positional argument but 2 were given -> 90252

   2.2     M New subscription (23831) on stream 277631958
   2.4     M [err ]    0.2: f() takes 1 positional argument but 2 were given -> 23831




   1.1     M New subscription (72546) on stream 276602314
   1.4     M [err ]    0.2: excepting_f() takes 1 positional argument but 2 were given -> 72546

   1.6     M New subscription (66361) on str

In [44]:
rx = reactivex.generate
rst(rx)
"""The basic form of generate takes three parameters:

the first item to emit
a function to test an item to determine whether to emit it (true) or terminate the Observable (false)
a function to generate the next item to test and emit based on the value of the previous item
"""
def generator_based_on_previous(x): return x + 1.1
def doubler(x): return 2 * x
d = subs(rx(0, lambda x: x < 4, generator_based_on_previous))




module reactivex
def generate(
    initial_state: _TState,
    condition: typing.Predicate[_TState],
    iterate: typing.Mapper[_TState, _TState],
):
    Generates an observable sequence by running a state-driven loop
    producing the sequence's elements.

    .. marble::
        :alt: generate

        [   generate()    ]
        -1-2-3-4-|

    Example:
        >>> res = reactivex.generate(0, lambda x: x < 10, lambda x: x + 1)

    Args:
        initial_state: Initial state.
        condition: Condition to terminate generation (upon returning
            :code:`False`).
        iterate: Iteration step function.

    Returns:
        The generated sequence.
--------------------------------------------------------------------------------

   5.2     M New subscription (34434) on stream 277634169
   5.4     M [next]    0.2: 0 -> 34434
   5.5     M [next]    0.3: 1.1 -> 34434
   5.5     M [next]    0.3: 2.2 -> 34434
   5.6     M [next]    0.4: 3.3000000000000003 -> 34434
   5.6     M

In [45]:
rx = reactivex.generate_with_relative_time
rst(rx)
stream = rx(1, lambda x: x < 4, lambda x: x + 1, lambda t: 100)
d = subs(stream)





module reactivex
def generate_with_relative_time(
    initial_state: _TState,
    condition: typing.Predicate[_TState],
    iterate: typing.Mapper[_TState, _TState],
    time_mapper: Callable[[_TState], typing.RelativeTime],
):
    Generates an observable sequence by iterating a state from an
    initial state until the condition fails.

    .. marble::
        :alt: generate_with_relative_time

        [generate_with_relative_time()]
        -1-2-3-4-|

    Example:
        >>> res = reactivex.generate_with_relative_time(
            0, lambda x: True, lambda x: x + 1, lambda x: 0.5
        )

    Args:
        initial_state: Initial state.
        condition: Condition to terminate generation (upon returning
            :code:`False`).
        iterate: Iteration step function.
        time_mapper: Time mapper function to control the speed of
            values being produced each iteration, returning relative times, i.e.
            either a :class:`float` denoting seconds, or an i

## ...for each observer that subscribes OR according to a condition at subscription time: **[defer / if_then](http://reactivex.io/documentation/operators/defer.html) **

In [47]:
rst(reactivex.defer)
# plural! (unique per subscription)
streams = reactivex.defer(lambda: reactivex.just(rand()))
d = subs(streams)
d = subs(streams) # gets other values - created by subscription!




module reactivex
def defer(
    factory: Callable[[abc.SchedulerBase], Union[Observable[_T], "Future[_T]"]]
):
    Returns an observable sequence that invokes the specified
    factory function whenever a new observer subscribes.

    .. marble::
        :alt: defer

        [     defer(1,2,3)     ]
        ---1--2--3--|
                ---1--2--3--|

    Example:
        >>> res = reactivex.defer(lambda scheduler: of(1, 2, 3))

    Args:
        factory: Observable factory function to invoke for each observer
            which invokes :func:`subscribe()
            <reactivex.Observable.subscribe>` on the resulting sequence.
            The factory takes a single argument, the scheduler used.

    Returns:
        An observable sequence whose observers trigger an invocation
        of the given factory function.
--------------------------------------------------------------------------------

   3.8     M New subscription (72483) on stream 277626929
   5.1     M [err ]    1.4: <lam

In [48]:
# evaluating a condition at subscription time in order to decide which of two streams to take.
rst(reactivex.if_then)
cond = True
def should_run():
    return cond
streams = reactivex.if_then(should_run, reactivex.return_value(43), reactivex.return_value(56))
d = subs(streams)

log('condition will now evaluate falsy:')
cond = False
streams = reactivex.if_then(should_run, reactivex.return_value(43), reactivex.return_value(rand()))
d = subs(streams)
d = subs(streams)




module reactivex
def if_then(
    condition: Callable[[], bool],
    then_source: Union[Observable[_T], "Future[_T]"],
    else_source: Union[None, Observable[_T], "Future[_T]"] = None,
):
    Determines whether an observable collection contains values.

    .. marble::
        :alt: if_then

        ---1--2--3--|
        --6--8--|
        [    if_then()     ]
        ---1--2--3--|


    Examples:
        >>> res = reactivex.if_then(condition, obs1)
        >>> res = reactivex.if_then(condition, obs1, obs2)

    Args:
        condition: The condition which determines if the then_source or
            else_source will be run.
        then_source: The observable sequence or :class:`Future` that
            will be run if the condition function returns :code:`True`.
        else_source: [Optional] The observable sequence or :class:`Future`
            that will be run if the condition function returns :code:`False`.
            If this is not provided, it defaults to :func:`empty() <re

## ...that emits a sequence of integers: **[range](http://reactivex.io/documentation/operators/range.html) **

In [49]:
rst(reactivex.range)
d = subs(reactivex.range(0, 3))




module reactivex
def range(
    start: int,
    stop: Optional[int] = None,
    step: Optional[int] = None,
    scheduler: Optional[abc.SchedulerBase] = None,
):
    Generates an observable sequence of integral numbers within a
    specified range, using the specified scheduler to send out observer
    messages.

    .. marble::
        :alt: range

        [    range(4)     ]
        --0--1--2--3--|

    Examples:
        >>> res = reactivex.range(10)
        >>> res = reactivex.range(0, 10)
        >>> res = reactivex.range(0, 10, 1)

    Args:
        start: The value of the first integer in the sequence.
        stop: [Optional] Generate number up to (exclusive) the stop
            value. Default is `sys.maxsize`.
        step: [Optional] The step to be used (default is 1).
        scheduler: [Optional] The scheduler to schedule the values on.
            If not specified, the default is to use an instance of
            :class:`CurrentThreadScheduler
            <reactivex.sch

### ...at particular intervals of time: **[interval](http://reactivex.io/documentation/operators/interval.html) **

(you can `.publish()` it to get an easy "hot" observable)

In [50]:
rst(reactivex.interval)
d = subs(reactivex.interval(period=100).pipe(ops.time_interval(), \
                                             ops.map(lambda x, v: f"{ItemGetter(x)[0]}s {ItemGetter(x)[1]}s"), \
                                             ops.take(3)))




module reactivex
def interval(
    period: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None
):
    Returns an observable sequence that produces a value after each period.

    .. marble::
        :alt: interval

        [  interval()   ]
        ---1---2---3---4--->

    Example:
        >>> res = reactivex.interval(1.0)

    Args:
        period: Period for producing the values in the resulting sequence
            (specified as a :class:`float` denoting seconds or an instance of
            :class:`timedelta`).
        scheduler:  Scheduler to run the interval on. If not specified, an
            instance of :class:`TimeoutScheduler <reactivex.scheduler.TimeoutScheduler>`
            is used.

    Returns:
        An observable sequence that produces a value after each period.
--------------------------------------------------------------------------------

   4.5     M New subscription (72323) on stream 277473564


### ...after a specified delay (see timer)

## ...that completes without emitting items: **[empty](http://reactivex.io/documentation/operators/empty-never-throw.html) **

In [51]:
rst(reactivex.empty)
d = subs(reactivex.empty())




module reactivex
def empty(scheduler: Optional[abc.SchedulerBase] = None):
    Returns an empty observable sequence.

    .. marble::
        :alt: empty

        [     empty()     ]
        --|

    Example:
        >>> obs = reactivex.empty()

    Args:
        scheduler: [Optional] Scheduler instance to send the termination call
            on. By default, this will use an instance of
            :class:`ImmediateScheduler <reactivex.scheduler.ImmediateScheduler>`.

    Returns:
        An observable sequence with no elements.
--------------------------------------------------------------------------------

   5.1     M New subscription (57279) on stream 276423831
   5.3     M [cmpl]    0.2: fin -> 57279


## ...that does nothing at all: **[never](http://reactivex.io/documentation/operators/empty-never-throw.html) **

In [52]:
rst(reactivex.never)
d = subs(reactivex.never())




module reactivex
def never():
    Returns a non-terminating observable sequence, which can be used
    to denote an infinite duration (e.g. when using reactive joins).

    .. marble::
        :alt: never

        [     never()     ]
        -->

    Returns:
        An observable sequence whose observers will never get called.
--------------------------------------------------------------------------------

   1.4     M New subscription (72419) on stream 277422544


## ...that excepts: **[throw](http://reactivex.io/documentation/operators/empty-never-throw.html) **

In [55]:
rst(reactivex.Observer.on_error)
d = subs(reactivex.repeat_value(20/0, 4).on_error(error=ZeroDivisionError))




module reactivex.observer.observer
def on_error(self, error: Exception):
    Notify the observer that an exception has occurred.

        Args:
            error: The error that occurred.
--------------------------------------------------------------------------------


ZeroDivisionError: division by zero

2700.7   T36 [next] 200011.0: 2 -> 66340
16155.0   T37 [next] 200014.4: 2 -> 34491
