# TP1 Extracción y almacenamiento de datos

### CoinGecko API
CoinGecko API es una interfaz REST pública que permite acceder a información básica de criptomonedas. CoinGecko ofrece más de 70 endpoints públicos REST en JSON, abarcando datos de precios en tiempo real, estadísticas de mercado, metadatos y series históricas de miles de criptomonedas y exchanges.
La Api es publica y no necesita API keys para usarse.

# Librerias

In [1]:
# Instalación de dependencias para Google Colab
!pip install deltalake
!pip install pyarrow
!pip install pandas
!pip install requests

Collecting deltalake
  Downloading deltalake-1.0.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.6 kB)
Collecting arro3-core>=0.5.0 (from deltalake)
  Downloading arro3_core-0.5.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (949 bytes)
Collecting deprecated>=1.2.18 (from deltalake)
  Downloading Deprecated-1.2.18-py2.py3-none-any.whl.metadata (5.7 kB)
Downloading deltalake-1.0.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (51.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.3/51.3 MB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading arro3_core-0.5.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m37.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Deprecated-1.2.18-py2.py3-none-any.whl (10.0 kB)
Installing collected packages: deprecated, arro3-core, deltalake
Successfully installed arro3-core-

In [1]:
import os, time, requests
import pandas as pd
from deltalake import write_deltalake, DeltaTable
from datetime import datetime
import numpy as np

# Configuración de endpoints

In [2]:
# Configuración de endpoints
BASE_URL = "https://api.coingecko.com/api/v3"
MARKETS_URL = f"{BASE_URL}/coins/markets"
LIST_URL = f"{BASE_URL}/coins/list"

# Estructura del Data Lake - Capa Bronze
BRONZE_PATH = "bronze/coingecko_api"

# Funciones de extracción

In [3]:
def ensure_directory_exists(path):
    """
    Asegura que el directorio existe, si no lo crea automáticamente
    """
    try:
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)
            print(f"✓ Directorio creado: {path}")
        else:
            print(f"✓ Directorio ya existe: {path}")
    except Exception as e:
        print(f"Error creando directorio {path}: {e}")
        raise


def is_valid_delta_table(path):
    """
    Verifica si un path contiene una tabla Delta válida
    """
    try:
        DeltaTable(path)
        return True
    except Exception:
        return False


#Extracción full; endpoint para datos estáticos: lista todas las criptomonedas con id, symbol y name.
def fetch_static_coins():
    """
    Extrae la lista de criptomonedas de CoinGecko (datos estáticos/metadatos)
    Devuelve un dataframe con id, symbol, name
    retorna:
        pd.DataFrame: DataFrame con la lista de criptomonedas
    """
    print("🔄 Extrayendo datos estáticos: lista de criptomonedas...")
    try:
        resp = requests.get(LIST_URL, timeout=30)
        resp.raise_for_status()
        df = pd.DataFrame(resp.json())

        # Agregar timestamp de extracción para auditoría
        df["extraction_timestamp"] = datetime.utcnow().isoformat()

        print(f"✅ Extracción exitosa: {len(df)} monedas obtenidas")
        return df

    except requests.exceptions.RequestException as e:
        print(f"❌ Error en la petición: {e}")
        raise
    except Exception as e:
        print(f"❌ Error procesando datos estáticos: {e}")
        raise

