# Aniversario Power BI !

## Integración completa End-to-End (ETL JSON Spotify Extended Streaming History - Power BI) 

Flujo del proyecto:

1. Integración, etiquetado y limpieza de datos de `Spotify Extended Streaming History` de 3 usuarios.
2. Integración de JSON a una lista completa.
3. Carga de ficheros JSON combinado en MongoDB: </br>
    3. 1. Nombre de la base de datos: [Spotify]</br>
    3. 2. Nombre de la colección: [Stream] </br>
    3. 3. Realizamos una validación de datos cargados completamente.</br>
4. Conexión a MongoDB mediante Power BI.
5. Elaboración de Dashaboard en entorno Power BI.

# 📒 Notebook ETL Spotify → MongoDB

---

## 1) Tarea  
**“Integrar todos los JSON de ‘Spotify Extended Streaming History’ y cargarlos en MongoDB.”**

---

## 2) Por qué lo haces  
- Centralizar tu historial de reproducción en una base de datos document-oriented.  
- Facilitar consultas, agregaciones y refrescos automáticos desde Power BI.  
- Practicar un pipeline **E**xtract-**L**oad sencillo y reproducible.

---

## 3) Lógica  
1. **Cargar variables** desde `.env` (credenciales y paths).  
2. **Conectar** al servidor MongoDB con `pymongo`.  
3. **Recorrer** recursivamente todos los JSON bajo `data/`.  
4. **Leer** cada archivo y acumular registros en una lista.  
5. **Insertar** en bloque (`insert_many`) en la colección destino.  
6. **Validar** con conteo de documentos y mostrar muestras.

---

---

## 💻 4) Código paso a paso


