<a href="https://colab.research.google.com/github/DevOrtega/Pipeline-Kafka/blob/main/Mini_Pipeline_Big_Data_con_Kafka_%2B_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# UT3 ‚Äì Procesamiento de Datos Masivos y Gesti√≥n de la Informaci√≥n
## Actividad pr√°ctica: Mini-pipeline de Big Data con Kafka (RA3 ‚Äì Sistemas de Big Data)
En este notebook vamos a construir un **mini-pipeline de datos** que combine:
1. Ingesta batch (CSV/JSON ‚Üí DataFrame)
2. Ingesta en streaming usando Apache Kafka (productor + consumidor)
3. Limpieza y preprocesamiento
4. Modelado de datos (conceptual y l√≥gico)
5. Evaluaci√≥n de calidad e integridad
6. Consultas para extracci√≥n de valor
**Resultado de Aprendizaje trabajado (RA3 ‚Äì SPS):**
> Gestiona y almacena datos facilitando la b√∫squeda de respuestas en grandes conjuntos de datos.
Autor/a: Grupo 6
Fecha: **(dd/mm/aaaa)**

In [None]:
# @title Carga de librer√≠as principales
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import kagglehub
pd.set_option("display.max_columns", None)
sns.set(style="whitegrid")
print("Librer√≠as de an√°lisis de datos cargadas correctamente.")

Librer√≠as de an√°lisis de datos cargadas correctamente.


In [None]:
# @title Instalaci√≥n cliente Kafka y configuraci√≥n de conexi√≥n
!pip install kafka-python --quiet
from kafka import KafkaProducer, KafkaConsumer
import json
import time

# ==============================================
# CONFIGURACI√ìN DEL CLUSTER KAFKA (EXTERNO)
# ==============================================
# 1) Servidores bootstrap del cluster Kafka (proporcionados por el docente)
KAFKA_BOOTSTRAP_SERVERS = ["TU_BOOTSTRAP_SERVER:PORT"] # TODO: cambiar

# 2) Nombre del topic a utilizar para la pr√°ctica
KAFKA_TOPIC = "bigdata_ut3" # se puede personalizar por grupo, p.ej. "bigdata_ut3_grupo1"

# 3) ¬øEl cluster usa seguridad SASL_SSL?
USE_SASL_SSL = True # poner a False si el cluster NO usa SASL_SSL

# 4) Credenciales SASL/PLAIN (si aplica)
KAFKA_SECURITY_CONFIG = {
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "TU_USERNAME_O_API_KEY", # TODO
    "sasl_plain_password": "TU_PASSWORD_O_API_SECRET" # TODO
}

print("Cliente Kafka preparado. Rellena los par√°metros de conexi√≥n antes de usarlo.")

## 1. Descripci√≥n del dataset y metadatos

**Origen del dataset**: Kaggle

**Tipo de dato**: Estructurado (datos organizados en filas y columnas con atributos definidos)

**Metadatos b√°sicos**:
- **Nombre del dataset**: Video Game Sales
- **Origen / URL**: https://www.kaggle.com/datasets/gregorut/videogamesales
- **N√∫mero aproximado de registros**: 16,598
- **N√∫mero de columnas**: 11
- **Tem√°tica**: Ventas de videojuegos (an√°lisis de ventas por plataforma, regi√≥n, g√©nero, etc.)
- **Formato**: CSV

**Descripci√≥n adicional**: Este dataset contiene informaci√≥n estructurada sobre las ventas de videojuegos en diferentes plataformas, incluyendo datos de ventas por regi√≥n (NA, EU, JP, Other) y variables como nombre del juego, plataforma, a√±o de lanzamiento, g√©nero, editor, entre otros.

In [None]:
# @title üßæ Metadatos del dataset (rellenar)
metadatos = {
    "nombre_dataset": "Video Game Sales",
    "origen": "Kaggle",
    "url_fuente": "https://www.kaggle.com/datasets/gregorut/videogamesales",
    "tipo_dato": "estructurado",
    "formato": "CSV",
    "descripcion": "Dataset con informaci√≥n sobre ventas de videojuegos en diferentes plataformas, incluyendo datos de ventas por regi√≥n (NA, EU, JP, Other) y variables como nombre del juego, plataforma, a√±o de lanzamiento, g√©nero, editor, entre otros. Contiene aproximadamente 16,598 registros y 11 columnas."
}
metadatos

In [None]:
# Descargar el dataset - kagglehub devuelve la ruta donde se descarg√≥
path = kagglehub.dataset_download("gregorut/videogamesales")

