# ETL CoinGecko

In [10]:
import os, io, time, json
from datetime import datetime, timezone
from typing import List, Dict, Any

import requests
import pandas as pd
import boto3
from botocore.exceptions import ClientError
from dotenv import load_dotenv

from pathlib import Path
import pyarrow 

## Extracción

- Consulta a la API pública de CoinGecko para obtener el top 50 de criptomonedas por capitalización

In [5]:

COINGECKO_URL = (
    "https://api.coingecko.com/api/v3/coins/markets"
    "?vs_currency=usd&order=market_cap_desc&per_page=50&page=1"
    "&price_change_percentage=1h,24h,7d"
)

def extract_from_api(url, timeout):
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()
    if not isinstance(data, list) or len(data) == 0:
        raise ValueError("API devolvió estructura inesperada o vacía.")
    return data

raw_records = extract_from_api(COINGECKO_URL, 30)

raw_records

[{'id': 'bitcoin',
  'symbol': 'btc',
  'name': 'Bitcoin',
  'image': 'https://coin-images.coingecko.com/coins/images/1/large/bitcoin.png?1696501400',
  'current_price': 114313,
  'market_cap': 2277021305051,
  'market_cap_rank': 1,
  'fully_diluted_valuation': 2277021305051,
  'total_volume': 58855014595,
  'high_24h': 114411,
  'low_24h': 111599,
  'price_change_24h': 2255.53,
  'price_change_percentage_24h': 2.01283,
  'market_cap_change_24h': 42437202375,
  'market_cap_change_percentage_24h': 1.89911,
  'circulating_supply': 19928006.0,
  'total_supply': 19928006.0,
  'max_supply': 21000000.0,
  'ath': 124128,
  'ath_change_percentage': -7.9074,
  'ath_date': '2025-08-14T00:37:02.582Z',
  'atl': 67.81,
  'atl_change_percentage': 168480.49184,
  'atl_date': '2013-07-06T00:00:00.000Z',
  'roi': None,
  'last_updated': '2025-09-29T23:20:22.786Z',
  'price_change_percentage_1h_in_currency': 0.13487360215119557,
  'price_change_percentage_24h_in_currency': 2.0128337940805747,
  'price_c

## Transform

- Limpieza y normalización de columnas, tipificación y timestamp de corrida

In [9]:

def transform(records: List[Dict[str, Any]]) -> pd.DataFrame:
    df = pd.DataFrame.from_records(records)
    cols = [
        "id", "symbol", "name",
        "current_price", "market_cap", "total_volume",
        "high_24h", "low_24h",
        "price_change_percentage_1h_in_currency",
        "price_change_percentage_24h_in_currency",
        "price_change_percentage_7d_in_currency",
        "last_updated"
    ]
    keep = [c for c in cols if c in df.columns]
    df = df[keep].copy()
    num_cols = [
        "current_price","market_cap","total_volume","high_24h","low_24h",
        "price_change_percentage_1h_in_currency",
        "price_change_percentage_24h_in_currency",
        "price_change_percentage_7d_in_currency"
    ]
    for c in num_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    if "last_updated" in df.columns:
        df["last_updated"] = pd.to_datetime(df["last_updated"], errors="coerce")
    run_ts = datetime.now(timezone.utc)
    df["run_ts_utc"] = run_ts
    return df

df = transform(raw_records)
df

Unnamed: 0,id,symbol,name,current_price,market_cap,total_volume,high_24h,low_24h,price_change_percentage_1h_in_currency,price_change_percentage_24h_in_currency,price_change_percentage_7d_in_currency,last_updated,run_ts_utc
0,bitcoin,btc,Bitcoin,114313.0,2277021305051,58855014595,114411.0,111599.0,0.134874,2.012834,1.198553,2025-09-29 23:20:22.786000+00:00,2025-09-29 23:24:10.401645+00:00
1,ethereum,eth,Ethereum,4225.59,509726400726,35437258130,4230.02,4089.64,0.057965,2.412481,0.723453,2025-09-29 23:20:25.315000+00:00,2025-09-29 23:24:10.401645+00:00
2,tether,usdt,Tether,1.0,174454866853,111783934919,1.001,1.0,0.002564,0.022374,-0.016332,2025-09-29 23:20:21.246000+00:00,2025-09-29 23:24:10.401645+00:00
3,ripple,xrp,XRP,2.89,172553781571,4905318747,2.91,2.85,-0.167751,0.84649,1.195363,2025-09-29 23:20:19.767000+00:00,2025-09-29 23:24:10.401645+00:00
4,binancecoin,bnb,BNB,1033.05,143649529111,2127461869,1035.61,988.9,-0.110228,4.458525,4.140284,2025-09-29 23:20:22.739000+00:00,2025-09-29 23:24:10.401645+00:00
5,solana,sol,Solana,213.2,115992141694,6983110189,214.44,205.24,-0.167424,1.551824,-3.401862,2025-09-29 23:20:20.136000+00:00,2025-09-29 23:24:10.401645+00:00
6,usd-coin,usdc,USDC,0.999603,73510517463,12022498063,0.999715,0.999508,-0.005077,-0.00396,-0.010608,2025-09-29 23:20:21.750000+00:00,2025-09-29 23:24:10.401645+00:00
7,staked-ether,steth,Lido Staked Ether,4225.43,36097209192,30494952,4229.7,4089.09,0.086559,2.399561,0.828639,2025-09-29 23:20:25.078000+00:00,2025-09-29 23:24:10.401645+00:00
8,dogecoin,doge,Dogecoin,0.235571,35587724527,2311021191,0.238518,0.229113,-0.409986,-0.323179,-1.866603,2025-09-29 23:20:25.365000+00:00,2025-09-29 23:24:10.401645+00:00
9,tron,trx,TRON,0.337196,31920613545,501350374,0.337473,0.332641,0.121914,0.529023,-0.662565,2025-09-29 23:20:21.046000+00:00,2025-09-29 23:24:10.401645+00:00


## Load

   - Guardado local en **Parquet** (en carpeta `./data/`).
   - Subida a **Amazon S3** con `boto3` (reemplaza por tu bucket, región y credenciales).

### PARQUET

In [11]:
os.makedirs("data", exist_ok=True)

run_dt = datetime.now(timezone.utc)
date_part = run_dt.strftime("%Y/%m/%d")
ts_part = run_dt.strftime("%Y%m%dT%H%M%SZ")

local_dir = Path("data") / "crypto_markets" / f"dt={date_part}"
local_dir.mkdir(parents=True, exist_ok=True)
local_path = local_dir / f"coingecko_top50_{ts_part}.parquet"

df.to_parquet(local_path, index=False)
local_path

PosixPath('data/crypto_markets/dt=2025/09/29/coingecko_top50_20250929T232554Z.parquet')

### S3

In [14]:
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET = os.getenv("S3_BUCKET", "coingecko-prefect")
S3_PREFIX = os.getenv("S3_PREFIX", "etl/crypto_markets")

def upload_file_to_s3(local_path: str, bucket: str, key: str, region: str | None = None) -> str:
    session_kwargs = {}
    if region:
        session_kwargs["region_name"] = region
    session = boto3.Session(**session_kwargs)
    s3 = session.client("s3")
    s3.upload_file(Filename=str(local_path), Bucket=bucket, Key=key)
    return f"s3://{bucket}/{key}"

s3_key = f"{S3_PREFIX}/dt={date_part}/coingecko_top50_{ts_part}.parquet"
try:
    s3_uri = upload_file_to_s3(local_path, S3_BUCKET, s3_key, AWS_REGION)
    print("Subido a:", s3_uri)
except ClientError as e:
    print("Error al subir a S3:", e)

Subido a: s3://coingecko-prefect/etl/crypto_markets/dt=2025/09/29/coingecko_top50_20250929T232554Z.parquet
