# Installing Reactivex

In [1]:
pip install reactivex

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install rx




# Creating observable 

In [4]:
from rx import create
def test_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Got - Hello
Job Done!


<rx.disposable.disposable.Disposable at 0x1ae47fda610>

In [4]:
#push five strings using of() function  
from reactivex import of
source= of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(
 on_next= lambda n: print("Recieved {0}".format(n)),
 on_error= lambda e: print("Error Occured {0}" .format(e)),
 on_completed= lambda: print("Done!"),   
)

Recieved Alpha
Recieved Beta
Recieved Gamma
Recieved Delta
Recieved Epsilon
Done!


<reactivex.disposable.disposable.Disposable at 0x271cd1837f0>

In [5]:
from reactivex import empty
test= empty()
test.subscribe(
 lambda x: print("Got-- {0}".format(x)),
 on_error=lambda e: print("Error: {0}".format(e)),
 on_completed=lambda : print("Job Done!"),   
)

Job Done!


<reactivex.disposable.disposable.Disposable at 0x271cd2a32e0>

In [6]:
from reactivex import never
test=never()
test.subscribe(
 lambda x: print("Got-- {0}".format(x)),
 on_error= lambda e: print("Error: {0}".format(e)),
 on_completed= lambda: print("Job done!")    
)

<reactivex.disposable.disposable.Disposable at 0x271cd2c13a0>

In [7]:
from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Error : There is an Error!


<rx.disposable.disposable.Disposable at 0x271cd34c880>

In [8]:
from rx import just
test = just([15, 25,50, 55])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

The value is [15, 25, 50, 55]
Job Done!


<rx.disposable.disposable.Disposable at 0x271cd1d6b80>

In [9]:
from rx import range
test = range(0,10)
test.subscribe(
   on_next= lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!


<rx.disposable.disposable.Disposable at 0x271cd35f4c0>

In [10]:
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!


<rx.disposable.disposable.Disposable at 0x271cd3aaa60>

In [11]:
from rx import start
test = start(lambda : "Hello World")
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

The value is Hello World
Job Done!


<rx.disposable.disposable.Disposable at 0x271cd3aa760>

# Operators and chaining

In [12]:
#Typically, you do not want to save Observables into intermediary variables for each operator, unless you want to have 
#multiple subscribers at that point. Instead, you want to strive to inline and create an “Observable pipeline” of operations. 
#That way your code is readable and tells a story much more easily.

from reactivex import of, operators as op
of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
  op.map(lambda s: len(s)),
  op.filter(lambda i: i>=5)
).subscribe(lambda value: print("Recieved {0}".format(value)))


Recieved 5
Recieved 5
Recieved 5
Recieved 7


<reactivex.disposable.disposable.Disposable at 0x271cd3e0b80>

 Mathematical Operators 

In [13]:
from reactivex import of,operators as op
test=of(1,2,3,4,5,6,7,8,9)

sub1=test.pipe(
  op.average()
)

sub1.subscribe(lambda x: print("Average is {0}".format(x)) )

Average is 5.0


<reactivex.disposable.disposable.Disposable at 0x271cd3fb760>

In [14]:
#Count Operator
from reactivex import of,operators as op
test=of(1,2,3,4,5,6,7,8,9)

sub1=test.pipe(
 op.count()
)

sub1.subscribe(lambda x: print("Count is {0}".format(x)))

Count is 9


<reactivex.disposable.disposable.Disposable at 0x271cd3fb310>

In [15]:
#Max Operators

from rx import of, operators as op
test = of(12,32,41,50,280,250)
sub1 = test.pipe(
   op.max()
)
sub1.subscribe(lambda x: print("Max value is {0}".format(x)))

Max value is 280


<rx.disposable.disposable.Disposable at 0x271cd3fba30>

In [16]:
#Min Operators

from rx import of, operators as op
test = of(12,32,41,50,280,250)
sub1 = test.pipe(
   op.min()
)
sub1.subscribe(lambda x: print("Min value is {0}".format(x)))

Min value is 12


<rx.disposable.disposable.Disposable at 0x271cd407160>

In [17]:
#reduce operator

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))


The value is 55


<rx.disposable.disposable.Disposable at 0x271cd407a00>

In [18]:
#sum operator

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

Sum of Even numbers is 30


<rx.disposable.disposable.Disposable at 0x271cd4121c0>

# Transfromation Operators

