# SISTEMAS DE BIG DATA - Examen 1ª Evaluación

**Instrucciones generales**

1.	Todas las sentencias deben ejecutarse desde la línea de comandos en las celdas que hay después del enunciado. No debes realizar ninguna tarea desde fuera de Jupyter.
2.	Puedes **añadir** todas las celdas que necesites siempre y cuando estén antes del siguiente enunciado.
3.	Todas las celdas **deben estar ejecutadas** y debe visualizarse el resultado de salida.
4.	**No es necesario documentar** las respuestas, simplemente debes hacer lo que se pide en el enunciado.
5.	Después de cada parte debes insertar una **captura de pantalla** del cliente gráfico de la base de datos correspondientes donde se vea que los datos se han cargado correctamente.
6.	Debes entregar tanto el **notebook** (fichero `.ipynb`) como el mismo fichero convertido a **PDF** (es muy probable que si intentas convertirlo en el propio contenedor te falle por no tener instalado `pandoc`, si es así descargalo en formato `.md` o `html` y conviértelo en tu máquina física).

---

**NOMBRE**:

---

## Contexto del escenario

Has sido contratado por una fábrica inteligente que dispone de sensores de temperatura y vibración en sus máquinas críticas. La empresa necesita un sistema backend capaz de procesar los datos que llegan de los sensores en tiempo real.

El sistema debe cumplir dos objetivos simultáneos:

1.  **Monitorización en vivo (Dashboard):** los operarios necesitan saber el estado *actual* de cada máquina y si hay alguna alarma activa en este preciso instante. Para esto usarás **Redis**.
2.  **Histórico para mantenimiento predictivo:** el equipo de Data Science necesita almacenar todos los datos brutos a lo largo del tiempo para entrenar modelos de IA futuros. Para esto usarás **InfluxDB**.

## Los Datos de Entrada

Los datos con los que vas a trabajar los tienes en el *dataset* sintético adjunto llamado `sensores.csv`. Este *dataset* contiene lecturas simuladas con las siguientes columnas:

  - `timestamp`: fecha y hora del evento.
  - `machine_id`: identificador único de la máquina.
  - `zone`: zona de la fábrica.
  - `temperature`: temperatura en grados Celsius.
  - `vibration`: nivel de vibración (0-100).
  - `lat`, `lon`: coordenadas del robot.
  - `status`: estado reportado por la máquina ("OK", "WARNING", "ERROR").

**IMPORTANTE**

El desarrollo del examen debe de ser modular, con un programa principal que inicialice las conexiones a la base de datos y lea los datos del fichero y luego invocará **una función diferente para cargar cada tipo de dato** en la base de datos

Es decision tuya elegir los parámetros que recibirá cada función, aunque es altamente aconsejable **no utilizar variables globales**.

## Parte A: Persistencia histórica (InfluxDB)

`2 puntos`

En esta parte tienes que crear un script que lea el fichero CSV facilitado y almacene los datos en una base de datos InfluxDB.

Los aspectos que tienes que tener en cuenta son:

  - **Bucket:** `factory_logs`
  - **Measurement:** `maquinaria`
  - **Requisito clave:** debes modelar correctamente los datos usando adecuadamente *tags* o *fields* según el tipo de datos. Se debe respetar el `timestamp` del datos (no usar el tiempo de ingesta).

In [65]:
# Función que carga los datos en InfluxDB

import os
import csv
from datetime import datetime, timezone
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client import Point
from urllib3.exceptions import NewConnectionError

INFLUX_URL = "http://influxdb2:8086"
INFLUX_TOKEN = "MyInitialAdminToken0=payosno"

client = None
write_api = None

try:
    client = influxdb_client.InfluxDBClient(
        url=INFLUX_URL,
        token=INFLUX_TOKEN,
        org="docs"
    )

    write_api = client.write_api(write_api=SYNCHRONOUS)
    
except (InfluxDBError, NewConnectionError) as e:
    print("[ERROR] Error al conectar con InfluxDB:")
    print(f"   Detalle: {e}")


ficheros = os.listdir("./UT02/Ex1")

contadorPuntos = 0

