# üöÄ ETL Pipeline - CoinMarketCap API

**Autor:** Mario Soriano Ba√±uls | **Proyecto:** Portfolio Profesional

---

## üìã Resumen Ejecutivo

Pipeline ETL automatizado que extrae datos del mercado de criptomonedas desde CoinMarketCap API, los transforma con pandas y los almacena en SQLite.

**Stack T√©cnico:** Python | pandas | SQLite | REST API | schedule

**Caracter√≠sticas principales:**
- ‚úÖ Extracci√≥n robusta con reintentos autom√°ticos y rate limiting
- ‚úÖ Transformaci√≥n de JSON anidado a tablas relacionales normalizadas
- ‚úÖ Validaci√≥n de datos y manejo de errores
- ‚úÖ Persistencia en SQLite con prevenci√≥n de duplicados
- ‚úÖ Automatizaci√≥n configurable (cada N horas)

**Resultado:** Base de datos `coinmarketcap_etl.db` con 3 tablas listas para an√°lisis

---

## 1Ô∏è‚É£ Configuraci√≥n e Imports

Cargamos librer√≠as y configuramos credenciales API desde archivo `.env`

In [1]:

import requests
import pandas as pd
import sqlite3
import time
import random
import os
from dotenv import load_dotenv

# Configuraci√≥n API
load_dotenv()
API_KEY = os.getenv("CMC_API_KEY")
BASE_URL = "https://pro-api.coinmarketcap.com/v1"
HEADERS = {'Accepts': 'application/json', 'X-CMC_PRO_API_KEY': API_KEY}
DB_PATH = 'coinmarketcap_etl.db'

print("‚úÖ Configuraci√≥n cargada correctamente")

‚úÖ Configuraci√≥n cargada correctamente


## 2Ô∏è‚É£ Funciones de Extracci√≥n (Extract)

**¬øQu√© datos extraemos?**

- **Listings**: Top 100 criptomonedas (precio, volumen, market cap, cambios %)Funci√≥n robusta para llamadas API con reintentos autom√°ticos y exponential backoff

- **Global Metrics**: M√©tricas agregadas del mercado total
- **Crypto Info**: Metadata (descripciones, URLs, logos, categor√≠as)

In [2]:
def probar_endpoint(endpoint_path, parameters={}, max_retries=3, initial_wait=1):
    """
    Realiza llamada GET a CoinMarketCap API con manejo de errores.
    
    Args:
        endpoint_path: Ruta del endpoint (ej: '/cryptocurrency/listings/latest')
        parameters: Par√°metros de la query
        max_retries: Reintentos en caso de error
        initial_wait: Tiempo de espera inicial (segundos)
    
    Returns:
        dict: Datos de la API o None si falla
    """
    url = f"{BASE_URL}{endpoint_path}"
    
    for intento in range(max_retries):
        try:
            response = requests.get(url, headers=HEADERS, params=parameters, timeout=10)
            
            # Manejar rate limiting (429)
            if response.status_code == 429:
                wait_time = initial_wait * (2 ** intento) + random.uniform(0, 1)
                print(f"‚è≥ Rate limit. Esperando {wait_time:.1f}s...")
                time.sleep(wait_time)
                continue
            
            response.raise_for_status()
            datos = response.json()
            
            # Validar respuesta
            if datos.get('status', {}).get('error_code') not in [None, 0]:
                print(f"‚ùå API Error: {datos['status'].get('error_message')}")
                return None
            
            print(f"‚úÖ {endpoint_path} | Cr√©ditos: {datos['status'].get('credit_count', 'N/A')}")
            return datos.get('data')
            
        except Exception as e:
            print(f"‚ùå Error (intento {intento + 1}/{max_retries}): {e}")
            if intento < max_retries - 1:
                time.sleep(initial_wait * (2 ** intento))
    
    return None


def obtener_info_clave():
    """Consulta cr√©ditos API disponibles"""
    return probar_endpoint('/key/info')


print("‚úÖ Funciones de extracci√≥n definidas")

‚úÖ Funciones de extracci√≥n definidas


## 3Ô∏è‚É£ Funciones de Transformaci√≥n (Transform)

**¬øC√≥mo transformamos?**