In [19]:
'''buffer:-
This operator will collect all the values, from the source observable and emit 
them at regular intervals once the given boundary condition is satisfied.'''

from rx import of, interval, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.buffer(interval(1.0))
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


<rx.disposable.disposable.Disposable at 0x271cd422910>

In [20]:
'''ground_by:-
This operator will group the values coming from the source observable based on the key_mapper function given.'''

from rx import from_, interval, operators as op
test = from_(["A", "B", "C", "D"])
sub1 = test.pipe(
   op.group_by(lambda v: v[0])
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x00000271CD436850>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x00000271CD436970>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x00000271CD436190>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x00000271CD436610>


<rx.disposable.disposable.Disposable at 0x271cd41abe0>

In [21]:
'''map:-
This operator will change each value from the source observable into a new value based on the o/p of the mapper_func given.'''

from reactivex import of, operators as op
source=of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed=source.pipe(
  op.map(lambda s:len(s)),
  op.filter(lambda i: i>=5) 
)

composed.subscribe(
    lambda value: print("Recieved {0}".format(value))
)

Recieved 5
Recieved 5
Recieved 5
Recieved 7


<reactivex.disposable.disposable.Disposable at 0x271cd42ea00>

In [22]:
from rx import of, interval, operators as op
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.map(lambda x :x*x)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is 1
The element is 4
The element is 9
The element is 16
The element is 25
The element is 36
The element is 49
The element is 64
The element is 81
The element is 100


<rx.disposable.disposable.Disposable at 0x271cd436700>

In [23]:
'''scan:-
This operator will apply an accumulator function to the values coming from the source observable
and return an observable with new values.'''

from rx import of, interval, operators as op
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.scan(lambda acc, a: acc + a, 0))
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is 1
The element is 3
The element is 6
The element is 10
The element is 15
The element is 21
The element is 28
The element is 36
The element is 45
The element is 55


<rx.disposable.disposable.Disposable at 0x271cd440d00>

# Filtering Operators 

In [24]:
'''debounce:-
This operator will give the values from the source observable, 
until the timespan given and ignore the rest of the values if time passes.'''


from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.debounce(2.0)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 10


<rx.disposable.disposable.Disposable at 0x271cd440ee0>

In [25]:
'''distinct:-
This operator will give all the values that are distinct from the source observable'''

from rx import of, operators as op
from datetime import date
test = of(1, 6, 15, 1, 10, 6, 40, 10, 58, 20, 40)
sub1 = test.pipe(
   op.distinct()
)
sub1.subscribe(lambda x: print("The distinct value is {0}".format(x)))

The distinct value is 1
The distinct value is 6
The distinct value is 15
The distinct value is 10
The distinct value is 40
The distinct value is 58
The distinct value is 20


<rx.disposable.disposable.Disposable at 0x271cd440130>

In [26]:
'''element_at:-
This operator will give an element from the source observable for the index given.'''

from rx import of, operators as op
from datetime import date
test = of(1, 6, 15, 1, 10, 6, 40, 10, 58, 20, 40)
sub1 = test.pipe(
   op.element_at(5)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))


The value is 6


<rx.disposable.disposable.Disposable at 0x271cd443a30>

In [27]:
'''filter:-
This operator will filter values from the source observable based on the predicate function given.'''

from rx import of, operators as op
from datetime import date
test = of(1, 6, 15, 1, 10, 6, 40, 10, 58, 20, 40)
sub1 = test.pipe(
   op.filter(lambda x : x %2==0)
)
sub1.subscribe(lambda x: print("The filtered value is {0}".format(x)))

The filtered value is 6
The filtered value is 10
The filtered value is 6
The filtered value is 40
The filtered value is 10
The filtered value is 58
The filtered value is 20
The filtered value is 40


<rx.disposable.disposable.Disposable at 0x271cd443550>

In [28]:
#first:- This operator will give the first element from the source observable.

from rx import of, operators as op
from datetime import date
test = of(1, 6, 15, 1, 10, 6, 40, 10, 58, 20, 40)
sub1 = test.pipe(
   op.first()
)
sub1.subscribe(lambda x: print("The first element is {0}".format(x)))

The first element is 1


<rx.disposable.disposable.Disposable at 0x271cd440df0>

In [29]:
'''ignore_elements:-
This operator will ignore all the values from the source Observable, 
and only execute calls to complete or error callback functions.'''

