## TEMA 2: PROGRAMACIÓN REACTIVA

In [1]:
!pip install rx

Collecting rx
  Downloading Rx-3.2.0-py3-none-any.whl.metadata (4.6 kB)
Downloading Rx-3.2.0-py3-none-any.whl (199 kB)
Installing collected packages: rx
Successfully installed rx-3.2.0


In [4]:
# Programación funcional

from rx import create

def generador(observer, scheduler):
    # Si las funciones on_next, on_error, etc no están suscritas al observador, da error
    observer.on_next("Hola")
    # Después de ejecutar on_error, los observers no ejecutan nada después
    # observer.on_error("Error")
    observer.on_completed()

observable = create(generador) # Se le pasa la funcion
observable.subscribe(on_next = lambda s: print(f"Recibido: {s}"), # Se subscriben los observadores
                     on_error = lambda e: print(f"Mensaje de error: {e}"),
                     on_completed = lambda: print("Completado")) 

observable.subscribe(on_next = lambda s: print(f"Recibido2: {s}"), # Se subscriben los observadores
                     on_error = lambda e: print(f"Mensaje de error: {e}")) 

Recibido: Hola
Completado
Recibido2: Hola


<rx.disposable.disposable.Disposable at 0x1cd49dabc20>

In [6]:
# Programación funcional

from rx import from_

observable = from_([1, 2, 3]) # Se le pasa la funcion

observable.subscribe(on_next = lambda s: print(f"Recibido: {s}"), # Se subscriben los observadores
                     on_error = lambda e: print(f"Mensaje de error: {e}"),
                     on_completed = lambda: print("Completado")) 


Recibido: 1
Recibido: 2
Recibido: 3
Completado


<rx.disposable.disposable.Disposable at 0x1cd49cf3ce0>

In [3]:
from rx import create
from rx.core import Observer

class Printer(Observer):
    def on_next(self, value):
        print(f"recibiendo: {value}")

    def on_error(self, error):
        print(f"Mensaje de error: {error}")

    def on_completed(self):
        print("acabé")


def observer_teclado(observer, scheduler):
    exit_=False
    while not exit_:
        msg = input("Introduce un mensaje: ")
        if msg:
            if msg == "exit":
                exit_=True
                observer.on_completed()
            else:
                observer.on_next(msg)
        else:
            observer.onerror("mensaje vacío")
            exit_=True

observable = create(observer_teclado)

printer = Printer()

observable.subscribe(printer)

recibiendo: hola
Mensaje de error: 'AutoDetachObserver' object has no attribute 'onerror'


<rx.disposable.disposable.Disposable at 0x1c55861ae40>

In [7]:
from rx import create, operators, from_
from rx.core import Observer

class Printer(Observer):
    def on_next(self, value):
        print(f"recibiendo: {value}")

    def on_error(self, error):
        print(f"Mensaje de error: {error}")

    def on_completed(self):
        print("acabé")


def observer_teclado(observer, scheduler):
    exit_=False
    while not exit_:
        msg = input("Introduce un mensaje: ")
        if msg:
            if msg == "exit":
                exit_=True
                observer.on_completed()
            else:
                observer.on_next(msg)
        else:
            observer.onerror("mensaje vacío")
            exit_=True

observable = create(observer_teclado)

observable_intermedio = observable.pipe(operators.map(lambda s: s.upper()), # Son secuenciales
                                        operators.map(lambda s: f"Convertido a mayusculas: {s}"))

printer = Printer()

observable_intermedio.subscribe(printer)




recibiendo: Convertido a mayusculas: ASDF
Mensaje de error: 'AutoDetachObserver' object has no attribute 'onerror'


<rx.disposable.disposable.Disposable at 0x1c55872ce90>

In [10]:
def num_generator(observer, scheduler):
    l = [123,654,24,905756,3453,45421]
    for n in l:
        observer.on_next(n)

    observer.on_complete() # para el máximo