- **JSON ‚Üí DataFrame**: Convertimos respuestas JSON anidadas a tablas planasNormalizan estructuras JSON anidadas y crean DataFrames limpios

- **Normalizaci√≥n**: Extraemos datos USD de estructuras `quote` anidadas

- **Validaci√≥n**: Verificamos integridad de datos (no vac√≠os, tipos correctos)- **Enriquecimiento**: Agregamos categor√≠a de market cap (Mega/Large/Mid/Small-cap)

In [3]:
def normalizar_quote(quote_dict):
    """Extrae datos USD de estructura 'quote' anidada"""
    if not isinstance(quote_dict, dict):
        return {}
    usd = quote_dict.get('USD', {})
    return {
        'price': usd.get('price'),
        'volume_24h': usd.get('volume_24h'),
        'volume_change_24h': usd.get('volume_change_24h'),
        'percent_change_1h': usd.get('percent_change_1h'),
        'percent_change_24h': usd.get('percent_change_24h'),
        'percent_change_7d': usd.get('percent_change_7d'),
        'market_cap': usd.get('market_cap'),
        'market_cap_dominance': usd.get('market_cap_dominance')
    }


def transformar_listings(data_raw):
    """Transforma listado de criptomonedas a DataFrame"""
    if not isinstance(data_raw, list):
        print("‚ö†Ô∏è Data no es lista")
        return pd.DataFrame()
    
    registros = []
    for item in data_raw:
        base = {
            'id': item.get('id'),
            'name': item.get('name'),
            'symbol': item.get('symbol'),
            'slug': item.get('slug'),
            'cmc_rank': item.get('cmc_rank'),
            'num_market_pairs': item.get('num_market_pairs'),
            'circulating_supply': item.get('circulating_supply'),
            'total_supply': item.get('total_supply'),
            'max_supply': item.get('max_supply')
        }
        base.update(normalizar_quote(item.get('quote')))
        registros.append(base)
    
    return pd.DataFrame(registros)


def transformar_global_metrics(data_raw):
    """Transforma m√©tricas globales del mercado"""
    if not isinstance(data_raw, dict):
        print("‚ö†Ô∏è Data no es diccionario")
        return pd.DataFrame()
    
    quote_usd = data_raw.get('quote', {}).get('USD', {})
    
    df = pd.DataFrame([{
        'total_cryptocurrencies': data_raw.get('total_cryptocurrencies'),
        'active_cryptocurrencies': data_raw.get('active_cryptocurrencies'),
        'total_exchanges': data_raw.get('total_exchanges'),
        'active_exchanges': data_raw.get('active_exchanges'),
        'active_market_pairs': data_raw.get('active_market_pairs'),
        'total_market_cap': quote_usd.get('total_market_cap'),
        'total_volume_24h': quote_usd.get('total_volume_24h'),
        'btc_dominance': data_raw.get('btc_dominance'),
        'eth_dominance': data_raw.get('eth_dominance'),
        'defi_volume_24h': data_raw.get('defi_volume_24h'),
        'defi_market_cap': data_raw.get('defi_market_cap')
    }])
    
    return df


def transformar_crypto_info(data_raw):
    """Transforma metadata de criptomonedas"""
    if not isinstance(data_raw, dict):
        print("‚ö†Ô∏è Data no es diccionario")
        return pd.DataFrame()
    
    registros = []
    for crypto_id, info in data_raw.items():
        # Extraer URLs de forma segura
        urls = info.get('urls', {})
        website_list = urls.get('website', [])
        twitter_list = urls.get('twitter', [])
        
        registros.append({
            'id': info.get('id'),
            'name': info.get('name'),
            'symbol': info.get('symbol'),
            'slug': info.get('slug'),
            'description': info.get('description', '')[:500],  # Limitar texto
            'logo': info.get('logo'),
            'website': website_list[0] if website_list else None,
            'twitter': twitter_list[0] if twitter_list else None,
            'category': info.get('category'),
            'platform': info.get('platform', {}).get('name') if info.get('platform') else None
        })
    
    return pd.DataFrame(registros)


def validar_dataframe(df, nombre_tabla):
    """Valida que DataFrame no est√© vac√≠o"""
    if df is None or df.empty:
        print(f"‚ùå {nombre_tabla}: DataFrame vac√≠o")
        return False
    print(f"‚úÖ {nombre_tabla}: {len(df)} registros")
    return True


