# The RxPy Module

The **RxPy** module provides three core kinds of object: **Observables**, **Observers**, and **Subjects**. Each one is covered separately below.

## RxPy Observables

An **Observable** is a Python class that publishes data for whoever is subscribed to it. It can be instantiated directly or created from a generator function.

A simple example is creating an **Observable** from a list:

In [None]:
import rx

observable = rx.from_list([2, 3, 5, 7])

## RxPy Observers

The subscriber, or **observer**, is registered with the ``subscribe()`` method that every **Observable** provides. This method accepts four parameters:

- ``on_next`` = function called each time a new value is received
- ``on_completed`` = function called once the Observable sends the completion signal
- ``on_error`` = function that handles errors raised by the Observable
- ``observer`` = the object that is to receive notifications. You may subscribe using an Observer or callbacks, not both

We can use it with lambda functions or regular functions:

In [None]:
import rx


def prime_number_reporter(value):
    print('Function Received', value)

observable = rx.from_list([2, 3, 5, 7])

# Subscribe a named function
observable.subscribe(prime_number_reporter)
# Subscribe a lambda function
observable.subscribe(lambda value: print('Lambda Received', value))

In [None]:
import rx

observable = rx.from_list([2, 3, 5, 7])

class PrimeNumberObserver:
    def on_next(self, value):
        print('Object Received', value)

    def on_completed(self):
        print('Data Stream Completed')

    def on_error(self, error):
        print('Error Occurred', error)

# Subscribe an Observer object
observable.subscribe(PrimeNumberObserver())

# Use lambdas to set up all three functions
observable.subscribe(
    on_next = lambda value: print('Received on_next', value),
    on_error = lambda exp: print('Error Occurred', exp),
    on_completed = lambda: print('Received completed notification')
)

## Multiple Subscribers/Observers

As mentioned earlier, one of the main features of reactive programming is that multiple observers can subscribe to a single observable, each one acting independently.

In [None]:
import rx

# Create an observable using data in a list
observable = rx.from_list([2, 3, 5, 7])

class PrimeNumberObserver:
    """ An Observer class """
    def on_next(self, value):
        print('Object Received', value)

    def on_completed(self):
        print('Data Stream Completed')

    def on_error(self, error):
        print('Error Occurred', error)


def prime_number_reporter(value):
    """ A plain function used as a subscriber (defined at module level) """
    print('Function Received', value)


print('Set up Observers / Subscribers')

# Subscribe a lambda function
observable.subscribe(lambda value: print('Lambda Received', value))

# Subscribe a named function
observable.subscribe(prime_number_reporter)

# Subscribe an Observer object
observable.subscribe(PrimeNumberObserver())

# Use lambdas to set up all three functions
observable.subscribe(
    on_next=lambda value: print('Received on_next', value),
    on_error=lambda exp: print('Error Occurred', exp),
    on_completed=lambda: print('Received completed notification')
)

## RxPy Subjects

Subjects act as both an Observable and an Observer at the same time. Besides receiving data from an Observable it is subscribed to, a Subject can transform that data and publish it to its own subscribers as if it were an Observable itself.

In [None]:
import rx
from rx.subject import Subject  # RxPY 3.x module name (rx.subjects was the 1.x location)
from datetime import datetime


source = rx.from_list([2, 3, 5, 7])


class TimeStampSubject(Subject):
    def on_next(self, value):
        print('Subject Received', value)
        super().on_next((value, datetime.now()))

    def on_completed(self):
        print('Data Stream Completed')
        super().on_completed()

    def on_error(self, error):
        print('In Subject - Error Occurred', error)
        super().on_error(error)

def prime_number_reporter(value):
    print('Function Received', value)

print('Set up')

# Create the Subject
subject = TimeStampSubject()

# Set up multiple subscribers for the subject
subject.subscribe(prime_number_reporter)
subject.subscribe(lambda value: print('Lambda Received', value))
subject.subscribe(
    on_next = lambda value: print('Received on_next', value),
    on_error = lambda exp: print('Error Occurred', exp),
    on_completed = lambda: print('Received completed notification')
)

# Subscribe the Subject to the Observable source
source.subscribe(subject)
print('Done')

# Concurrency in RxPy