# Endpoint para datos temporales: Información actualizada sobre la cotizacion de distintas criptomonedas (current_price, market_cap, total_volume y price_change_percentage_24h, etc)
# los datos del endpoint se actualiza todos los dias.
def fetch_market_data(vs_currency="usd", number_of_pages=2, per_page=25):
    """
    Extrae datos actuales de mercado desde CoinGecko (datos temporales)
    Parámetros:
        vs_currency (str): la moneda de comparación (p.ej. "usd")
        per_page (int): cantidad de ítems por página (máx. 250)
        number_of_pages (int): cantidad de páginas a extraer
    retorna:
        pd.DataFrame: DataFrame con datos de mercado de criptomonedas
    """
    print(f"🔄 Extrayendo datos temporales: mercado de criptomonedas...")
    print(f"   📊 Configuración: {number_of_pages} páginas × {per_page} items = {number_of_pages * per_page} registros máx.")
    
    # Acumulador de datos
    all_data = []

    try:
        for page in range(1, number_of_pages + 1):
            print(f"   📄 Procesando página {page}/{number_of_pages}...")

            params = {
                "vs_currency": vs_currency,
                "order": "market_cap_desc",
                "per_page": per_page,
                "page": page,
                "sparkline": "false"
            }

            resp = requests.get(MARKETS_URL, params=params, timeout=30)
            resp.raise_for_status()
            data = resp.json()

            if not data:
                print(f"   ⚠️  Página {page} vacía, finalizando extracción")
                break

            all_data.extend(data)
            print(f"   ✅ Página {page}: {len(data)} registros obtenidos")

            # Rate limiting 
            if page < number_of_pages:
                time.sleep(3)

        if not all_data:
            raise ValueError("No se obtuvieron datos de la API")

        df = pd.json_normalize(all_data)

        # Agregar columnas de partición 
        now = datetime.utcnow()
        df["extract_date"] = now.strftime("%Y-%m-%d")
        df["extraction_timestamp"] = now.isoformat()

        print(f"✅ Extracción completada: {len(df)} registros totales")
        return df

    except requests.exceptions.RequestException as e:
        print(f"❌ Error en la petición: {e}")
        raise
    except Exception as e:
        print(f"❌ Error procesando datos de mercado: {e}")
        raise



# Endpoint para datos históricos: Extrae datos de una criptomoneda específica en un rango de días determinado desde la fecha actual. Utilizare estos datos en la capa gold.
def fetch_historical_chart(coin_id, vs_currency="usd", days=30, interval="daily"):
    """
    Extrae datos de una criptomoneda específica en un rango de días determinado desde la fecha actual,
    Parámetros:
    - coin_id: ID de la criptomoneda (ej. 'bitcoin', 'ethereum').
    - vs_currency: Moneda contra la que se comparan los precios (ej. 'usd', 'eur').
    - days: Número de días a partir desde la fecha actual para obtener los datos históricos, ej 30 para los datos de los últimos 30 días.
    Retorna: un DataFrame de pandas con los datos históricos de precios, columnas: ['prices', 'market_caps', 'total_volumes'] cada columna es una lista de listas con dos elementos: [timestamp, valor].
    """
    url = f"{BASE_URL}/coins/{coin_id}/market_chart"
    params = {
        "vs_currency": vs_currency,
        "days": days,
        "interval": interval
    }
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    
    df= pd.DataFrame(resp.json())
    now = datetime.utcnow()
    df["extract_date"] = now.strftime("%Y-%m-%d")
    df['coin_id'] = coin_id
    return df


def fetch_multiple_historical_chart(coins,vs_currency="usd", days=30, interval="daily"):
    """
    la función recibe una lista de IDs de criptomonedas y extrae los datos históricos para cada una y los combina en un único DataFrame.
    Parámetros: 
    -coins: lista de IDs de criptomonedas (ej. ['bitcoin', 'ethereum']).
    -vs_currency: Moneda contra la que se comparan los precios (ej. 'usd', 'eur').
    -days: Número de días a partir desde la fecha actual para obtener los datos históricos, ej 30 para los datos de los últimos 30 días.
    -interval: Intervalo de tiempo para los datos históricos (ej. 'daily', 'hourly').
    Retorna: un DataFrame de pandas con los datos históricos de precios, columnas: ['prices', 'market_caps', 'total_volumes'] cada columna es una lista de listas con dos elementos: [timestamp, valor].
    """
    all_coins=[]
    for coin in coins:
        try:
            df_coin= fetch_historical_chart(coin,vs_currency,days,interval)
            all_coins.append(df_coin)
            time.sleep(30) # esto hara que el codigo sea más lento, pero es necesario para evitar errores de rate limiting y bloqueos.
        except requests.exceptions.RequestException as e:
            print(f"   ❌ {coin}: Error de API - {e}")
            continue
        except Exception as e:
                print(f"   ❌ {coin}: Error inesperado - {e}")
                continue
    df_all_coins= pd.concat(all_coins, axis=0, ignore_index=True)
    print(f"✅ Extracción completada:")
    print(f"   📊 Total registros unificados: {len(df_all_coins)}")
    return(df_all_coins)