def enriquecer_listings(df):
    """Agrega categorizaci√≥n por market cap"""
    if 'market_cap' not in df.columns:
        return df
    
    def categorizar_mcap(mcap):
        if pd.isna(mcap) or mcap == 0:
            return None
        if mcap >= 10_000_000_000:
            return 'Mega-cap'
        elif mcap >= 1_000_000_000:
            return 'Large-cap'
        elif mcap >= 100_000_000:
            return 'Mid-cap'
        else:
            return 'Small-cap'
    
    df['market_cap_category'] = df['market_cap'].apply(categorizar_mcap)
    return df


print("‚úÖ Funciones de transformaci√≥n definidas")

‚úÖ Funciones de transformaci√≥n definidas


## 4Ô∏è‚É£ Funciones de Carga (Load)

**¬øD√≥nde se guardan los datos?**

- **Base de datos**: `coinmarketcap_etl.db` (SQLite, mismo directorio del proyecto)Persisten datos en SQLite con prevenci√≥n de duplicados

- **Modo**: `if_exists='replace'` - Reemplaza datos anteriores (evita duplicados)

- **Tablas creadas**:   - `crypto_metadata` - 3 registros (BTC, ETH, BNB)

  - `crypto_listings` - 100 registros  - `global_metrics` - 1 registro

In [4]:
def guardar_en_sqlite(df, nombre_tabla, db_path=DB_PATH):
    """
    Guarda DataFrame en SQLite (reemplaza datos anteriores).
    
    Args:
        df: DataFrame a guardar
        nombre_tabla: Nombre de la tabla
        db_path: Ruta de la base de datos
    """
    if df is None or df.empty:
        print(f"‚ö†Ô∏è {nombre_tabla}: Sin datos para guardar")
        return
    
    try:
        conn = sqlite3.connect(db_path)
        df.to_sql(nombre_tabla, conn, if_exists='replace', index=False)
        conn.close()
        print(f"‚úÖ {nombre_tabla}: {len(df)} registros guardados")
    except Exception as e:
        print(f"‚ùå Error guardando {nombre_tabla}: {e}")


def leer_desde_sqlite(nombre_tabla, db_path=DB_PATH, limit=5):
    """Lee datos desde SQLite"""
    try:
        conn = sqlite3.connect(db_path)
        query = f"SELECT * FROM {nombre_tabla} LIMIT {limit}"
        df = pd.read_sql_query(query, conn)
        conn.close()
        return df
    except Exception as e:
        print(f"‚ùå Error leyendo {nombre_tabla}: {e}")
        return None


def consulta_sqlite(query, db_path=DB_PATH):
    """Ejecuta consulta SQL personalizada"""
    try:
        conn = sqlite3.connect(db_path)
        df = pd.read_sql_query(query, conn)
        conn.close()
        return df
    except Exception as e:
        print(f"‚ùå Error en consulta: {e}")
        return None


print("‚úÖ Funciones de carga definidas")

‚úÖ Funciones de carga definidas


## 5Ô∏è‚É£ Pipeline ETL Completo

Ejecuta el flujo completo: Extract ‚Üí Transform ‚Üí Load

In [5]:
from datetime import datetime