from rx import of, operators as op
from datetime import date
test = of(1, 6, 15, 1, 10, 6, 40, 10, 58, 20, 40)
sub1 = test.pipe(
   op.ignore_elements()
)
sub1.subscribe(lambda x: print("The first element is {0}".format(x)),
lambda e: print("Error : {0}".format(e)),
lambda: print("Job Done!"))

Job Done!


<rx.disposable.disposable.Disposable at 0x271cd436400>

In [30]:
#last:- This operator will give the last element from the source observable.

from rx import of, operators as op
from datetime import date
test = of(1, 6, 15, 1, 10, 6, 40, 10, 58, 20, 40)
sub1 = test.pipe(
   op.last()
)
sub1.subscribe(lambda x: print("The last element is {0}".format(x)))

The last element is 40


<rx.disposable.disposable.Disposable at 0x271cd464cd0>

In [31]:
'''skip:-
This operator will give back an observable, that will skip the first occurrence of count items taken as input.'''

from rx import of, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.skip(5)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is 6
The element is 7
The element is 8
The element is 9
The element is 10


<rx.disposable.disposable.Disposable at 0x271cd464eb0>

In [32]:
'''skip_last:-
This operator will give back an observable, that will skip the last occurrence of count items taken as input.'''

from rx import of, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.skip_last(5)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is 1
The element is 2
The element is 3
The element is 4
The element is 5


<rx.disposable.disposable.Disposable at 0x271cd41a670>

In [33]:
#take:- This operator will give a list of source values in continuous order based on the count given.


from rx import of, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.take(5)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is 1
The element is 2
The element is 3
The element is 4
The element is 5


<rx.disposable.disposable.Disposable at 0x271cd46f730>

In [34]:
#take_last:- This operator will give a list of source values, in continuous order from last based on the count given.

from rx import of, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.take_last(5)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

The element is 6
The element is 7
The element is 8
The element is 9
The element is 10


<rx.disposable.disposable.Disposable at 0x271cd46fb20>

# Error handling operators

In [35]:
#catch:- This operator will terminate the source observable when there is an exception.

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
handler = of(11,12,13,14)
def casetest(e):
   if (e==4):
      raise Exception('err')
   else:
      return e
sub1 = test.pipe(
   op.map(lambda e : casetest(e)),
   op.catch(handler)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)))

The value is 1
The value is 2
The value is 3
The value is 11
The value is 12
The value is 13
The value is 14


<rx.disposable.disposable.Disposable at 0x271cd464a30>

In [36]:
#retry:-
#This operator will retry on the source observable when there is an error and once the retry count is done it will terminate.

from rx import of, operators as op
test = of(1,2,3,4,5,6)
def casetest(e):
   if (e==4):
     raise Exception('There is error cannot proceed!')
   else:
     return e
sub1 = test.pipe(
   op.map(lambda e : casetest(e)),
   op.retry(2)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)))

The value is 1
The value is 2
The value is 3
The value is 1
The value is 2
The value is 3
Error : There is error cannot proceed!


<rx.disposable.disposable.Disposable at 0x271cd476220>

# Utility Operators

In [37]:
#delay:- This operator will delay the source observable emission as per the time or date given

from rx import of, operators as op
import datetime
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.delay(5.0)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
Press any key to exit



''

In [38]:
'''materialize:-
This operator will convert the values from the source observable with the values emitted in the form of explicit notification
values.'''

from rx import of, operators as op
import datetime
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.materialize()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is OnNext(1.0)
The value is OnNext(2.0)
The value is OnNext(3.0)
The value is OnNext(4.0)
The value is OnNext(5.0)
The value is OnCompleted()


<rx.disposable.disposable.Disposable at 0x271cd483610>

In [39]:
#time_interval:- This operator will give the time elapsed between the values from the source observable

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
   op.time_interval()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is TimeInterval(value=1, interval=datetime.timedelta(0))
The value is TimeInterval(value=2, interval=datetime.timedelta(0))
The value is TimeInterval(value=3, interval=datetime.timedelta(0))
The value is TimeInterval(value=4, interval=datetime.timedelta(0))
The value is TimeInterval(value=5, interval=datetime.timedelta(microseconds=1000))
The value is TimeInterval(value=6, interval=datetime.timedelta(0))


<rx.disposable.disposable.Disposable at 0x271cd4757c0>

In [40]:
#timeout:-This operator will give all the value from the source observable, after the elapsed time or else will trigger an error

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
   op.timeout(5.0)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6


<rx.disposable.disposable.Disposable at 0x271cd483220>