# Funciones almacenamiento en capa bronze

In [4]:
def save_static_data_bronze(df_static):
    """
    Guarda datos estáticos en la capa Bronze
    Estructura: bronze/coingecko_api/coins_list/
    """
    #si el directorio no existe, lo crea
    path = f"{BRONZE_PATH}/coins_list"
    ensure_directory_exists(path)

    print(f"💾 Guardando datos estáticos en: {path}")
    try:
        write_deltalake(
            path,
            df_static,
            mode="overwrite"
        )
        print(f"✅ Datos estáticos guardados exitosamente: {len(df_static)} registros")

        # Verificar que se guardó correctamente
        if os.path.exists(path):
            print(f"✅ Verificación: Directorio Delta creado correctamente")

    except Exception as e:
        print(f"❌ Error guardando datos estáticos: {e}")
        raise

def save_market_data_bronze(df_market):
    """
    Guarda datos temporales en la capa Bronze con estrategia de merge
    Estructura: bronze/coingecko_api/coins_markets/extract_date=YYYY-MM-DD/

    Estrategia de merge:
    - Merge key: id + extract_date (una versión por moneda por día)
    - Si existe: actualiza el registro (útil para re-ejecuciones)
    - Si no existe: inserta nuevo registro
    - Mantiene historial completo de todas las fechas
    """
    path = f"{BRONZE_PATH}/coins_markets"
    # si el directorio no existe, lo crea
    ensure_directory_exists(path)

    extract_date = df_market["extract_date"].iloc[0]
    print(f"💾 Guardando datos de mercado en: {path}")

    try:
        # Verificar si la tabla Delta ya existe
        if is_valid_delta_table(path):
            print(f"   📋 Tabla Delta existente detectada, ejecutando merge...")

            # Cargar la tabla Delta existente
            dt = DeltaTable(path)

            # Ejecutar merge usando id + extract_date como clave
            (
                dt.merge(
                    df_market,
                    predicate="target.id = source.id AND target.extract_date = source.extract_date",
                    source_alias="source",
                    target_alias="target"
                )
                .when_matched_update_all()  # Actualiza todos los campos si encuentra match
                .when_not_matched_insert_all()  # Inserta si no encuentra match
                .execute()
            )

            print(f"✅ Merge completado exitosamente")
        # si la tabla no existe, se crea una nueva
        else:
            print(f"   🆕 Primera ejecución, creando tabla Delta...")
            # Primera vez, crear la tabla con overwrite
            write_deltalake(
                path,
                df_market,
                mode="overwrite",
                partition_by=["extract_date"]
            )
            print(f"✅ Tabla Delta creada por primera vez: {len(df_market)} registros")

        # Verificar partición creada
        partition_path = f"{path}/extract_date={extract_date}"
        if os.path.exists(partition_path):
            print(f"✅ Verificación: Partición creada/actualizada en {partition_path}")

        # Información final
        print(f"   🎯 Registros procesados en esta ejecución: {len(df_market)}")

    except Exception as e:
        print(f"❌ Error en merge de datos de mercado: {e}")
        print(f"   💡 Tip: Verifica que la tabla Delta no esté corrupta")
        raise



