# https://www.tutorialspoint.com/rxpy/rxpy_quick_guide.htm

In [None]:


import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9).pipe(
    ops.filter(lambda i: i % 2 == 0),
).subscribe(lambda x: print("Even numbers are {0}".format(x)))

rx.of(1,2,3,4,5,6,7,8,9).pipe(
    ops.filter(lambda i: i % 2 == 0),
    ops.sum()
).subscribe(lambda x: print("Sum of even numbers are {0}".format(x)))

# Creating Observables

* on_next() − This function gets called when the Observable emits an item.

* on_completed() − This function gets called when the Observable is complete.

* on_error() − This function gets called when an error occurs on the Observable.

* we attach the functions to the observable inside a function in this case

In [None]:

from rx import create
def test_observable(observer, scheduler):
    observer.on_next("Hello")
    observer.on_next("Can you hear me?")
    observer.on_next([i * i for i in range(100)])
    # below line throws an error
    # for actual development do not need .on_error(), below is shown 
    # just to test the on_error = lambda e: ... 
    # in source.subscribe(...) which should process any error that comes up
    observer.on_error("Error")
    observer.on_completed()
source = create(test_observable)
source.subscribe(
    on_next = lambda i: print("Got - {0}".format(i)),
    # receive error passed from above, if dont have the below line python terminal will throw error
    # 
    on_error = lambda e: print("Got - {0}".format(e)),
    on_completed = lambda: print("Job done!"),
)

In [40]:
# from_(iterator)
# This method will convert the given array or object into an observable.
# iterator is an object or array

# cannot stack lambdas in subscribe, or on_next
# so it is better to use create()?
from rx import from_

dict1 = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}
test = from_(dict1)
test.subscribe(
    lambda x: print("Dict keys are: {keys}, Dict values are: {values}".format(keys = x, values = dict1[x])),
    lambda y: print("This will not be printed"),
    on_next = lambda z: print("This will not be printed too!"),
    on_completed = lambda: print("Done!")
)




Dict keys are: a, Dict values are: 1
Dict keys are: b, Dict values are: 2
Dict keys are: c, Dict values are: 3
Dict keys are: d, Dict values are: 4
Dict keys are: e, Dict values are: 5
Done!


<rx.disposable.disposable.Disposable at 0x7f5ab67b9760>

In [41]:
# interval
# this method will give a series of values produced after a timeout
# syntax: interval(period)
# parameters: period: to start the integer sequence
# return value: returns an observable with all the values in sequential order
# notice that when run, the print statements returned include values from previous observable sequences
import rx
from rx import operators as ops
import datetime
rx.interval(3).pipe(
    ops.map(lambda i: i * i)
).subscribe(lambda x: print("From rx.interval(): {0}\nTime now is {1} ".format(x, datetime.datetime.now())))
input("Press any key to exit! \n")

The value is 125316
From rx.interval() 97344
The value is 139876
From rx.interval(): 56169
Time now is 2021-02-01 21:42:44.805091 
The value is 156816
From rx.interval(): 75625
 Time now is 2021-02-01 21:42:44.894360: 
From rx.interval(): 0
Time now is 2021-02-01 21:42:45.113869 
The value is 126025
From rx.interval() 97969
The value is 140625


''

In [42]:
# just
# the method will convert given value into an observable
# notice that the list is not iterated through, it just returns it wholesale

from rx import just
test = just([15, 25,50, 55])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

The value is [15, 25, 50, 55]
Job Done!


<rx.disposable.disposable.Disposable at 0x7f5ab574bfd0>

In [43]:
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!


<rx.disposable.disposable.Disposable at 0x7f5ab67b9f40>

In [44]:
# notice that timer clash with interval
"""
Syntax
timer(duetime)
Parameters
duetime: time after which it should emit the first value.

Return value
It will return an observable with values emitted after duetime.

"""
import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

From rx.interval(): 57121
Time now is 2021-02-01 21:42:50.806912 
The value is 158404
From rx.interval(): 76729
 Time now is 2021-02-01 21:42:50.895677: 
From rx.interval(): 4
Time now is 2021-02-01 21:42:51.116155 
The value is 127449
From rx.interval() 99225
The value is 142129
From rx.interval(): 57600
Time now is 2021-02-01 21:42:53.807300 
The value is 159201
From rx.interval(): 77284
 Time now is 2021-02-01 21:42:53.896594: 
From rx.interval(): 9
Time now is 2021-02-01 21:42:54.116522 
The value is 128164
The value is 0
From rx.interval() 99856
The value is 142884
From rx.interval(): 58081
Time now is 2021-02-01 21:42:56.807948 
The value is 160000
From rx.interval(): 77841
 Time now is 2021-02-01 21:42:56.902933: 


''