def ejecutar_etl_completo():
    """
    Ejecuta el pipeline ETL completo.
    
    Returns:
        bool: True si √©xito, False si falla
    """
    print(f"\n{'='*70}")
    print(f"üöÄ INICIO ETL - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*70}\n")
    
    try:
        # 1. EXTRACT
        print("üì• 1/3 Extrayendo datos de la API...")
        
        listings_params = {'start': '1', 'limit': '100', 'sort': 'market_cap', 'convert': 'USD'}
        listings_data = probar_endpoint('/cryptocurrency/listings/latest', listings_params)
        
        global_params = {'convert': 'USD'}
        global_data = probar_endpoint('/global-metrics/quotes/latest', global_params)
        
        info_params = {'id': '1,1027,825'}  # BTC, ETH, BNB
        info_data = probar_endpoint('/cryptocurrency/info', info_params)
        
        if not all([listings_data, global_data, info_data]):
            print("‚ùå Error en extracci√≥n. Abortando.")
            return False
        
        # 2. TRANSFORM
        print("\n‚öôÔ∏è 2/3 Transformando datos...")
        
        df_listings = transformar_listings(listings_data)
        df_global = transformar_global_metrics(global_data)
        df_info = transformar_crypto_info(info_data)
        
        if validar_dataframe(df_listings, 'listings'):
            df_listings = enriquecer_listings(df_listings)
        
        validar_dataframe(df_global, 'global_metrics')
        validar_dataframe(df_info, 'metadata')
        
        # 3. LOAD
        print("\nüíæ 3/3 Cargando a SQLite...")
        
        guardar_en_sqlite(df_listings, 'crypto_listings')
        guardar_en_sqlite(df_global, 'global_metrics')
        guardar_en_sqlite(df_info, 'crypto_metadata')
        
        print(f"\n{'='*70}")
        print(f"‚úÖ ETL COMPLETADO - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'='*70}\n")
        
        return True
        
    except Exception as e:
        print(f"\n‚ùå ERROR EN ETL: {str(e)}")
        return False


print("‚úÖ Pipeline ETL definido")

‚úÖ Pipeline ETL definido


## 6Ô∏è‚É£ Verificaci√≥n de Cr√©ditos API

In [6]:
# Verificar cr√©ditos disponibles
print("üîç Verificando cr√©ditos API...\n")
info_clave = obtener_info_clave()

if info_clave:
    plan = info_clave.get('plan', {})
    usados = plan.get('credits_used', 0)
    totales = plan.get('credit_limit_monthly', 10000)
    disponibles = totales - usados
    
    print(f"üìä Plan: {plan.get('name', 'N/A')}")
    print(f"üìä Cr√©ditos usados: {usados:,}")
    print(f"üìä Cr√©ditos totales: {totales:,}")
    print(f"üìä Disponibles: {disponibles:,}")
    print(f"üìä Porcentaje usado: {(usados/totales)*100:.1f}%")

üîç Verificando cr√©ditos API...

‚úÖ /key/info | Cr√©ditos: 0
üìä Plan: N/A
üìä Cr√©ditos usados: 0
üìä Cr√©ditos totales: 10,000
üìä Disponibles: 10,000
üìä Porcentaje usado: 0.0%
‚úÖ /key/info | Cr√©ditos: 0
üìä Plan: N/A
üìä Cr√©ditos usados: 0
üìä Cr√©ditos totales: 10,000
üìä Disponibles: 10,000
üìä Porcentaje usado: 0.0%


## 7Ô∏è‚É£ Ejecutar ETL (Una Vez)

Ejecuta el pipeline completo una sola vez

In [7]:
# Ejecutar pipeline ETL
ejecutar_etl_completo()


üöÄ INICIO ETL - 2025-10-24 20:23:16

üì• 1/3 Extrayendo datos de la API...
‚úÖ /cryptocurrency/listings/latest | Cr√©ditos: 1
‚úÖ /cryptocurrency/listings/latest | Cr√©ditos: 1
‚úÖ /global-metrics/quotes/latest | Cr√©ditos: 1
‚úÖ /cryptocurrency/info | Cr√©ditos: 1

‚öôÔ∏è 2/3 Transformando datos...
‚úÖ listings: 100 registros
‚úÖ global_metrics: 1 registros
‚úÖ metadata: 3 registros

üíæ 3/3 Cargando a SQLite...
‚úÖ /global-metrics/quotes/latest | Cr√©ditos: 1
‚úÖ /cryptocurrency/info | Cr√©ditos: 1

‚öôÔ∏è 2/3 Transformando datos...
‚úÖ listings: 100 registros
‚úÖ global_metrics: 1 registros
‚úÖ metadata: 3 registros

üíæ 3/3 Cargando a SQLite...
‚úÖ crypto_listings: 100 registros guardados
‚úÖ global_metrics: 1 registros guardados
‚úÖ crypto_metadata: 3 registros guardados

‚úÖ ETL COMPLETADO - 2025-10-24 20:23:18

‚úÖ crypto_listings: 100 registros guardados
‚úÖ global_metrics: 1 registros guardados
‚úÖ crypto_metadata: 3 registros guardados

