 # Taller ETL en MongoDB con Python

In [None]:
import os
import pandas as pd
from pymongo import MongoClient
from dotenv import load_dotenv


# Cargar variables desde .env

In [None]:
load_dotenv(dotenv_path="./.env")
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION = os.getenv("COLLECTION")

# --- EXTRACT ---

In [None]:
print("Extrayendo datos de MongoDB...")
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION]
print("Coecciones...",collection)
data = list(collection.find())
print(f"Registros obtenidos: {len(data)}")

In [None]:
# Convertir a DataFrame
df = pd.DataFrame(data)

In [None]:
df

# --- TRANSFORM ---

In [None]:
print(" Transformando datos...")

In [None]:
print(df.info())

#  Resumen por ciudad:

In [None]:
print(df.groupby("ciudad")["qty"].sum())


# Ventas promedio por unidad (precio * qty):

In [None]:
df["precio"]

En Mongo Decimal128 es un tipo específico que MongoDB usa para representar números decimales.

Esta línea convierte solo los valores que son instancias de Decimal a float. Si el valor no es un Decimal, se deja tal como está.

In [None]:
from bson.decimal128 import Decimal128

# Asegúrate de convertir Decimal128 
# Convierte Decimal128 a float
df["precio"] = df["precio"].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x))



In [None]:
df["precio"]

In [None]:
# Asegurarnos de que 'precio' y 'qty' sean numéricos
# Calcular la venta total
df["venta_total"] = df["precio"] * df["qty"].astype(float)

# Mostrar solo las columnas relevantes
print(df[["item", "venta_total"]])


# Resumen por ciudad

In [None]:
print(df.groupby("ciudad")["venta_total"].sum())

# Filtrar productos con ventas internacionales:

In [None]:
df["ventas_internacional"] = df["ventas"].apply(lambda x: x.get("internacional") if isinstance(x, dict) else None)
print(df[["item", "ventas_internacional"]])

# Transformaciones útiles
# Expandir las bodegas (stock):

In [None]:
stock_df = df.explode("stock")
stock_df["bodega"] = stock_df["stock"].apply(lambda x: x["bodega"] if isinstance(x, dict) else None)
stock_df["cantidad"] = stock_df["stock"].apply(lambda x: x["cantidad"] if isinstance(x, dict) else None)
stock_df = stock_df.drop(columns=["stock"])
stock_df.head()

#Expandir calificaciones y sacar promedios:

In [None]:
print(df.columns)


In [None]:
venta_por_ciudad = df.groupby("ciudad")["venta_total"].sum()
# Agrupar por ciudad y calcular venta total promedio
print(df.groupby("ciudad")["venta_total"].mean())

# Agrupar por estado y sumar ventas internacionales
print(df.groupby("estado")["ventas_internacional"].sum())

# Top 3 productos por venta_total
print(df[["item", "venta_total"]].sort_values(by="venta_total", ascending=False).head(3))

In [None]:
import numpy as np

df["prom_calidad"] = df["evaCalidad"].apply(lambda x: np.mean(x) if isinstance(x, (list, np.ndarray)) else None)
df["prom_logistica"] = df["evaLogistica"].apply(lambda x: np.mean(x) if isinstance(x, (list, np.ndarray)) else None)


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Venta total por ciudad
plt.figure(figsize=(8, 5))
sns.barplot(x="ciudad", y="venta_total", data=df, estimator=sum)
plt.title("Venta Total por Ciudad")
plt.ylabel("Venta Total")
plt.xlabel("Ciudad")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Promedios de evaluación
plt.figure(figsize=(8, 5))
df[["prom_calidad", "prom_logistica"]].mean().plot(kind="bar", color=["skyblue", "lightgreen"])
plt.title("Promedio de Evaluaciones")
plt.ylabel("Puntaje Promedio")
plt.xticks(rotation=0)
plt.tight_layout()
plt.show()

#ETL a CSV

In [None]:
output_file = "output.csv"
df.to_csv(output_file, index=False)
print(f"Datos transformados y guardados en {output_file}")

# --- LOAD ---

In [None]:
print("Cargando datos a MongoDB...")

In [None]:
# Si es una Serie, conviértela a DataFrame y renombra columnas
venta_por_ciudad = venta_por_ciudad.reset_index()
venta_por_ciudad.columns = ["ciudad", "ventas"]

print(venta_por_ciudad)


In [None]:
records = venta_por_ciudad.to_dict("records")
print(records)

In [None]:
# Cargar datos transformados a MongoDB
output_collection = db["venta_por_ciudad"]
output_collection.insert_many(records)
print(f"Datos cargados en la colección {output_collection.name}")
print("Proceso ETL completo.")

In [None]:
# Cerrar conexión
client.close()