# los datos crudos no tienen columna date, cada columna tiene una lista [timestamp_ms,valor], asi que en la capa bronze usaré un overwrite partitionando por coin_id y extract_date,
# en la capa silver con los datos mas limpios y transformados usare una mejor estrategia de guardado con un merge incremental.
def save_multiple_market_chart_bronze(df_chart):
    """
    Guarda datos de market chart en la capa Bronze con estrategia incremental
    Estructura: bronze/coingecko_api/market_chart/extract_date=YYYY-MM-DD/

    Estrategia:
    - Particionado por coin_id y extract_date

    PROS:
    - Simple de implementar
    - Mantiene trazabilidad de cuándo se extrajo cada lote
    
    CONTRAS:
    - Duplicación masiva de datos (30 días × N extracciones)
    - Consume mucho storage
    
    En la capa silver trataremos estos contras.
    """
    path = f"{BRONZE_PATH}/market_chart"
    ensure_directory_exists(path)

    extract_date = df_chart["extract_date"].iloc[0]
    print(f"💾 Guardando datos de market chart en: {path}")
    try:
        # Verificar si ya existe la tabla
        if is_valid_delta_table(path):
            # Cargar la tabla Delta existente
            dt = DeltaTable(path)

            # Ejecutar merge usando id + extract_date como clave
            (
                dt.merge(
                    df_chart,
                    predicate="target.coin_id = source.coin_id AND target.extract_date = source.extract_date",
                    source_alias="source",
                    target_alias="target"
                )
                .when_matched_update_all()  # Actualiza todos los campos si encuentra match
                .when_not_matched_insert_all()  # Inserta si no encuentra match
                .execute()
            )

            print(f"✅ Merge completado exitosamente")
        else:
            # Primera vez, crear la tabla
            write_deltalake(
                path,
                df_chart,
                mode="overwrite",
                partition_by=["extract_date"]
            )
            print(f"✅ Tabla Delta creada por primera vez")

        # Verificar partición creada
        partition_path = f"{path}/extract_date={extract_date}"
        if os.path.exists(partition_path):
            print(f"✅ Verificación: Partición creada en {partition_path}")

    except Exception as e:
        print(f"❌ Error guardando datos de market chart: {e}")
        raise


# Estas funcion no tienen bloque try/except o menejo de errores porque las funciones que utilzan ya tienen manejo de errores incorporado.
def full_extraction():
    """
    Extracción y guardado completa de datos estáticos (metadatos)
    """
    print("\n=== EXTRACCIÓN FULL - DATOS ESTÁTICOS ===")
    df_static = fetch_static_coins()
    save_static_data_bronze(df_static)
    return df_static

def incremental_extraction(vs_currency="usd", number_of_pages=2, per_page=25):
    """
    Extracción incremental de datos temporales (mercado)
    Se ejecuta diariamente para obtener datos actualizados del mercado
    """
    print("\n=== EXTRACCIÓN INCREMENTAL - DATOS TEMPORALES ===")
    df_market = fetch_market_data(vs_currency, number_of_pages, per_page)
    save_market_data_bronze(df_market)
    return df_market

def multiple_historical_chart_extraction(coins, vs_currency="usd", days=30, interval="daily"):
    """
    Extracción incremental de datos historicos de múltiples criptomonedas.
    """
    print("\n=== EXTRACCIÓN INCREMENTAL - DATOS TEMPORALES ===")
    df_market_chart = fetch_multiple_historical_chart(coins, vs_currency, days, interval)
    save_multiple_market_chart_bronze(df_market_chart)
    return df_market_chart

# main()

In [5]:

def main(vs_currency="usd", number_of_pages=2):
    """
    Función principal que ejecuta las extracciones y la carga de datos en la capa Bronze
    Parámetros:
        vs_currency (str): Moneda de comparación para datos de mercado (default: "usd")
        number_of_pages (int): Número de páginas a extraer para datos de mercado (default: 2
    """
    print("🚀 Iniciando proceso de extracción de datos - CoinGecko API")
    print("📁 Estructura del Data Lake: bronze/coingecko_api/")
    print("🌐 Ejecutándose en Google Colab")
    print("-" * 60)

    try:
        # Extracción full de metadatos
        df_static = full_extraction()

        print("-" * 60)

        # Extracción incremental de datos de mercado
        df_market = incremental_extraction(vs_currency,number_of_pages)

        # Para el presenta trabajo vamos a trabajar con los datos históricos de 5 criptomonedas más relevantes.
        
        top_coins=["bitcoin", "ethereum", "tether", "ripple","binancecoin"]
        
        df_multiple_historical_chart=multiple_historical_chart_extraction(top_coins) # usaremos los parametros por defecto, vs_currency="usd", days=30, interval="daily"
        
        
        print("-" * 60)
        print(f"📊 RESUMEN FINAL")
        print(f"   📈 Datos estáticos extraídos: {len(df_static)} monedas")
        print(f"   💰 Datos de mercado extraídos: {len(df_market)} registros")
        print(f"   💾 Formato: Delta Lake - Capa Bronze")
        print(f"   📂 Ubicación: ./bronze/coingecko_api/")
        print("✅ Proceso completado exitosamente!")

        return df_static, df_market, df_multiple_historical_chart

    except Exception as e:
        print(f"❌ Error en el proceso principal: {e}")
        print("💡 Tip: Verifica tu conexión a internet y que la API esté disponible")
        raise