‚úÖ ETL COMPLETADO - 2025-10-24 2

True

## 8Ô∏è‚É£ Verificar Datos en SQLite

Comprobamos que los datos se guardaron correctamente

In [8]:
print("üîç Verificando datos en SQLite...\n")

# Verificar crypto_listings
df_check = leer_desde_sqlite('crypto_listings', limit=5)
if df_check is not None:
    print(f"\nüìä crypto_listings ({len(df_check)} primeros registros):")
    display(df_check[['name', 'symbol', 'price', 'market_cap', 'percent_change_24h']])

# Verificar global_metrics
df_global_check = leer_desde_sqlite('global_metrics')
if df_global_check is not None:
    print(f"\nüåç global_metrics:")
    display(df_global_check[['total_market_cap', 'total_volume_24h', 'btc_dominance']])

üîç Verificando datos en SQLite...


üìä crypto_listings (5 primeros registros):


Unnamed: 0,name,symbol,price,market_cap,percent_change_24h
0,Bitcoin,BTC,110556.760531,2204425000000.0,-0.284796
1,Ethereum,ETH,3910.006166,471931100000.0,0.304552
2,Tether USDt,USDT,1.000286,182942700000.0,0.003325
3,BNB,BNB,1108.415628,154269200000.0,-1.553071
4,XRP,XRP,2.478877,148775400000.0,2.736704



üåç global_metrics:


Unnamed: 0,total_market_cap,total_volume_24h,btc_dominance
0,3724456000000.0,148699400000.0,59.181543


## 9Ô∏è‚É£ Consultas SQL de Ejemplo

Ejemplos de an√°lisis con los datos almacenados

In [9]:
# Top 10 por market cap
query_top10 = """
SELECT name, symbol, 
       ROUND(price, 2) as price_usd,
       ROUND(market_cap/1000000000, 2) as mcap_billions,
       ROUND(percent_change_24h, 2) as change_24h
FROM crypto_listings
ORDER BY market_cap DESC
LIMIT 10
"""

resultado = consulta_sqlite(query_top10)
if resultado is not None:
    print("\nüëë Top 10 Criptomonedas por Market Cap:\n")
    display(resultado)


üëë Top 10 Criptomonedas por Market Cap:



Unnamed: 0,name,symbol,price_usd,mcap_billions,change_24h
0,Bitcoin,BTC,110556.76,2204.43,-0.28
1,Ethereum,ETH,3910.01,471.93,0.3
2,Tether USDt,USDT,1.0,182.94,0.0
3,BNB,BNB,1108.42,154.27,-1.55
4,XRP,XRP,2.48,148.78,2.74
5,Solana,SOL,191.77,105.37,-0.08
6,USDC,USDC,1.0,76.29,0.01
7,Dogecoin,DOGE,0.2,29.69,-0.3
8,TRON,TRX,0.3,28.8,-3.68
9,Cardano,ADA,0.65,23.29,-0.05


In [10]:
# Mayores ganancias 24h
query_gainers = """
SELECT name, symbol,
       ROUND(percent_change_24h, 2) as gain_24h,
       ROUND(price, 4) as price_usd,
       market_cap_category
FROM crypto_listings
WHERE percent_change_24h > 0
ORDER BY percent_change_24h DESC
LIMIT 5
"""

resultado_gainers = consulta_sqlite(query_gainers)
if resultado_gainers is not None:
    print("\nüìà Top 5 Mayores Ganancias 24h:\n")
    display(resultado_gainers)


üìà Top 5 Mayores Ganancias 24h:



Unnamed: 0,name,symbol,gain_24h,price_usd,market_cap_category
0,MYX Finance,MYX,21.54,3.2706,Mid-cap
1,Virtuals Protocol,VIRTUAL,14.63,0.9035,Mid-cap
2,Zcash,ZEC,6.99,255.4646,Large-cap
3,Pump.fun,PUMP,6.52,0.0041,Large-cap
4,Artificial Superintelligence Alliance,FET,5.05,0.2637,Mid-cap


## üîü Automatizaci√≥n (Opcional)

Ejecuta el ETL autom√°ticamente cada N horas

‚ö†Ô∏è **Nota:** Requiere `pip install schedule` y el notebook debe estar ejecut√°ndose