observable = from_([123,3645,7845,23459,6787324,])
observable.pipe(operators.filter(lambda n: n>20000)).subscribe(on_next=lambda s: print(s), on_completed=lambda: print("Completado"))
observable.pipe(operators.max()).subscribe(on_next=lambda s: print(s), on_completed=lambda: print("Completado"))

6787324
Completado


<rx.disposable.disposable.Disposable at 0x1c5587626f0>

In [5]:
# Ejercicio 1
# Lanzamos una peticion de busqueda para buscar las peliculas que encajen con un titulo.
# Procesaremos el JSON de respuesta mediante un Obsevable y operaciones:
    # Eliminaremos todos los resultados que no tengan type == "movie"
    # Generaremos el mensaje: (ID) - TITULO: URL_POSTER (AÑO)

import requests
from rx import from_, create, operators
from rx.core import Observer
from secret import api_key
from requests import get

class Printer(Observer):
    def on_next(self, value):
        print(f"recibiendo: {value}")

    def on_error(self, error):
        print(f"Mensaje de error: {error}")

    def on_completed(self):
        print("acabé")

def search_movies(observer, scheduler):
    # &s es para buscar por una palabra, y &t para un titulo
    response = get(f"http://www.omdbapi.com/?apikey={api_key}&s=cars")

    if response.status_code == 200:
        for i in response.json()["Search"]:
            observer.on_next(i)

observable = create(search_movies)

observable.pipe(operators.filter(lambda s: s["Type"] == "movie"),
                operators.map(lambda d: f"({d["imdbID"]}) - ({d["Title"]}) : ({d["Poster"]}) ({d["Year"]})")).subscribe(Printer())




