In [1]:
# Apply a transformation to a data source to convert
# integers into strings
import rx
from rx import operators as op
# Set up a source with a map function
source = rx.from_list([2, 3, 5, 7]).pipe(op.map(lambda value: "'" + str(value) + "'"))
# Subscribe a lambda function
source.subscribe(lambda value: print('Lambda Received', value, ' is a string ', isinstance(value, str)))

Lambda Received '2'  is a string  True
Lambda Received '3'  is a string  True
Lambda Received '5'  is a string  True
Lambda Received '7'  is a string  True


<rx.disposable.disposable.Disposable at 0x7fd0c051bb00>

In [3]:
# An example illustrating how to merge two data sources
import rx
# Set up two sources
source1 = rx.from_list([2, 3, 5, 7])
source2 = rx.from_list([10, 11, 12])
# Merge two sources into one
rx.merge(source1, source2).subscribe(lambda v: print(v, end=','))

2,3,5,7,10,11,12,

<rx.disposable.disposable.Disposable at 0x7fd0c04ce748>

In [4]:
# Filter source for even numbers
import rx
from rx import operators as op
# Set up a source with a filter
source = rx.from_list([2, 3, 5, 7, 4, 9, 8]).pipe(op.filter(lambda value: value % 2 == 0))
# Subscribe a lambda function
source.subscribe(lambda value: print('Lambda Received', value))

Lambda Received 2
Lambda Received 4
Lambda Received 8


<rx.disposable.disposable.Disposable at 0x7fd0c04cea58>

In [5]:
# Use distinct to suppress duplicates
source = rx.from_list([2, 3, 5, 2, 4, 3, 2]).pipe(op.distinct())
# Subscribe a lambda function
source.subscribe(lambda value: print('Received', value))

Received 2
Received 3
Received 5
Received 4


<rx.disposable.disposable.Disposable at 0x7fd0c04ce860>

In [6]:
# Example of summing all the values in a data stream
import rx
from rx import operators as op
# Set up a source and apply sum
rx.from_list([2, 3, 5, 7]).pipe(op.sum()).subscribe(lambda v: print(v))

17


<rx.disposable.disposable.Disposable at 0x7fd0c04dd780>

In [7]:
import rx
from rx import operators as op
# Rolling or incremental sum
rx.from_([2, 3, 5, 7]).pipe(op.scan(lambda subtotal, i: subtotal+i)).subscribe(lambda v: print(v))

2
5
10
17


<rx.disposable.disposable.Disposable at 0x7fd0c04dd860>

In [8]:
# Example of chaining operators together
import rx
from rx import operators as op
# Set up a source with a filter
source = rx.from_list([2, 3, 5, 7, 4, 9, 8])
pipe = source.pipe(op.filter(lambda value: value % 2 == 0), op.map(lambda value: "'" + str(value) + "'"))
# Subscribe a lambda function
pipe.subscribe(lambda value: print('Received', value))

Received '2'
Received '4'
Received '8'


<rx.disposable.disposable.Disposable at 0x7fd0c04e8240>

In [1]:
stocks = (('APPL', 12.45), ('IBM', 15.55), ('MSFT', 5.66), ('APPL', 13.33))

In [5]:
import rx
from rx import operators as op
# Set up a source with a filter
pipe = source.pipe(op.filter(lambda value: value[0] == 'APPL'), op.map(lambda value: str(value)))
pipe.subscribe(lambda value: print('Received', value))

Received ('APPL', 12.45)
Received ('APPL', 13.33)


<rx.disposable.disposable.Disposable at 0x7f95d8542390>

In [6]:
import rx
from rx import operators as op
# Set up a source with a filter
pipe = source.pipe(op.filter(lambda value: value[1] > 15.00), op.map(lambda value: str(value)))
pipe.subscribe(lambda value: print('Received', value))

Received ('IBM', 15.55)


<rx.disposable.disposable.Disposable at 0x7f95d848f128>

In [11]:
import rx
from rx import operators as op
# Set up a source with a filter
pipe = source.pipe(op.filter(lambda value: value[0] == 'APPL'), op.sum(lambda value: value[1]))
pipe.subscribe(lambda value: print('Received', value/2))

Received 12.89


<rx.disposable.disposable.Disposable at 0x7f95d84acba8>

In [2]:
stocks2 = (('GOOG', 8.95), ('APPL', 7.65), ('APPL', 12.45), ('MSFT', 5.66), ('GOOG', 7.56), ('IBM', 12.76))

In [3]:
import rx
# Set up two sources
source1 = rx.from_list(stocks)
source2 = rx.from_list(stocks2)
# Merge two sources into one
sourceall = rx.merge(source1, source2)

In [32]:
import rx
from rx import operators as op
# Set up a source with a filter
pipe = sourceall.pipe(op.reduce(lambda value, y: value if(value[1] > y[1]) else y),
                      op.map(lambda value: str(value)))
pipe.subscribe(lambda value: print('Received', value))

Received ('IBM', 15.55)


<rx.disposable.disposable.Disposable at 0x7f95da93b860>

In [33]:
import rx
from rx import operators as op
# Set up a source with a filter
pipe = sourceall.pipe(op.reduce(lambda value, y: value if(value[1] < y[1]) else y),
                      op.map(lambda value: str(value)))
pipe.subscribe(lambda value: print('Received', value))

Received ('MSFT', 5.66)


<rx.disposable.disposable.Disposable at 0x7f95d85e0748>

In [6]:
import rx
from rx import operators as op
# Set up a source with a filter
pipe = sourceall.pipe(op.distinct(lambda value: value))
pipe.subscribe(lambda value: print('Received', value))

Received ('APPL', 12.45)
Received ('IBM', 15.55)
Received ('MSFT', 5.66)
Received ('APPL', 13.33)
Received ('GOOG', 8.95)
Received ('APPL', 7.65)
Received ('GOOG', 7.56)
Received ('IBM', 12.76)


<rx.disposable.disposable.Disposable at 0x7f44d0158e10>