In [11]:
import schedule

def iniciar_automatizacion(intervalo_horas=12):
    """
    Inicia automatizaci√≥n del ETL cada X horas.
    
    Args:
        intervalo_horas: Horas entre ejecuciones (default: 12)
    """
    print(f"‚è∞ Automatizaci√≥n iniciada: ETL cada {intervalo_horas} horas")
    print(f"   Primera ejecuci√≥n: AHORA")
    print(f"   Pr√≥xima: en {intervalo_horas} horas\n")
    
    # Ejecutar inmediatamente
    ejecutar_etl_completo()
    
    # Programar ejecuciones
    schedule.every(intervalo_horas).hours.do(ejecutar_etl_completo)
    
    # Loop infinito
    try:
        while True:
            schedule.run_pending()
            time.sleep(60)
    except KeyboardInterrupt:
        print("\n‚èπÔ∏è Automatizaci√≥n detenida")

# Descomenta para activar automatizaci√≥n
# iniciar_automatizacion(intervalo_horas=12)

---

## ? C√≥mo Ejecutar el Proyecto

### **Requisitos previos:**
1. Python 3.11+
2. Archivo `.env` con tu API Key de CoinMarketCap:
   ```
   CMC_API_KEY=tu_clave_aqui
   ```

### **Instalaci√≥n:**
```bash
pip install requests pandas python-dotenv schedule
```

### **Ejecuci√≥n:**
1. **Opci√≥n 1 - Ejecutar todas las celdas**: 
   - `Run All` en Jupyter
   - El ETL se ejecuta autom√°ticamente al llegar a la celda 7

2. **Opci√≥n 2 - Manual**:
   - Ejecuta celdas 1-6 (definiciones)
   - Ejecuta celda 7 cuando quieras actualizar datos

3. **Opci√≥n 3 - Automatizado**:
   - Descomenta √∫ltima l√≠nea de celda 10
   - El ETL se ejecutar√° cada 12 horas autom√°ticamente


### **Verificaci√≥n:****Portfolio:** Ingenier√≠a de Datos

- Revisa que se crea `coinmarketcap_etl.db` en el directorio

- Ejecuta celdas 8-9 para ver los datos**Autor:** Mario Soriano Ba√±uls



------



## üìä Resultados- Los endpoints premium son para casos de uso comerciales

- Demuestro las habilidades t√©cnicas con los endpoints gratuitos

**Base de datos creada:** `coinmarketcap_etl.db`- El plan gratuito (10k cr√©ditos/mes) es suficiente para este proyecto

**¬øPor qu√© no los uso?**

**Tablas disponibles:**

- `crypto_listings` - Top 100 criptomonedas (precio, market cap, volumen, cambios %)- `/v1/cryptocurrency/trending/gainers-losers` - Top gainers/losers detallado

- `global_metrics` - M√©tricas agregadas del mercado- `/v1/cryptocurrency/trending/latest` - Trending cryptos

- `crypto_metadata` - Informaci√≥n detallada (URLs, logos, categor√≠as)- `/v1/cryptocurrency/airdrops` - Informaci√≥n de airdrops

**Enterprise Features:**

**Siguiente paso:** Ver notebook `Analisis_ETL_cmc.ipynb` para an√°lisis avanzados

- `/v1/cryptocurrency/price-performance-stats` - Estad√≠sticas de performance

---- `/v1/cryptocurrency/ohlcv/historical` - Candlestick data (OHLCV)

**Advanced Analytics:**

## üîó Stack T√©cnico

- `/v1/exchange/market-quotes/latest` - Order book y profundidad

- **Python 3.11+**- `/v2/cryptocurrency/quotes/historical` - Datos hist√≥ricos detallados

- **pandas** - Manipulaci√≥n de datos- `/v1/cryptocurrency/market-pairs/latest` - Pares de trading en exchanges

- **requests** - Llamadas HTTP**Trading & Order Book:**

- **sqlite3** - Base de datos

- **python-dotenv** - Gesti√≥n de credencialesEstos endpoints requieren planes pagos de CoinMarketCap:

- **schedule** - Automatizaci√≥n (opcional)

## üíé Endpoints Premium (No implementados)

---