# Cargar el CSV (el archivo se llama vgsales.csv dentro del directorio descargado)
df = pd.read_csv(f"{path}/vgsales.csv")

# Opci√≥n B: Cargar desde archivo subido a Colab (descomentar si se usa)
# from google.colab import files
# uploaded = files.upload()
# df = pd.read_csv("vgsales.csv") # TODO

print("Dimensiones del dataset (filas, columnas):", df.shape)
df.head()

Using Colab cache for faster access to the 'videogamesales' dataset.
Dimensiones del dataset (filas, columnas): (16598, 11)


Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006.0,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985.0,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008.0,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009.0,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996.0,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37


In [None]:
# @title üîç Exploraci√≥n inicial del dataset
print("Informaci√≥n general del dataset:")
df.info()
print("\nDescripci√≥n estad√≠stica de variables num√©ricas:")
display(df.describe())
print("\nPrimeras filas:")
display(df.head())
print("\nN√∫mero de valores nulos por columna:")
display(df.isnull().sum())

Informaci√≥n general del dataset:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16598 entries, 0 to 16597
Data columns (total 11 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Rank          16598 non-null  int64  
 1   Name          16598 non-null  object 
 2   Platform      16598 non-null  object 
 3   Year          16327 non-null  float64
 4   Genre         16598 non-null  object 
 5   Publisher     16540 non-null  object 
 6   NA_Sales      16598 non-null  float64
 7   EU_Sales      16598 non-null  float64
 8   JP_Sales      16598 non-null  float64
 9   Other_Sales   16598 non-null  float64
 10  Global_Sales  16598 non-null  float64
dtypes: float64(6), int64(1), object(4)
memory usage: 1.4+ MB

Descripci√≥n estad√≠stica de variables num√©ricas:


Unnamed: 0,Rank,Year,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
count,16598.0,16327.0,16598.0,16598.0,16598.0,16598.0,16598.0
mean,8300.605254,2006.406443,0.264667,0.146652,0.077782,0.048063,0.537441
std,4791.853933,5.828981,0.816683,0.505351,0.309291,0.188588,1.555028
min,1.0,1980.0,0.0,0.0,0.0,0.0,0.01
25%,4151.25,2003.0,0.0,0.0,0.0,0.0,0.06
50%,8300.5,2007.0,0.08,0.02,0.0,0.01,0.17
75%,12449.75,2010.0,0.24,0.11,0.04,0.04,0.47
max,16600.0,2020.0,41.49,29.02,10.22,10.57,82.74



Primeras filas:


Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006.0,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985.0,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008.0,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009.0,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996.0,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37



N√∫mero de valores nulos por columna:


Unnamed: 0,0
Rank,0
Name,0
Platform,0
Year,271
Genre,0
Publisher,58
NA_Sales,0
EU_Sales,0
JP_Sales,0
Other_Sales,0


## 2. Objetivos de negocio y preguntas clave
Define **al menos 3 preguntas de negocio** que quieras responder con este dataset.
Ejemplos:
- ¬øQu√© factores influyen m√°s en X?
- ¬øQu√© tipo de clientes generan m√°s ingresos?
- ¬øQu√© d√≠as/horas se concentra m√°s actividad?
> Escribe aqu√≠ tus preguntas:
1. ‚Ä¶
2. ‚Ä¶
3. ‚Ä¶

In [None]:
# @title Productor Kafka: env√≠o del dataset fila a fila
# Configuraci√≥n del productor
producer_config = {
    "bootstrap_servers": KAFKA_BOOTSTRAP_SERVERS,
    "value_serializer": lambda v: json.dumps(v).encode("utf-8")
}

if USE_SASL_SSL:
    producer_config.update(KAFKA_SECURITY_CONFIG)

producer = KafkaProducer(**producer_config)

# Convertir DataFrame a lista de diccionarios (cada fila = mensaje)
registros = df.to_dict(orient="records")
print(f"Enviando {len(registros)} mensajes al topic '{KAFKA_TOPIC}'...")

for i, fila in enumerate(registros):
    producer.send(KAFKA_TOPIC, value=fila)
    # Retardo opcional para simular streaming
    # time.sleep(0.01)

producer.flush()
print("‚úÖ Env√≠o completado.")

In [None]:
# @title Consumidor Kafka: lectura y reconstrucci√≥n del DataFrame

# Configuraci√≥n b√°sica
consumer_config = {
    "bootstrap_servers": KAFKA_BOOTSTRAP_SERVERS,
    "auto_offset_reset": "earliest",  # empezamos desde el inicio del topic
    "enable_auto_commit": True,
    "group_id": "grupo_ut3",          # TODO: se puede personalizar por grupo
    "value_deserializer": lambda m: json.loads(m.decode("utf-8"))
}

# Aplicar seguridad si est√° activada
if USE_SASL_SSL:
    consumer_config.update(KAFKA_SECURITY_CONFIG)

# Instancia del consumidor
consumer = KafkaConsumer(KAFKA_TOPIC, **consumer_config)

registros_consumidos = []

# IMPORTANTE: 'df' debe existir en memoria (de una celda anterior de carga de datos)
MAX_MENSAJES = df.shape[0]

print("Leyendo mensajes del topic...")

# Bucle de consumo
for msg in consumer:
    registros_consumidos.append(msg.value)

    # Condici√≥n de parada
    if len(registros_consumidos) >= MAX_MENSAJES:
        break

print(f" Mensajes le√≠dos desde Kafka: {len(registros_consumidos)}")

# Crear DataFrame y mostrar resultados
df_stream = pd.DataFrame(registros_consumidos)
display(df_stream.head())
print("Dimensiones de df_stream:", df_stream.shape)

In [None]:
# @title Limpieza b√°sica sobre df_stream
df_clean = df_stream.copy()
print("Registros duplicados antes de limpiar:", df_clean.duplicated().sum())
df_clean = df_clean.drop_duplicates()
print("Registros duplicados despu√©s de limpiar:", df_clean.duplicated().sum())
print("\nValores nulos por columna (antes de imputar):")
display(df_clean.isnull().sum())
# TODO: definir estrategia de imputaci√≥n seg√∫n el dataset
# Ejemplo para columnas num√©ricas: imputar con la mediana
numeric_cols_tmp = df_clean.select_dtypes(include=["int64", "float64"]).columns
df_clean[numeric_cols_tmp] = df_clean[numeric_cols_tmp].fillna(df_clean[numeric_cols_tmp].median())
print("\nValores nulos por columna (despu√©s de imputar num√©ricas):")
display(df_clean.isnull().sum())
df_clean.head()

In [None]:
# @title Boxplots para detecci√≥n de outliers (opcional)
numeric_cols = df_clean.select_dtypes(include=["int64", "float64"]).columns
print("Columnas num√©ricas:", list(numeric_cols))

for col in numeric_cols:
    plt.figure(figsize=(6, 3))
    sns.boxplot(x=df_clean[col])
    plt.title(f"Boxplot de {col}")
    plt.show()

# TODO: decidir si se filtran o transforman outliers en alguna columna

## 3. Modelado de datos (conceptual y l√≥gico)
En esta secci√≥n dise√±a:
1. **Modelo conceptual**
- Entidades principales (ej.: Cliente, Pedido, Producto, etc.)
- Relaciones entre entidades (ej.: Cliente ‚Äî realiza ‚Äî Pedido)
2. **Modelo l√≥gico**
- Elige el tipo de modelo de almacenamiento:
- Relacional (tablas SQL)
- NoSQL documental (MongoDB)
- Columnar (Cassandra)
- Clave-valor (Redis)
- Grafos (Neo4j)
- Justifica la elecci√≥n:
- Tipo de consultas
- Volumen y variedad de los datos
- Necesidades de escalabilidad / rendimiento
> Adjunta o describe aqu√≠ tu modelo (texto, tabla resumen o enlace a un diagrama).

In [None]:
# @title Selecci√≥n de variables y pipeline de preprocesamiento
df_model = df_clean.copy()

# TODO: definir variable objetivo (si procede). Si no hay objetivo, dejar a None.
target = None # p.ej.: "tip" en el dataset 'tips'

if target is not None and target in df_model.columns:
    features = [col for col in df_model.columns if col != target]
    X = df_model[features].copy()
    y = df_model[target].copy()
else:
    features = list(df_model.columns)
    X = df_model.copy()
    y = None

print("Features:", features)
print("Target:", target if target is not None and target in df_model.columns else "No se ha definido variable objetivo")

numeric_features = X.select_dtypes(include=["int64", "float64"]).columns
categorical_features = X.select_dtypes(include=["object", "category"]).columns

print("\nColumnas num√©ricas:", list(numeric_features))
print("Columnas categ√≥ricas:", list(categorical_features))

# Transformador num√©rico: escalado MinMax
numeric_transformer = Pipeline(steps=[
    ("scaler", MinMaxScaler())
])

# Transformador categ√≥rico: One-Hot Encoding
categorical_transformer = Pipeline(steps=[
    ("encoder", OneHotEncoder(handle_unknown="ignore"))
])

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric

In [None]:
# @title Transformaci√≥n y guardado de datos procesados
X_train_transformed = preprocessor.transform(X_train)
print("Shape de X_train transformado:", X_train_transformed.shape)

# Intento convertir a DataFrame (puede ser grande)
try:
    X_train_dense = pd.DataFrame(
        X_train_transformed.toarray() if hasattr(X_train_transformed, "toarray") else X_train_transformed
    )
    display(X_train_dense.head())
except Exception as e:
    print("No se ha podido convertir a DataFrame por tama√±o/formato:", e)

# Guardar conjunto limpio (sin codificaci√≥n) como ejemplo
df_clean.to_csv("datos_procesados.csv", index=False)
df_clean.to_parquet("datos_procesados.parquet", index=False)
print("Archivos 'datos_procesados.csv' y 'datos_procesados.parquet' guardados.")

## 4. Calidad del dato e integridad
Eval√∫a la calidad de tu dataset **despu√©s del preprocesamiento**:
- **Exactitud**: ¬øLos datos parecen razonables / realistas?
- **Completitud**: ¬øQu√© porcentaje de nulos queda? ¬øEn qu√© columnas?
- **Consistencia**: ¬øHay valores contradictorios o incoherentes?
- **Unicidad**: ¬øPersisten duplicados?
- **Oportunidad**: ¬øLos datos siguen siendo v√°lidos / actuales para el problema?
> Escribe aqu√≠ un breve an√°lisis de calidad del dato y las decisiones tomadas en tu pipeline.

In [None]:
# @title M√©tricas simples de calidad del dato
n_filas, n_cols = df_clean.shape
porcentaje_nulos = (df_clean.isnull().sum() / n_filas) * 100
duplicados = df_clean.duplicated().sum()
print("N√∫mero de filas:", n_filas)
print("N√∫mero de columnas:", n_cols)
print("\nPorcentaje de nulos por columna:")
display(porcentaje_nulos)
print("\nN√∫mero de registros duplicados:", duplicados)

In [None]:
# @title Consultas de negocio y agregaciones
# TODO: adaptar estas consultas a tu dataset concreto
print("Columnas disponibles en df_clean:")
print(df_clean.columns)

# Ejemplo gen√©rico 1: agrupar por una columna categ√≥rica
col_cat_ejemplo = None # p.ej. "sex", "day", "species", etc.
col_num_ejemplo = None # p.ej. "total_bill", "tip", "sepal_length", etc.

if col_cat_ejemplo in df_clean.columns and col_num_ejemplo in df_clean.columns:
    print(f"\nMedia de {col_num_ejemplo} por {col_cat_ejemplo}:")
    display(df_clean.groupby(col_cat_ejemplo)[col_num_ejemplo].mean())
else:
    print("\nConfigura 'col_cat_ejemplo' y 'col_num_ejemplo' seg√∫n tu dataset para ver un ejemplo.")

# A√±ade aqu√≠ tus propias consultas que respondan a las preguntas de negocio
# Ejemplo:
# resultado_1 = df_clean.groupby("ALGUNA_COLUMNA")["OTRA_COLUMNA"].agg(["mean", "sum"])
# display(resultado_1)

## 5. Conclusiones del mini-pipeline
1. **Resumen del proceso**
- Describe brevemente las etapas: ingesta batch, env√≠o a Kafka, consumo desde Kafka,
limpieza, modelado, calidad del dato y consultas.
2. **Respuestas a las preguntas de negocio**
- ¬øQu√© has podido responder?
- ¬øQu√© patrones o insights relevantes has obtenido?
3. **Limitaciones y mejoras**
- ¬øQu√© har√≠as mejor si tuvieras m√°s tiempo o m√°s herramientas (Spark, MongoDB, etc.)?
- ¬øQu√© otras fuentes de datos podr√≠an enriquecer el an√°lisis?
4. **Relaci√≥n con el RA3 de Sistemas de Big Data**
- Explica en 4‚Äì5 l√≠neas c√≥mo este mini-pipeline te ha permitido:
- gestionar datos,
- almacenarlos de forma apropiada,
- y facilitar la b√∫squeda de respuestas en grandes conjuntos de datos.