In [41]:
#timestamp:- This operator will attach a timestamp to all the values from the source observable.

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
   op.timestamp()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is Timestamp(value=1, timestamp=datetime.datetime(2023, 3, 28, 10, 10, 29, 367972))
The value is Timestamp(value=2, timestamp=datetime.datetime(2023, 3, 28, 10, 10, 29, 368971))
The value is Timestamp(value=3, timestamp=datetime.datetime(2023, 3, 28, 10, 10, 29, 368971))
The value is Timestamp(value=4, timestamp=datetime.datetime(2023, 3, 28, 10, 10, 29, 368971))
The value is Timestamp(value=5, timestamp=datetime.datetime(2023, 3, 28, 10, 10, 29, 368971))
The value is Timestamp(value=6, timestamp=datetime.datetime(2023, 3, 28, 10, 10, 29, 368971))


<rx.disposable.disposable.Disposable at 0x271cd483be0>

# Conditional and Boolean operators

In [42]:
#all:- This operator will check if all the values from the source observable satisfy the condition given.

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.all(lambda a: a<10)
)
sub1.subscribe(lambda x: print("The result is {0}".format(x)))

The result is False


<rx.disposable.disposable.Disposable at 0x271cd476250>

In [43]:
'''contains:- This operator will return an observable with the value true or false if the given value is present 
is the values of the source observable.'''