for fic in ficheros:
    with open("./UT02/Ex1/"+fic, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        
        next(reader, None)

        for row in reader:
            date_ = row[0]
            dt = datetime.strptime(date_,"%Y-%m-%d %H:%M:%S")
            dt_utc = dt.replace(tzinfo=timezone.utc)
            time_date = dt_utc.isoformat().replace('+00:00','Z')

            machine_id = row[1]
            zone = row[2]
            temperature = float(row[3])
            vibration = float(row[4])
            lat = float(row[5])
            lon = float(row[6])
            status = row[7]

            p = Point("maquinaria") \
                .tag("Machine_id", machine_id) \
                .tag("Zone", zone) \
                .time(time_date) \
                .field("Temperature", temperature) \
                .field("Vibration", vibration) \
                .field("Latitud", lat) \
                .field("Longitud", lon) \
                .field("status", status)
            
            contadorPuntos = contadorPuntos + 1
            write_api.write(bucket="factory_logs", org="docs", record=p)
            print(">> Escribiendo")


write_api.close()

print("------------------------------------")
print(f"Puntos insertados: {contadorPuntos}")





>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribiendo
>> Escribi

## Parte B - Analítica en tiempo real con Redis

Debes crear un script que alimente las siguientes estructuras en Redis por cada dato procesado:

### 1.- Estadísticas agregadas

`1 punto`

Al procesar masivamente datos de telemetría, es costoso consultar la base de datos histórica (InfluxDB) para preguntas simples como "¿Cuál ha sido la temperatura máxima hoy en el Almacén A?". Vamos a usar Redis Hashes para mantener un marcador actualizado de estadísticas por zona.

Para cada fila procesada del CSV, debes actualizar un Hash correspondiente a la Zona (zone) donde se encuentra el robot.

- **Clave:** `stats:zone:{nombre_zona}` (Ej: stats:zone:Almacen_A, stats:zone:Recepcion...).
- **Campos:**:
    - `total_lecturas`: contador total de datos recibidos de esa zona.
    - `total_errores`: contador de cuántas veces el status ha sido "ERROR".
    - `max_temp`: La temperatura más alta registrada hasta el momento en esa zona.

In [13]:
import redis

r = redis.Redis(
    host='redis',
    port=6379,
    db=0,
    decode_responses=True
)

In [30]:
# Función que genera las estadísticas agregadas
def temp_max():
    ficheros = os.listdir("./UT02/Ex1")
    for fic in ficheros:
        with open("./UT02/Ex1/"+fic, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            
            next(reader, None)
            
            maxt = {}
            err = {}
            lec = {}
            for row in reader:
                zona = row[2]
                temp = float(row[3])
                status = row[7]

                if zona in maxt:
                    if temp > maxt[zona]:
                        maxt[zona] = temp
                else:
                        maxt[zona] = temp
                
                if zona in err:
                    if "ERROR" in status:
                        err[zona] = err[zona] + 1
                else:
                    if "ERROR" in status:
                        err[zona] = 1


                if zona in lec:
                    lec[zona] = lec[zona] + 1
                else:
                    lec[zona] = 1

                stats = {
                    "total_lecturas": 0,
                    "total_errores": 0,
                    "max_temp": 0
                }

                r.hset(f"stats:zone:{zona}", mapping=stats)  

        
            for zona, lec in lec.items():
                r.hset(f"stats:zone:{zona}", "total_lecturas", lec)
            for zona, er in err.items():
                r.hset(f"stats:zone:{zona}", "total_errores", er)   
            for zona, te in maxt.items():
                r.hset(f"stats:zone:{zona}", "max_temp", te)


### 2.- Ranking de "puntos calientes" (Sorted Set)

`1 punto`

El jefe de planta quiere ver en una pantalla un "Top de Máquinas con mayor temperatura" ordenado de mayor a menor en tiempo real.

- **Estructura:** `Sorted Set` (ZSET)
- **Clave:** `dashboard:hottest_machines`
- **Score:** La temperatura actual (`temperature`).
- **Member:** El ID de la máquina (`machine_id`).

In [40]:
# Función que carga el sorted set
def top_maquinas():
    ficheros = os.listdir("./UT02/Ex1")
    for fic in ficheros:
        with open("./UT02/Ex1/"+fic, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            
            next(reader, None)

            for row in reader:

                id = row[1]
                temp = float(row[3])

                r.zadd("dasboard:hottest_machines", {id: temp})

            total = r.zrevrange("dasboard:hottest_machines", 0, -1, withscores=True)

            if not total:
                print("El ranking esta vacio")
                return
            
            for i, (idd, score) in enumerate(total):

                print(f" #{i+1} | Maquina:  {idd} ({score})")


### 3.- Seguimiento de flota (Geospatial)

`1 punto`

Las máquinas de este escenario son AGVs (robots móviles) que se mueven por la planta. Necesitamos saber su ubicación exacta.

- **Estructura:** `Geo`
- **Clave:** `factory:map`
- **Datos:** Usa la latitud y longitud que vienen en el CSV para posicionar el `machine_id`.

In [None]:
# Función que carga los datos geoespaciales
def geo_maquinas():
    ficheros = os.listdir("./UT02/Ex1")
    for fic in ficheros:
        with open("./UT02/Ex1/"+fic, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            
            next(reader, None)

            for row in reader:
                id = row[1]
                lat = float(row[5])
                lon = float(row[6])

                r.geoadd("factory:map", (lon, lat, id))

### 4.- Contadores globales atómicos (String)

`1 punto`

Necesitamos estadísticas rápidas que no requieran contar filas en una base de datos histórica.

- **Estructura:** `String` (Contador)
- **Clave:** `stats:total_processed` -\> Incrementar en 1 por cada fila procesada.
- **Clave:** `stats:total_errors` -\> Incrementar en 1 solo si el `status` es "ERROR".
- **Clave:** `stats:total_warnings` -\> Incrementar en 1 solo si el `status` es "WARNING".



In [54]:
# Función que gestiona los contadores
def contador():
    ficheros = os.listdir("./UT02/Ex1")
    for fic in ficheros:
        with open("./UT02/Ex1/"+fic, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            
            next(reader, None)
            r.set("stats:total_processed", 1)
            r.set("stats:total_errors", 1)
            r.set("stats:total_warnings", 1)

            for row in reader:

                status = row[7]
                r.incr("stats:total_processed")
                if "ERROR" in status:
                    r.incr("stats:total_errors")
                if "WARNING" in status:
                    r.incr("stats:total_warnings")

            
            tot = r.get("stats:total_processed")
            err = r.get("stats:total_errors")
            war = r.get ("stats:total_warnings")

            print(f"Total procesados: {tot}")
            print(f"Total errores: {err}")
            print(f"Total warnings: {war}")


### 5.- Cola de anomalías críticas (List)

`1 punto`

Queremos tener también una cola de anomalías críticas. Por cada registro cuyo `status` sea `ERROR` deberás crear un JSON y almacenarlo en una estructura tipo FIFO:

- **Estructura:** `List`
- **Clave:** `alerts:queue`
- **Datos:**: el JSON debe incluir: `machine_id`, `timestamp` y un mensaje: *"Critical failure at [Lat, Lon]"*.

In [53]:
# Función que carga los datos en la cola
import json

def lista_errores():
    ficheros = os.listdir("./UT02/Ex1")
    for fic in ficheros:
        with open("./UT02/Ex1/"+fic, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            
            next(reader, None)

            for row in reader:
                date_ = row[0]
                dt = datetime.strptime(date_,"%Y-%m-%d %H:%M:%S")
                dt_utc = dt.replace(tzinfo=timezone.utc)
                time_date = dt_utc.isoformat().replace('+00:00','Z')

                machine_id = row[1]
                lat = float(row[5])
                lon = float(row[6])
                status = row[7]
                if "ERROR" in status:
                    js = {
                        "machine_id": machine_id,
                        "time": time_date,
                        "mensaje": f"Critical failure at [{lat}, {lon}]"
                    }

                    js = json.dumps(js)
                    r.lpush("alerts:queue", js)

## Programa principal

In [55]:
# Aquí debes insertar el programa principal que llama al resto de funciones
temp_max()
top_maquinas()
geo_maquinas()
contador()
lista_errores()


 #1 | Maquina:  AGV-06 (72.6)
 #2 | Maquina:  AGV-01 (68.11)
 #3 | Maquina:  AGV-03 (60.42)
 #4 | Maquina:  AGV-08 (56.58)
 #5 | Maquina:  AGV-04 (56.54)
 #6 | Maquina:  AGV-02 (50.49)
 #7 | Maquina:  AGV-05 (49.28)
 #8 | Maquina:  AGV-07 (48.26)
 #9 | Maquina:  AGV-09 (40.17)
 #10 | Maquina:  AGV-10 (38.21)
Total procesados: 10001
Total errores: 659


## Capturas de pantalla

A partir de aquí tienes que insertar las capturas de pantalla correspondientes a cada punto. Las capturas de pantalla corresponderán a la interfaz gráfica de la base de datos correspondiente y se debe mostrar que los datos se han cargado correctamente. Los apartados que no tengan la captura de pantalla correspondiente **se considerarán no realizados**.

### Captura de InfluxDB

![influx](./UT02/INFLUX.png)

### Captura de estadísticas agregadas

![Redis1](REDIS1.png)

### Captura de ranking de puntos calientes

![Redis2](REDIS2.png)

### Captura de seguimiento de flota

![Redis3](REDIS3.png)

### Captura de contadores globales atómicos

![Redis4](REDIS4.png)
![Redis5](REDIS5.png)
![Redis6](REDIS6.png)

### Captura de cola de anomalías críticas

![Redis7](REDIS7.png)
![Redis8](REDIS8.png)