### 1. Instalación de librerías (solo la primera vez)
```python
# !pip install pymongo python-dotenv tqdm


### 2. Imports y carga de variables
Objetivo: leer tu .env y obtener toda la configuración desde ahí.

In [2]:
from pathlib import Path
import os, json
from pymongo import MongoClient
from tqdm import tqdm
from dotenv import load_dotenv

# Carga las variables del .env (sin hardcodear localhost aquí)
load_dotenv()

# Ahora todas las rutas y credenciales vienen de tu .env
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME   = os.getenv("DB_NAME")
COLL_NAME = os.getenv("COLL_NAME")
DATA_ROOT = Path(os.getenv("DATA_ROOT", "data"))

assert MONGO_URI, "🛑 Define MONGO_URI en tu .env antes de seguir."
assert DB_NAME,   "🛑 Define DB_NAME en tu .env antes de seguir."
assert COLL_NAME, "🛑 Define COLL_NAME en tu .env antes de seguir."


### 3. Conexión y “creación” de DB/colección
Objetivo: conectar a Mongo y preparar el objeto coll. Mongo creará la base y colección al primer insert_many.

In [3]:
# 3) Conectar a MongoDB y preparar colección
client = MongoClient(MONGO_URI)

# Refiere la DB; no hace falta crearla manualmente
db   = client[DB_NAME]

# Refiere la colección; tampoco es necesario crearla antes
coll = db[COLL_NAME]

# Verificación rápida
print("✅ Conectado a MongoDB:", client.address)
print("▶️ Base de datos apuntada:", db.name)
print("▶️ Colección apuntada:", coll.name)


✅ Conectado a MongoDB: ('localhost', 27017)
▶️ Base de datos apuntada: spotify
▶️ Colección apuntada: extended_streams


### 4. Leer todos los JSON (preparación)
Objetivo: listar rutas de JSON y contar cuántos archivos hay antes de leerlos.

In [None]:
# Buscamos todos los JSON bajo data/user*/Spotify Extended Streaming History
json_paths = list(DATA_ROOT.glob("user*/Spotify Extended Streaming History/*.json"))

# Mensaje informativo antes de cargar
if not json_paths:
    raise FileNotFoundError(f"🛑 No encontré ningún JSON en {DATA_ROOT}")
else:
    print(f"🔍 Encontrados {len(json_paths)} archivos JSON en total:")
    for p in json_paths:
        print("   •", p.name)


🔍 Encontrados 52 archivos JSON en total:
   • Streaming_History_Audio_2017-2018_0.json
   • Streaming_History_Audio_2018-2019_3.json
   • Streaming_History_Audio_2018_1.json
   • Streaming_History_Audio_2018_2.json
   • Streaming_History_Audio_2019-2020_7.json
   • Streaming_History_Audio_2019_4.json
   • Streaming_History_Audio_2019_5.json
   • Streaming_History_Audio_2019_6.json
   • Streaming_History_Audio_2020-2021_11.json
   • Streaming_History_Audio_2020_10.json
   • Streaming_History_Audio_2020_8.json
   • Streaming_History_Audio_2020_9.json
   • Streaming_History_Audio_2021-2022_14.json
   • Streaming_History_Audio_2021_12.json
   • Streaming_History_Audio_2021_13.json
   • Streaming_History_Audio_2022-2023_17.json
   • Streaming_History_Audio_2022_15.json
   • Streaming_History_Audio_2022_16.json
   • Streaming_History_Audio_2023-2024_21.json
   • Streaming_History_Audio_2023_18.json
   • Streaming_History_Audio_2023_19.json
   • Streaming_History_Audio_2023_20.json
   • Strea

### 5. Carga de datos e inserción en MongoDB

In [None]:
from dateutil import parser

records = []

for path in tqdm(json_paths, desc="📥 Cargando registros"):
    user = path.parent.parent.name
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

        def normalize_and_append(rec):
            # 1) Convertir ts de string a datetime
            rec["ts"] = parser.isoparse(rec["ts"])
            # 2) Añadir etiqueta de usuario
            rec["user"] = user
            records.append(rec)

        if isinstance(data, list):
            for rec in data:
                normalize_and_append(rec)
        else:
            normalize_and_append(data)

print(f"🔢 Total de registros en memoria: {len(records):,}")

# Limpia colección si necesitas rehacer pruebas
# coll.drop()

📥 Cargando registros: 100%|██████████| 52/52 [00:05<00:00,  9.72it/s]

🔢 Total de registros en memoria: 756,564





In [6]:
# Insertar en bloque
if records:
    res = coll.insert_many(records)
    print(f"✅ Insertados {len(res.inserted_ids):,} documentos en '{COLL_NAME}'")
else:
    print("⚠️ No había registros para insertar.")

✅ Insertados 756,564 documentos en 'extended_streams'


### 6. Validación rápida y recomendaciones

In [7]:
# Validación
total = coll.count_documents({})
print(f"📊 Documentos totales en la colección: {total:,}")

# Mostrar un sample de 5 documentos
print("\n🔍 Ejemplo de documentos cargados:")
for doc in coll.find().limit(5):
    print(doc)

📊 Documentos totales en la colección: 756,564

🔍 Ejemplo de documentos cargados:
{'_id': ObjectId('6882871c274fc8efb3025f68'), 'ts': '2017-10-06T03:44:10Z', 'platform': 'Windows 10 (10.0.15063; x64)', 'ms_played': 285040, 'conn_country': 'EC', 'ip_addr': '190.152.176.16', 'master_metadata_track_name': 'Antes de que cuente diez', 'master_metadata_album_artist_name': 'Fito y Fitipaldis', 'master_metadata_album_album_name': 'Antes de que cuente diez', 'spotify_track_uri': 'spotify:track:3xiNRrrVROKlHrflHGNTfG', 'episode_name': None, 'episode_show_name': None, 'spotify_episode_uri': None, 'audiobook_title': None, 'audiobook_uri': None, 'audiobook_chapter_uri': None, 'audiobook_chapter_title': None, 'reason_start': 'trackdone', 'reason_end': 'trackdone', 'shuffle': False, 'skipped': False, 'offline': False, 'offline_timestamp': None, 'incognito_mode': False, 'user': 'user1'}
{'_id': ObjectId('6882871c274fc8efb3025f69'), 'ts': '2017-10-06T03:49:02Z', 'platform': 'Windows 10 (10.0.15063; x64)

### 7. Optimización para consultas desde Power BI.

Para la optimización de consultas desde Power BI creamos un índice compuesto por user y timestamp (ts)

In [8]:
# Crear índice compuesto para búsquedas por usuario y orden cronológico
coll.create_index([("user", 1), ("ts", 1)], name="idx_user_ts")


'idx_user_ts'

In [None]:
# Convertir ts de string a Date en toda la colección 
from pymongo import MongoClient

update_result = coll.update_many(
    {},  # todos los documentos
    [
        { "$set": { "ts": { "$toDate": "$ts" } } }
    ]
)

print(f"✅ Documentos afectados: {update_result.modified_count:,}")
print(f"   (Total chequeados: {update_result.matched_count:,})")



✅ Documentos afectados: 0
   (Total chequeados: 756,564)


In [None]:
# Validación del tipo de campo ts en un documento de ejemplo
sample = coll.find_one({})
print("Tipo de ts:", type(sample["ts"]))
print("Valor de ts:", sample["ts"])


Tipo de ts: <class 'datetime.datetime'>
Valor de ts: 2017-10-06 03:44:10


In [None]:
# Validar que no queden documentos con ts como string

# Cuenta cuántos documentos tienen ts de tipo string
string_count = coll.count_documents({ "ts": { "$type": "string" } })
print(f"Documentos con ts en string: {string_count}")

# Opcional: lanzar error si hay alguno
assert string_count == 0, f"⚠️ Aún hay {string_count} documentos con ts como string"


Documentos con ts en string: 0