recibiendo: (tt0317219) - (Cars) : (https://m.media-amazon.com/images/M/MV5BMTg5NzY0MzA2MV5BMl5BanBnXkFtZTYwNDc3NTc2._V1_SX300.jpg) (2006)
recibiendo: (tt1216475) - (Cars 2) : (https://m.media-amazon.com/images/M/MV5BMTUzNTc3MTU3M15BMl5BanBnXkFtZTcwMzIxNTc3NA@@._V1_SX300.jpg) (2011)
recibiendo: (tt3606752) - (Cars 3) : (https://m.media-amazon.com/images/M/MV5BMTc0NzU2OTYyN15BMl5BanBnXkFtZTgwMTkwOTg2MTI@._V1_SX300.jpg) (2017)
recibiendo: (tt0200027) - (Riding in Cars with Boys) : (https://m.media-amazon.com/images/M/MV5BZmQ3MjM3OTAtNDUwYy00NWM3LTljM2ItN2JhZTRlMDM3ZTcxXkEyXkFqcGc@._V1_SX300.jpg) (2001)
recibiendo: (tt0081698) - (Used Cars) : (https://m.media-amazon.com/images/M/MV5BNTY3Zjg2ODQtNjdhNS00OWY1LThkODEtYWYyNDliNzI3ZTlhXkEyXkFqcGc@._V1_SX300.jpg) (1980)
recibiendo: (tt1282139) - (Cars of the Revolution) : (https://m.media-amazon.com/images/M/MV5BZmE3NGIzZmEtZTBhNi00ZDYzLWJmZjItZWRmMmYzYzVmNTViXkEyXkFqcGdeQXVyMjExNjgyMTc@._V1_SX300.jpg) (2008)
recibiendo: (tt0246692) - (Old Men 

<rx.disposable.disposable.Disposable at 0x2855ae189b0>

### TKInter

In [26]:
from tkinter import Tk, Label, Button, Entry, Checkbutton, BooleanVar, StringVar
from tkinter.ttk import Combobox

class Window:
    def __init__(self):
        self.window = Tk() # Crear ventana con interfaz gráfica
        #self.window.geometry("200x200") # añadir x+y para la posición

        self.label = Label(text="Hola mundo", font=("Arial Bold", 25)) # Crear etiqueta
        self.label.grid(row=0, column=0) # Añadir la etiqueta a la ventana
        # label.pack() # Añadir la etiqueta a la ventana (sin posición)

        self.label2Text = StringVar()
        self.label2Text.set("caracola")

        self.label2 = Label(font=("Arial", 10), textvariable=self.label2Text) # Crear etiqueta
        self.label2.grid(row=0, column=1, padx=10) # Añadir la etiqueta a la ventana

        self.button = Button(text="Pulsa", command=self.read_entry) # Crear botón con funcionalidad
        self.button.grid(row=1, column=0) # Añadir el botón a la ventana

        self.entry = Entry() # Crear un campo de texto
        self.entry.grid(row=1, column=1)

        self.combobox = Combobox(values=["Opción 1", "Opción 2", "Opción 3", "Default"])
        self.combobox.grid(row=2, column=0)
        self.combobox.current(3)

        self.checkButtonState = BooleanVar()
        self.checkButtonState.set(True)

        self.checkbutton = Checkbutton(text="yes or not", variable=self.checkButtonState)
        self.checkbutton.grid(row=2, column=1)
        

        self.window.mainloop() # Bucle infinito para que la ventana no se cierre

    def read_entry(self): # imprime el contenido escrito en el entry
        self.label2Text.set(self.entry.get())
        self.checkButtonState.set(not self.checkButtonState.get())

Window()


<__main__.Window at 0x2523beb9fa0>

In [None]:
# Ejercicio 2: calculadora

from tkinter import Tk, Label, Button
from functools import partial

class Calc:
    def __init__(self):
        self.keyboard_width = 4
        self.keyboard_height = 4

        self.window = Tk()
        self.window.title("Calculadora")

        self.display = Label(text="0", font=("Arial Bold", 30))
        self.display.grid(row=0, column=0, columnspan=self.keyboard_width)

        lista_simbolos = ["7", "8", "9", "+", "4", "5", "6", "-", "1", "2", "3", "*", "0", "=", "/"]
        i = 0
        for sym in lista_simbolos:
            button = Button(text=sym, font=("Arial Bold", 20), command=partial(self.buttonPressed, sym))
            if sym == "=":
                button.grid(
                    column=1 % self.keyboard_width, 
                    row=int(i/self.keyboard_height)+1,
                    padx=10, pady=10, columnspan=2)
            else:
                button.grid(
                    column=i % self.keyboard_width, 
                    row=int(i/self.keyboard_height)+1,
                    padx=10, pady=10)
            i += 1


        self.window.mainloop()

    def buttonPressed(self, btn_data):
        current_text = str(self.display.cget("text"))
        if current_text == "0":
            current_text = ""
        
        if btn_data == "=":
            self.display.configure(text=eval(current_text))
        else:
            self.display.configure(text=current_text+btn_data)


Calc()

<__main__.Calc at 0x25237536780>

In [16]:
# RX y Tkinter
!pip install kafka-python-ng

Collecting kafka-python-ng
  Downloading kafka_python_ng-2.2.3-py2.py3-none-any.whl.metadata (9.7 kB)
Downloading kafka_python_ng-2.2.3-py2.py3-none-any.whl (232 kB)
Installing collected packages: kafka-python-ng
Successfully installed kafka-python-ng-2.2.3


In [1]:
# Ejercicio: implementar, utilizando una interfaz grafica de TKInter y reactivex, un
# programa que se dedique a escuchar el Topic Kafka “PAPR_MAIS”, “PAPR_INSOA”
# o “PAPR_INSOBC” según el grupo que os corresponda. El programa tendrá un
# Observable que incluirá el KafkaConsumer y se encargará de leer los mensajes.
# Cada vez que se reciba uno, lo emitiremos a los obsevers, que se encargarán de
# actualizar la interfaz gráfica y de imprimir el mensaje por terminal

from tkinter import Tk, Label, Button, Text, END
from kafka import KafkaConsumer
import threading

class KafkaListener:
    def __init__(self, topic):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers='pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092',
            security_protocol='SASL_SSL',
            sasl_mechanism='PLAIN',
            sasl_plain_username='WKIAB3JMF7ZJZ26U',
            sasl_plain_password='4sQiwHFUoQDAr7BJufwWqHz99/K0R4o4iSx4Phf00rn5PQfa0TfvAwyGP/6tVY9c',
            auto_offset_reset='latest'
            )

    def start_listening(self, onMsg):
        for msg in self.consumer:
            onMsg(msg.value.decode("utf-8"))

class KafkaUI:
    def __init__(self):
        self.window = Tk()
        self.window.title("Kafka UI")

        self.entry = Entry()
        self.entry.grid(row=0, column=0)

        self.button = Button(text="Escuchar", command=self.start_kafka_listening)
        self.button.grid(row=0, column=1)

        self.text = Text()
        self.text.grid(row=1, column=0, columnspan=2)

        self.window.mainloop()

    def start_kafka_listening(self):
        observable = create(self.observable_kafka)
        observable.subscribe(on_next=lambda s: self.update_ui_text(s))

    def update_ui_text(self, data):
        self.text.insert(END, "\n" + str(data))

    def observable_kafka(self, observer, scheduler):
        topic = self.entry.get()
        listener = KafkaListener(topic)
        thread = threading.Thread(target=listener.start_listening, args=(observer.on_next,))
        thread.start()

KafkaUI()

NameError: name 'Entry' is not defined

### Programación asíncrona

In [5]:
import threading
import time

def print_msg(num):
    #time.sleep(2)
    print(f"I am a Thread! {num}")

thread = threading.Thread(target=print_msg, args=(1,))
thread.start() # iniciar el thread

thread.join() # esperar a que el thread termine para continuar con el programa
print("Thread terminado")

I am a Thread! 1
Thread terminado


In [None]:
import threading
import time

def print_msg(num):
    time.sleep(2)
    print(f"I am a Thread! {num}")

threads = []
for i in range(5):
    thread = threading.Thread(target=print_msg, args=(i,))
    thread.start() # iniciar el thread
    threads.append(thread)

for thread in threads:
    thread.join() # esperar a que el thread termine para continuar con el programa

print("Thread terminado")

I am a Thread! 0
I am a Thread! 2
I am a Thread! 4
I am a Thread! 1
I am a Thread! 3
Thread terminado


In [3]:
# Semaforos y locks
import threading
import time

#sem = threading.Semaphore(2) # 2 threads pueden acceder a la vez
lock = threading.Lock() # 1 semáforo de 1 thread es un lock

file_name = "log2.txt"

def access_resource(num):
    lock.acquire() # adquiere el semaforo
    for i in range (5):    
        with open(file_name, "a") as f:
            f.write(f"Thread {num} --> iter {i}\n")

    lock.release() # libera el semaforo

threads = []
for i in range(5):
    thread = threading.Thread(target=access_resource, args=(i,))
    thread.start() # iniciar el thread
    threads.append(thread)

for thread in threads:
    thread.join() # esperar a que el thread termine para continuar con el programa

print("Thread terminado")

Thread terminado


In [6]:
# multiprocesos

def countdown(n):
    while n > 0:
        n -= 1

count = 500000000
countdown(count)

In [None]:
# Como ambos threads estan en el mismo proceso, comparten la misma memoria y tardan más

thread1 = threading.Thread(target=countdown, args=(count/2,))
thread2 = threading.Thread(target=countdown, args=(count/2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

In [None]:
# Ejercicio
from tkinter import Button, Label, Tk
from tkinter.ttk import Progressbar

def progreso():
    for i in range(100):
        barra_progreso["value"] = i
        window.update_idletasks()
        time.sleep(0.1)


window = Tk()
window.title("Progreso")
window.geometry("300x200")

label = Label(text="Descargando...")
label.grid(row=0, column=0, padx=25, pady=10)

button = Button(text="Pulsa", command=thread.start)
button.grid(row=2, column=0, padx=25, pady=10)

thread = threading.Thread(target=progreso)

barra_progreso = Progressbar(orient="horizontal", length=250, mode="determinate")
barra_progreso.grid(row=1, column=0, padx=25, pady=10)



window.mainloop()


### AsyncIO

In [1]:
!pip install nest_asyncio



In [2]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
# Corrutinas 

import asyncio, time

async def count():
    print("One")
    await asyncio.sleep(5)
    print("Two")

# Corrutina principal
async def main():
    task = asyncio.create_task(count())
    task2 = asyncio.create_task(count())
    await asyncio.gather(task, task2) # Espera a que todas las tareas terminen
    # await asyncio.gather(count(), count()) # si se le pasa una corrutina, crea la tarea, la ejecuta y espera a que termine

# Ejecutar la corrutina principal
t = time.monotonic()
asyncio.run(main())
print(f"running time: {time.monotonic()-t}")

One
One
Two
Two
One
One
Two
Two
running time: 10.01500000001397


In [None]:
# Ejercicio 1
# Arrancar 3 tareas que generan numeros aleatorios hasta conseguir que genere uno que supere un umbral
# Cada tarea puede tener un umbral diferente
# Despues de generar un numero por debajo del umbral, dormiremos 0.5 segundos

import asyncio, time, random

async def generar_random(umbral):
    
    while True:
        num = random.randint(0, 100)
        if num <= umbral:
            await asyncio.sleep(0.5)
        else:
            print(f"Numero generado: {num} \t Umbral: {umbral}")
            break
        


async def main():
    await asyncio.gather(generar_random(80), generar_random(35), generar_random(67))
    # await asyncio.gather(*(generar_random(elem) for elem in [80, 35, 67])) # otra manera

t = time.monotonic()
asyncio.run(main())
print(f"running time: {time.monotonic()-t}")

Numero generado: 97 	 Umbral: 67
Numero generado: 82 	 Umbral: 35
Numero generado: 97 	 Umbral: 80
running time: 6.077999999979511


In [17]:
import asyncio, time

async def count():
    print("One")
    await count2()
    print("Two")

async def count2():
    print("Three")
    await asyncio.sleep(5)
    print("Four")

# Corrutina principal
async def main():
    await asyncio.gather(count()) # Espera a que todas las tareas terminen
    # await asyncio.gather(count(), count()) # si se le pasa una corrutina, crea la tarea, la ejecuta y espera a que termine

# Ejecutar la corrutina principal
t = time.monotonic()
asyncio.run(main())
print(f"running time: {time.monotonic()-t}")

One
Three
Four
Two
running time: 5.0


In [None]:
# Ejercicio 2

async def add_extra(extra):
    print(f"Extra {extra} añadido")
    await asyncio.sleep(1)

async def cocinar_hamburguesa(carne):
    print(f"Cocinando hamburguesa de {carne}")
    await asyncio.sleep(3)
    print(f"Hamburguesa de {carne} cocinada")

async def procesar_pedido(pedido: dict):
    await cocinar_hamburguesa(pedido["carne"])
    await add_extra(pedido["extra"])
    print(f"Pedido con carne {pedido["carne"]} y extra {pedido["extra"]} terminado")

async def generar_pedidos():
    pedidos = [
        {"carne": "pollo",
         "extra": "queso"},
        {"carne": "cerdo",
        "extra": "lechuga"},
        {"carne": "ternera",
        "extra": "tomate"},
    ]

    tareas = []

    for pedido in pedidos:
        task = asyncio.create_task(procesar_pedido(pedido))
        tareas.append(task)

    await asyncio.gather(*tareas)

async def main():
    await generar_pedidos()

asyncio.run(main())

Cocinando hamburguesa de pollo
Cocinando hamburguesa de pollo
Cocinando hamburguesa de pollo
Hamburguesa de pollo cocinada
Extra queso añadido
Hamburguesa de pollo cocinada
Extra queso añadido
Hamburguesa de pollo cocinada
Extra queso añadido
Pedido con carne pollo y extra queso terminado
Pedido con carne pollo y extra queso terminado
Pedido con carne pollo y extra queso terminado
