In [2]:
!pip install rx

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting rx
  Downloading Rx-3.2.0-py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.2/199.2 KB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rx
Successfully installed rx-3.2.0


In [10]:
# Parte I

import rx
import rx.operators as ops

In [24]:
source = rx.from_iterable([1,2,3,4,5,6,7])
# source = rx.from_iterable([1,2,3,4,5,'gs',7])

disposable = source.pipe(
    ops.map(lambda i:i-1),
    ops.filter(lambda i:i%2 == 0),
).subscribe(
    on_next=lambda i: print(f'on_next: {i}'),
    on_completed=lambda: print('on_completed'),
    on_error=lambda e: print(f'on_error: {e}'),
)

disposable.dispose()
print('Finalizado')

on_next: 0
on_next: 2
on_next: 4
on_next: 6
on_completed
Finalizado


In [25]:
# Parte II

from rx import create, disposable

In [26]:
# 1º Forma

# Função que recebe os Observer 
def push_five_strings(observer, scheduler):
    observer.on_next ("Alfa")
    observer.on_next("Beta")
    observer.on_next ("Gama")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()

class PrintObserver(disposable.Disposable):

    def on_next(self, value):
        print(f"Recebido {value}")

    def on_completed(self):
        print("Fim!")

    def on_error(self, error):
        print(f'Erro identificado: {error}')

#cria o observable
source = create(push_five_strings)

#define 0 observable
source.subscribe(PrintObserver())

Recebido Alfa
Recebido Beta
Recebido Gama
Recebido Delta
Recebido Epsilon
Fim!


<rx.disposable.disposable.Disposable at 0x7f794b17f7c0>

In [27]:
# 2º Forma

from rx import of
 
source = of("Alfa", "Beta", "Gama", "Delta", "Epsilon")

source.subscribe(
    on_next= lambda i: print(f"Rebebido {i}"),
    on_completed= lambda: print ("Finalizado"),
    on_error= lambda e:print(f"Erro identificado: {e}")
)

Rebebido Alfa
Rebebido Beta
Rebebido Gama
Rebebido Delta
Rebebido Epsilon
Finalizado


<rx.disposable.disposable.Disposable at 0x7f794b167610>

In [31]:
# 1º Forma

# Função de compra de ações
stocks = [
    {'TCKR' : 'APPL', 'PRICE': 200},
    {'TCKR' : 'GOOG', 'PRICE': 90},
    {'TCKR' :'TSLA', 'PRICE': 120},
    {'TCKR' :'MSFT', 'PRICE': 150},
    {'TCKR' : 'INTL', 'PRICE': 70},
]

def buy_stock_events(observer, scheduler):
    for stock in stocks:
        if stock["PRICE"] > 100:
            observer.on_next(stock["TCKR"])
            
    observer.on_completed()

class StockObserver(disposable.Disposable):

    def on_next(self, value):
        print(f"Instruções recebidas para comprar ação {value}")

    def on_completed(self):
        print("Todas as ordens de compras foram finalizadas")

    def on_error(self, error):
        print(f"Erro identificado: {error}")

#cria o observable
source = create(buy_stock_events)

#define 0 observable
source.subscribe(StockObserver())

Instruções recebidas para comprar ação APPL
Instruções recebidas para comprar ação TSLA
Instruções recebidas para comprar ação MSFT
Todas as ordens de compras foram finalizadas


<rx.disposable.disposable.Disposable at 0x7f794b1679d0>

In [32]:
# 2º Forma

source = create(buy_stock_events)
source.subscribe(
    on_next = lambda value: print(f"Instruções recebidas para comprar açao {value}"),
    on_completed = lambda: print("Instrução de compra finalizada"),
    on_error = lambda e: print(e)
)

Instruções recebidas para comprar açao APPL
Instruções recebidas para comprar açao TSLA
Instruções recebidas para comprar açao MSFT
Instrução de compra finalizada


<rx.disposable.disposable.Disposable at 0x7f794b167910>

In [37]:
# 1º Forma

# Verifica tamanho das strings

from rx import of, operators as op

of("alpha","Beta","Gamma"," Delta"," Epslon").pipe(
    op.map(lambda s: len(s)),
    op.filter(lambda i: i >= 5)
).subscribe(lambda value: print(f"Recebido {value}"))

Recebido 5
Recebido 5
Recebido 6
Recebido 7


<rx.disposable.disposable.Disposable at 0x7f794b177040>

In [38]:
# 2º Forma

from rx import of, operators as op

source = of("alpha","Beta","Gamma"," Delta"," Epslon")

composed = source.pipe(
    op.map(lambda s: len(s)),
    op.filter(lambda i: i >= 5)
)

composed.subscribe(lambda value: print(f"Recebido {value}"))

Recebido 5
Recebido 5
Recebido 6
Recebido 7


<rx.disposable.disposable.Disposable at 0x7f794b17fbb0>

In [39]:
# 3º Forma (através de uma função)

import rx
from rx import operators as ops

def lenght_more_than_5():
    return rx.pipe(
       op.map(lambda s: len(s)),
        op.filter(lambda i: i >= 5)
    )

rx.of("alpha","Beta","Gamma"," Delta"," Epslon").pipe(
    lenght_more_than_5()
).subscribe(lambda value: print(f"Recebido {value}"))

Recebido 5
Recebido 5
Recebido 6
Recebido 7


<rx.disposable.disposable.Disposable at 0x7f794b148e50>