# MBIT School

## Master Big Data Cloud y Analytics

---

## Módulo 2: Ecosistema Spark
## Submódulo DC02: Ecosistema Spark

---

### Taller Spark + Kafka: Gestor de cuentas (múltiples topics/consumidores)

#### Jacinto Arias (arias.jacinto@gmail.com)

---


En esta libreta implementaremos un proceso consumidor de Kafka que procesará los mensajes del stream y mantendrá actualizado el estado de una "supuesta base de datos" con los totales de cada cuenta conforme se vayan procesando los mensajes.

En esta libreta ejecutaremos dos consumidores, uno para actualizar las cuentas de los usuarios y otro para mostrar las alertas de las transacciones fallidas.

#### Imports

In [1]:
# python
from collections import defaultdict
import json
import threading

In [2]:
# jupyter
import ipywidgets as widgets
from IPython.display import display, clear_output

In [3]:
# kafka

from confluent_kafka import Producer, Consumer

#### Constantes

In [4]:
import os

# Kafka broker address. Defaults to the workshop broker; set the
# KAFKA_BOOTSTRAP_SERVERS environment variable to point the notebook at
# another cluster without editing this cell.
KAFKA_BOOTSTRAP_SERVERS = (
    os.environ.get("KAFKA_BOOTSTRAP_SERVERS") or "34.246.163.140:9092"
)

In [5]:
# Topics read by the two consumer threads of this notebook.
KAFKA_TOPIC_TRANSACTIONS_VALIDATED = "fake_bank.transactions_validated"
KAFKA_TOPIC_ALERTS = "fake_bank.alerts"

# Consumer group id; both consumer threads are created with this same id.
KAFKA_CONSUMER_GROUP = "fake_bank.account_manager"

#### Hilos consumidores

In [6]:
class ConsumerTransactionsThread(threading.Thread):
    """Background thread that keeps per-account running totals from Kafka.

    Consumes JSON messages from ``KAFKA_TOPIC_TRANSACTIONS_VALIDATED``, adds
    each message's ``amount`` to the running total of its ``account`` in an
    in-memory dict (the notebook's stand-in for a database) and redraws that
    dict in an ipywidgets ``Output`` after every applied message.

    Call ``stop()`` to end the polling loop. The Kafka consumer is closed when
    ``run()`` returns, even if it returns because of an exception.
    """

    def __init__(self, out):
        """Create the Kafka consumer and subscribe it to the transactions topic.

        Args:
            out: ipywidgets ``Output`` widget in which the totals are shown.
        """
        super().__init__()

        self.out = out

        self.kafka_consumer = Consumer({
            'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
            'group.id': KAFKA_CONSUMER_GROUP
        })

        # account -> accumulated amount; unseen accounts start at 0.
        self.account_db = defaultdict(int)

        self.kafka_consumer.subscribe([KAFKA_TOPIC_TRANSACTIONS_VALIDATED])

        # Checked by run() on every iteration; stop() clears it.
        self.active = True

    def run(self):
        """Poll Kafka until ``stop()`` is called, applying each transaction."""
        try:
            while self.active:
                # Short timeout so a stop() request is noticed within ~1 s.
                transaction = self.kafka_consumer.poll(1.0)

                if transaction is None:
                    continue

                # poll() can also return error/event messages; they carry no
                # transaction payload, so report them instead of decoding.
                if transaction.error() is not None:
                    self.out.append_stderr(f"Kafka error: {transaction.error()}\n")
                    continue

                payload = transaction.value()
                if payload is None:
                    # Empty message: nothing to apply.
                    continue

                try:
                    data = json.loads(payload.decode('utf-8'))
                    account = data["account"]
                    # .get() (not []) so a bad amount does not leave a stray
                    # 0 entry behind via defaultdict's auto-insert.
                    new_total = self.account_db.get(account, 0) + data["amount"]
                except (ValueError, KeyError, TypeError) as exc:
                    # A malformed message must not kill the consumer thread.
                    self.out.append_stderr(f"Skipping malformed message: {exc!r}\n")
                    continue

                self.account_db[account] = new_total

                # Reset the widget through its `outputs` trait instead of
                # clear_output(): clear_output() goes through the widget's
                # context manager, which binds to whatever cell the main
                # thread is currently executing, so it is unreliable when
                # called from this background thread.
                self.out.outputs = ()
                self.out.append_display_data(self.account_db)
        finally:
            # Always release the consumer (and leave the group), even when the
            # loop exits through an unexpected exception.
            self.kafka_consumer.close()

    def stop(self):
        """Ask ``run()`` to exit once its current ``poll()`` call returns."""
        self.active = False
        