from rx import of, operators as op
test = of(17, 25, 34, 56, 78)
sub1 = test.pipe(
   op.contains(34)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is True


<rx.disposable.disposable.Disposable at 0x271cd46f1c0>

In [44]:
#Using comparer
from rx import of, operators as op
test = of(17, 25, 34, 56, 78)
sub1 = test.pipe(
   op.contains(34, lambda x, y: x == y)
)
sub1.subscribe(lambda x: print("The valus is {0}".format(x)))

The valus is True


<rx.disposable.disposable.Disposable at 0x271cd49a910>

In [45]:
#default_if_empty:- This operator will return a default value if the source observable is empty.

from rx import of, operators as op
test = of()
sub1 = test.pipe(
   op.default_if_empty()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is None


<rx.disposable.disposable.Disposable at 0x271cd49ab80>

In [46]:
'''sequence_equal:-
This operator will compare two sequences of observables, or an array of values and return an observable with 
the value true or false.'''

from rx import of, operators as op
test = of(1,2,3)
test1 = of(1,2,3)
sub1 = test.pipe(
   op.sequence_equal(test1)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is True


<rx.disposable.disposable.Disposable at 0x271cd4a9070>

In [47]:
#skip_until:- This operator will discard values from the source observable until the second observable emits a value.

from rx import interval,range, operators as op
from datetime import date
test = interval(0)
test1 = range(10)
sub1 = test1.pipe(
   op.skip_until(test)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9


<rx.disposable.disposable.Disposable at 0x271cd4a95b0>

In [48]:
#skip_while:=This operator will return an observable with values from the source observable that satisfies the condition passed.

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.skip_while(lambda x : x < 5)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10


<rx.disposable.disposable.Disposable at 0x271cd4768e0>

In [49]:
#take_until:-This operator will discard values from the source observable after the second observable emits
# a value or is terminated.

from rx import timer,range, operators as op
from datetime import date
test = timer(0.01)
test1 = range(50)
sub1 = test1.pipe(
   op.take_until(test)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
The value is 11
The value is 12
The value is 13
The value is 14
The value is 15
The value is 16
The value is 17
The value is 18
The value is 19
The value is 20
The value is 21
The value is 22
The value is 23
The value is 24
The value is 25
The value is 26
The value is 27
The value is 28
The value is 29
The value is 30
The value is 31
The value is 32
The value is 33
The value is 34
The value is 35
The value is 36
The value is 37
The value is 38
The value is 39
The value is 40
The value is 41
The value is 42
The value is 43
The value is 44
The value is 45
The value is 46
The value is 47
The value is 48
The value is 49


<rx.disposable.disposable.Disposable at 0x271cd4afe20>

In [50]:
#take_while:- This operator will discard values from the source observable when the condition fails

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.take_while(lambda a : a < 5)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 1
The value is 2
The value is 3
The value is 4


<rx.disposable.disposable.Disposable at 0x271cd4af880>

# Connectable Operators

In [51]:
#publish:- This method will convert the observable into a connectable observable

from rx import create, range, operators as op
import random
def test_observable(observer, scheduler):
   observer.on_next(random.random())
   observer.on_completed()
source = create(test_observable).pipe(op.publish())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 – {0}".format(i)))
source.connect()

From subscriber 1 - 0.1053691134284368
From subscriber 2 – 0.1053691134284368


<rx.disposable.compositedisposable.CompositeDisposable at 0x271cd4b77c0>

In [52]:
#ref_count:- This operator will make the observable a normal observable.

from rx import create, operators as op
import random
def test_observable(observer, scheduler):
   observer.on_next(random.random())
source = create(test_observable).pipe(op.publish(),op.ref_count())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))

From subscriber 1 - 0.76410268631334


In [53]:
'''replay:-
This method works similar to the replaySubject. This method will return the same values, 
even if the observable has already emitted, and some of the subscribers are late in subscribing.'''

from rx import create, range, operators as op
import random
from threading import Timer
def test_observable(observer, scheduler):
   observer.on_next(random.random())
   observer.on_completed()
source = create(test_observable).pipe(op.replay())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
source.connect()
print("subscriber called after delay ")
def last_subscriber():
   test3 = source.subscribe(on_next = lambda i: print("From subscriber 3 - {0}".format(i)))
t = Timer(5.0, last_subscriber)
t.start()

From subscriber 1 - 0.5998407681647847
From subscriber 2 - 0.5998407681647847
subscriber called after delay 


# Combining Operators

In [54]:
#merge:- This operator will merge given observables.

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
test2 = of(11,12,13,14,15,16)
sub1 = test.pipe(
   op.merge(test2)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 11
The value is 12
The value is 13
The value is 14
The value is 15
The value is 16


<rx.disposable.disposable.Disposable at 0x271cd4af970>

In [55]:
#start_with
#This operator will take in the given values, and add at the start of the source observable return back the full sequence.

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
   op.start_with(-2,-1,0)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

The value is -2
The value is -1
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6


<rx.disposable.disposable.Disposable at 0x271cd476520>

In [1]:
#zip:-
#This operator returns an observable with values in a tuple form, 
#which is formed by taking the first value of the given observable and so on.

from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
test1 = of(4,8,12,16,20)
test2 = of(5,10,15,20,25)
sub1 = test.pipe(
   op.zip(test1, test2)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))

<rx.disposable.disposable.Disposable at 0x2746671f610>

# Custom Operators

In [57]:
import reactivex
from reactivex import operators as op

def length_more_than_5():
    # In v4 rx.pipe has been renamed to `compose`
    return reactivex.compose(
        op.map(lambda s: len(s)),
        op.filter(lambda i: i >= 5),
    )

reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
    length_more_than_5()
).subscribe(lambda value: print("Received {0}".format(value)))

Received 5
Received 5
Received 5
Received 7


<reactivex.disposable.disposable.Disposable at 0x271cd4c1400>

In [58]:
import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Value is 30


<rx.disposable.disposable.Disposable at 0x271cd4b1a00>

In [59]:
#convert strings into lowercase
import reactivex
from reactivex import of, operators as op
def convert_lowercase():
    def _lowercase(source):
        def subscribe(observer, scheduler = None):
            def on_next(value):
                observer.on_next(value.lower())

            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler=scheduler)
        return reactivex.create(subscribe)
    return _lowercase
reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(convert_lowercase()).subscribe(
    lambda value: print("Recieved {0}".format(value)))

Recieved alpha
Recieved beta
Recieved gamma
Recieved delta
Recieved epsilon


<reactivex.disposable.disposable.Disposable at 0x271cd4c5c40>

# Create a subject

In [60]:
'''A subject is an observable sequence, as well as, an observer that can multicast, i.e.
talk to many observers that have subscribed.'''

from rx.subject import Subject

#Created a subject
subject_test=Subject()

#Subscribe to a Subject
subject_test.subscribe(
  lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
  lambda x: print("The value is {0}".format(x))
)

#Passing Data to Subject
subject_test.on_next("A")
subject_test.on_next("B")
subject_test.on_next("C")

The value is A
The value is A
The value is B
The value is B
The value is C
The value is C


In [61]:
#We can use the on_completed() method, to stop the subject execution as shown below.

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
#Once we call complete, the next method called later is not invoked.

The value is A
The value is A


In [62]:
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Error : There is an Error!
Error : There is an Error!


# BehaviorSubject

In [63]:
#BehaviorSubject will give you the latest value when called. You can create behavior subject as shown below −

from rx.subject import BehaviorSubject

#initialized the behaviour subject with value:Testing Behaviour Subject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject


# Replay Subject

In [64]:
#A replaysubject is similar to behavior subject, wherein, it can buffer the values and replay the same to the new subscribers.

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

#The buffer value used is 2 on the replay subject. So,
#the last two values will be buffered and used for the new subscribers called.

Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5


# AsyncSubject

In [65]:
'''In the case of AsyncSubject, the last value called is passed to the subscriber, and it will be done only
after the complete() method is called.'''

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))

#Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Testing Async Subject A: 2
Testing Async Subject B: 2


<rx.disposable.disposable.Disposable at 0x271cd4e1580>

# Concurrency Using Scheduler

In [None]:
'''One important feature of RxPy is concurrency, i.e. to allow the task to execute in parallel.
To make that happen, we have two operators subscribe_on() and observe_on() that will work with a scheduler, 
that will decide the execution of the subscribed task.'''

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

#In the above example, I have 2 tasks: Task 1 and Task 2. The execution of the task is in sequence. 
#The second task starts only, when the first task is done.

From Task 1: 1
From Task 1: 2
From Task 1: 3
From subscriber 3 - 0.5998407681647847
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete


In [None]:
'''RxPy supports many Scheduler, and here, we are going to make use of ThreadPoolScheduler. ThreadPoolScheduler mainly will try 
   to manage with the CPU threads available.

   In the example, we have seen earlier, we are going to make use of a multiprocessing module that will give us the cpu_count. 
   The count will be given to the ThreadPoolScheduler that will manage to get the task working in parallel based on the threads
   available.'''


import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

#In the above code, I have 2 tasks and the cpu_count is 4. Since, the task is 2 and threads available with us are 4,
#both the task can start in parallel.

In [None]:
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

# RxPY - Examples

Here, is a very simple example, wherein, I am getting user data from this URL −https://jsonplaceholder.typicode.com/users.
Filtering the data, to give the names starting with "C", and later using the map to return the names only.
Here is the output for the same.

In [None]:
import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

# Difference between observable and subject
In this example, we will see the difference between an observable and a subject. 

In [None]:
from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

In the above example, every time you subscribe to the observable, it will give you new values.

# Subject Example

In [None]:
from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

if you see the values are shared, between both subscribers using the subject.

# Cold Observables

Cold observables, are observable that are executed, and renders data each time it is subscribed. When it is subscribed, the observable is executed and the fresh values are given.

In [3]:
from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

From first subscriber
From sub1 1.6191626954465463
From sub1 2.942122459662457
From sub1 3.830608236467449
From sub1 4.638971436793946
From sub1 5.652319934352232
From second subscriber
From sub2 1.5700378558676036
From sub2 2.1334271572716705
From sub2 3.895498960846743
From sub2 4.574399102972901
From sub2 5.91333073669314


In the above example, every time you subscribe to the observable, it will execute the observable and emit values. The values can also differ from subscriber to subscriber as shown in the example above.

# Hot Observables

In the case of hot observable, they will emit the values when they are ready and will not always wait for a subscription. When the values are emitted, all the subscribers will get the same value.

You can make use of hot observable when you want values to emitted when the observable is ready, or you want to share the same values to all your subscribers.

In [2]:
from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

From sub1 1.2944047229592097
From sub2 1.2944047229592097
From sub1 2.1917827573897743
From sub2 2.1917827573897743
From sub1 3.1145205379650953
From sub2 3.1145205379650953
From sub1 4.905961315404904
From sub2 4.905961315404904
From sub1 5.1676095424576705
From sub2 5.1676095424576705


 If you see, the same value is shared between the subscribers. You can achieve the same using publish () connectable observable operator.

# Testing in RxPy

In [None]:
#basic example
from reactivex.testing import ReactiveTest, TestScheduler
from reactivex import operators

def test_double():
    # Create a scheduler
    scheduler = TestScheduler()
    # Define one or more source
    source = scheduler.create_hot_observable(
        ReactiveTest.on_next(250, 3),
        ReactiveTest.on_next(350, 5),
    )

    # Define how the observable/operator is used on the source
    def create():
        return source.pipe(operators.map(lambda x: 2 * x))

    # trigger subscription and record emissions
    results = scheduler.start(create)

    # check the messages and potentially subscriptions
    assert results.messages == [
        ReactiveTest.on_next(250, 6),
        ReactiveTest.on_next(350, 10),
    ]