In [7]:
# Ejecutar el proceso
# el script tardara 2 min 30 seg aproximadamente en ejecutarse debido a los tiempos de espera entre las peticiones a la API para evitar bloqueos por rate limiting.
if __name__ == "__main__":
    df_static, df_market,df_multiple_historical_chart = main(vs_currency="usd", number_of_pages=2)

    # Mostrar muestra de los datos para verificación
    print("\n" + "="*60)
    print("📋 MUESTRA DE DATOS EXTRAÍDOS")
    print("="*60)
    print("\n🏷️  DATOS ESTÁTICOS (primeras 5 filas):")
    print(df_static[['id', 'symbol', 'name']].head())

    print(f"\n💰 DATOS DE MERCADO (primeras 5 filas, {len(df_market.columns)} columnas totales):")
    print(df_market[['id', 'symbol', 'name', 'current_price', 'market_cap', 'extract_date']].head())


🚀 Iniciando proceso de extracción de datos - CoinGecko API
📁 Estructura del Data Lake: bronze/coingecko_api/
🌐 Ejecutándose en Google Colab
------------------------------------------------------------

=== EXTRACCIÓN FULL - DATOS ESTÁTICOS ===
🔄 Extrayendo datos estáticos: lista de criptomonedas...


  df["extraction_timestamp"] = datetime.utcnow().isoformat()


✅ Extracción exitosa: 17507 monedas obtenidas
✓ Directorio ya existe: bronze/coingecko_api/coins_list
💾 Guardando datos estáticos en: bronze/coingecko_api/coins_list
✅ Datos estáticos guardados exitosamente: 17507 registros
✅ Verificación: Directorio Delta creado correctamente
------------------------------------------------------------

=== EXTRACCIÓN INCREMENTAL - DATOS TEMPORALES ===
🔄 Extrayendo datos temporales: mercado de criptomonedas...
   📊 Configuración: 2 páginas × 25 items = 50 registros máx.
   📄 Procesando página 1/2...
   ✅ Página 1: 25 registros obtenidos
   📄 Procesando página 2/2...
   ✅ Página 2: 25 registros obtenidos
✅ Extracción completada: 50 registros totales
✓ Directorio ya existe: bronze/coingecko_api/coins_markets
💾 Guardando datos de mercado en: bronze/coingecko_api/coins_markets
   📋 Tabla Delta existente detectada, ejecutando merge...
✅ Merge completado exitosamente
✅ Verificación: Partición creada/actualizada en bronze/coingecko_api/coins_markets/extract_

  now = datetime.utcnow()
  now = datetime.utcnow()
  now = datetime.utcnow()
  now = datetime.utcnow()
  now = datetime.utcnow()
  now = datetime.utcnow()


✅ Extracción completada:
   📊 Total registros unificados: 155
✓ Directorio ya existe: bronze/coingecko_api/market_chart
💾 Guardando datos de market chart en: bronze/coingecko_api/market_chart
✅ Merge completado exitosamente
✅ Verificación: Partición creada en bronze/coingecko_api/market_chart/extract_date=2025-06-21
------------------------------------------------------------
📊 RESUMEN FINAL
   📈 Datos estáticos extraídos: 17507 monedas
   💰 Datos de mercado extraídos: 50 registros
   💾 Formato: Delta Lake - Capa Bronze
   📂 Ubicación: ./bronze/coingecko_api/
✅ Proceso completado exitosamente!

📋 MUESTRA DE DATOS EXTRAÍDOS

🏷️  DATOS ESTÁTICOS (primeras 5 filas):
            id symbol          name
0            _    gib    ༼ つ ◕_◕ ༽つ
1  000-capital    000   000 Capital
2       01coin    zoc        01coin
3       0chain    zcn           Zus
4         0dog   0dog  Bitcoin Dogs

💰 DATOS DE MERCADO (primeras 5 filas, 31 columnas totales):
            id symbol      name  current_price     