In [7]:
class ConsumerAlertsThread(threading.Thread):
    """Background thread that shows one line per alert received from Kafka.

    Consumes JSON messages from ``KAFKA_TOPIC_ALERTS`` and appends
    ``"Alert for account <account>"`` to an ipywidgets ``Output`` for each.

    Call ``stop()`` to end the polling loop. The Kafka consumer is closed when
    ``run()`` returns, even if it returns because of an exception.
    """

    def __init__(self, out):
        """Create the Kafka consumer and subscribe it to the alerts topic.

        Args:
            out: ipywidgets ``Output`` widget to which alert lines are appended.
        """
        super().__init__()

        self.out = out

        self.kafka_consumer = Consumer({
            'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
            'group.id': KAFKA_CONSUMER_GROUP
        })

        self.kafka_consumer.subscribe([KAFKA_TOPIC_ALERTS])

        # Checked by run() on every iteration; stop() clears it.
        self.active = True

    def run(self):
        """Poll Kafka until ``stop()`` is called, displaying each alert."""
        try:
            while self.active:
                # Short timeout so a stop() request is noticed within ~1 s.
                alert = self.kafka_consumer.poll(1.0)

                if alert is None:
                    continue

                # poll() can also return error/event messages; they carry no
                # alert payload, so report them instead of decoding.
                if alert.error() is not None:
                    self.out.append_stderr(f"Kafka error: {alert.error()}\n")
                    continue

                payload = alert.value()
                if payload is None:
                    # Empty message: nothing to show.
                    continue

                try:
                    data = json.loads(payload.decode('utf-8'))
                    account = data['account']
                except (ValueError, KeyError, TypeError) as exc:
                    # A malformed message must not kill the consumer thread.
                    self.out.append_stderr(f"Skipping malformed message: {exc!r}\n")
                    continue

                self.out.append_display_data(f"Alert for account {account}")
        finally:
            # Always release the consumer (and leave the group), even when the
            # loop exits through an unexpected exception.
            self.kafka_consumer.close()

    def stop(self):
        """Ask ``run()`` to exit once its current ``poll()`` call returns."""
        self.active = False
        

#### Creación de hilos y salidas

In [8]:
# One Output widget per consumer; each background thread writes only to the
# widget it receives, so they can be displayed later in separate cells.
out_transactions, out_alerts = widgets.Output(), widgets.Output()

# Account-totals consumer: launched first, renders into out_transactions.
consumer_transactions = ConsumerTransactionsThread(out_transactions)
consumer_transactions.start()

# Alerts consumer: renders into out_alerts.
consumer_alerts = ConsumerAlertsThread(out_alerts)
consumer_alerts.start()

In [14]:
# Show the widget that the transactions consumer thread redraws with the totals.
display(out_transactions)

Output()

In [16]:
# Show the widget to which the alerts consumer thread appends alert lines.
display(out_alerts)

Output()

#### ¡Importante: apagar los hilos!

In [15]:
# Signal both consumers first so they shut down in parallel, then wait for
# them: each thread closes its Kafka consumer only after its current
# poll(1.0) call returns. Joining guarantees both consumers are closed before
# the notebook creates new ones (e.g. when the creation cell is re-run).
consumer_transactions.stop()
consumer_alerts.stop()
consumer_transactions.join()
consumer_alerts.join()