In [3]:
# Install/upgrade the notebook's dependencies into the running kernel's environment
# (%pip targets the kernel). Versions are not pinned, so reruns may pick up newer releases.
# As pip itself reports below, the kernel may need a restart before upgraded packages load.
%pip install --upgrade pip 
%pip install pandas pyarrow fastparquet kaggle

Collecting pip
  Using cached pip-25.3-py3-none-any.whl.metadata (4.7 kB)
Using cached pip-25.3-py3-none-any.whl (1.8 MB)
Collecting pip
  Using cached pip-25.3-py3-none-any.whl.metadata (4.7 kB)
Using cached pip-25.3-py3-none-any.whl (1.8 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 25.2
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 25.2
    Uninstalling pip-25.2:
    Uninstalling pip-25.2:
      Successfully uninstalled pip-25.2
      Successfully uninstalled pip-25.2
Successfully installed pip-25.3
Successfully installed pip-25.3
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting fastparquet
Collecting fastparquet
  Downloading fastparquet-2024.11.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
  Downloading fastparquet-2024.11.0-cp313-cp313-manylinux

In [6]:
import ast
import json, re, os

import pandas as pd

In [None]:
from kaggle.api.kaggle_api_extended import KaggleApi

In [7]:
# Directory for every input/output file of this notebook (relative to the notebook's CWD).
data_dir = "../data"

# Metadata for the Kaggle dataset this pipeline publishes to. In this notebook only
# `kaggle["id"]` is read (to download the previously published advice.parquet); the layout
# looks like Kaggle's dataset-metadata.json / Frictionless Data Package format and is
# presumably serialized elsewhere when the dataset is updated — TODO confirm.
# NOTE(review): advertiser.parquet also receives an "advices" column (and
# "advertiser_badges" when present in the raw CSV), and advice / trade_methods /
# dashboard_summary parquet files are produced too; none of these are described in
# "resources" yet.
kaggle = {
    "title": "Peer-to-Peer Boliviano (BOB) Exchange Data",
    "subtitle": "Github Actions ETL Pipeline",
    "description": "This project contains the ETL pipeline for the Peer-to-Peer Boliviano (BOB) Exchange Data. The data is collected from various sources and transformed into a clean format for analysis. \nThe pipeline includes data extraction, transformation, and loading processes, along with data quality checks.\n",
    "id": "andreschirinos/p2p-bob-exchange",
    "licenses": [
        {
            "name": "CC0-1.0",
            "title": "CC0 1.0",
            "path": "https://creativecommons.org/publicdomain/zero/1.0/",
        }
    ],
    "resources": [
        {
            "path": "advertiser.parquet",
            "description": "Advertiser data from the BOB exchange",
            "schema": {
                "fields": [
                    {
                        "name": "advertiser_userno",
                        "order": 0,
                        "description": "Unique identifier for the advertiser",
                        "type": "string",
                    },
                    {
                        "name": "advertiser_nickname",
                        "order": 1,
                        "description": "Nickname of the advertiser",
                        "type": "string",
                    },
                    {
                        "name": "advertiser_monthordercount",
                        "order": 2,
                        "description": "Number of orders placed by the advertiser in the last month",
                        "type": "number",
                    },
                    {
                        "name": "advertiser_monthfinishrate",
                        "order": 3,
                        "description": "Finish rate of the advertiser in the last month",
                        "type": "number",
                    },
                    {
                        "name": "advertiser_positiverate",
                        "order": 4,
                        "description": "Positive rate of the advertiser",
                        "type": "number",
                    },
                    {
                        "name": "advertiser_usertype",
                        "order": 5,
                        "description": "Type of the advertiser (e.g., user, merchant)",
                        "type": "string",
                    },
                    {
                        "name": "advertiser_usergrade",
                        "order": 6,
                        "description": "Grade of the advertiser",
                        # The advertiser converters cast this column to int before writing.
                        "type": "number",
                    },
                    {
                        # Must match the normalized CSV column name exactly (no spaces).
                        "name": "advertiser_useridentity",
                        "order": 7,
                        "description": "Identity of the advertiser (e.g., MASS_MERCHANT, BLOCK_MERCHANT)",
                        "type": "string",
                    },
                    {
                        "name": "advertiser_viplevel",
                        "order": 8,
                        "description": "VIP level of the advertiser",
                        "type": "number",
                    },
                    {
                        "name": "advertiser_isblocked",
                        "order": 9,
                        "description": "Indicates if the advertiser is blocked",
                        "type": "boolean",
                    },
                    {
                        "name": "advertiser_activetimeinsecond",
                        "order": 10,
                        "description": "Active time of the advertiser in seconds",
                        "type": "number",
                    },
                    {
                        "name": "timestamp",
                        "order": 11,
                        "description": "Timestamp of the data collection",
                        "type": "datetime",
                    },
                    {
                        "name": "source",
                        "order": 12,
                        "description": "Source of the data (e.g, binance)",
                        "type": "string",
                    },
                ]
            },
            "name": "advertiser",
            "profile": "tabular-data-resource",
            "title": "Advertiser Table",
            # The resource file is written with DataFrame.to_parquet.
            "format": "parquet",
            "encoding": "utf-8",
        }
    ],
    "keywords": [
        "p2p",
        "exchange",
        "data",
        "pipeline",
        "etl",
        "bob",
        "bolivia",
        "cryptocurrency",
        "bitcoin",
        "blockchain",
    ],
    "name": "p2p-bob-exchange",
    "homepage": "https://sociest.org",
    "version": "1.0.0",
    "contributors": [{"title": "Andres Chirinos", "role": "author"}],
}

In [8]:
# Raw CSV that every table built below is derived from.
input_file = os.path.join(data_dir, "raw-data.csv")

In [None]:
# Create and authenticate a Kaggle API client; `api` is used later to download the
# previously published advice.parquet.
# NOTE(review): credentials presumably come from the standard Kaggle config file or the
# KAGGLE_USERNAME / KAGGLE_KEY environment variables — confirm for the GitHub Actions run.
api = KaggleApi()
api.authenticate()

In [None]:
# Load the raw CSV and normalize its headers: trim, lower-case, and turn spaces and dots
# into underscores so every later cell can use plain snake_case names.
print(f"Transformando datos de {input_file}...")
df = pd.read_csv(input_file)

df.columns = df.columns.map(
    lambda name: name.strip().lower().replace(" ", "_").replace(".", "_")
)

# Advertiser

In [None]:
# Split out the advertiser table: every `advertiser*` column plus the capture metadata.
advertiser_cols = [col for col in df.columns if col.startswith("advertiser") or col in ["timestamp", "source"]]
# .copy() makes this an independent frame. The per-column conversions and the added
# "advices" column below then modify it directly instead of a view of `df`, which avoids
# SettingWithCopyWarning and chained-assignment ambiguity.
df_advertiser = df[advertiser_cols].copy()

In [None]:
# Declare the per-column converters for the advertiser table.
# Columns that only need a plain dtype cast are listed by target dtype; the few that need
# extra handling (missing-value fill, epoch parsing) are added explicitly afterwards.
# Columns without an entry are passed through unchanged by the apply loop.
_advertiser_casts = {
    "advertiser_userno": str,
    "advertiser_monthordercount": int,
    "advertiser_monthfinishrate": float,
    "advertiser_positiverate": float,
    "advertiser_usertype": "category",
    "advertiser_usergrade": int,
    "advertiser_useridentity": "category",
    "advertiser_badges": str,
    "advertiser_isblocked": bool,
    "source": "category",
}
# `target=target` binds each dtype when the lambda is created (a plain closure would
# capture only the last loop value).
custom_funcs = {
    name: (lambda col, target=target: col.astype(target))
    for name, target in _advertiser_casts.items()
}
custom_funcs["advertiser_viplevel"] = lambda col: col.fillna(0).astype(int)  # missing VIP level -> 0
custom_funcs["advertiser_activetimeinsecond"] = lambda col: col.fillna(-1).astype(int)  # missing -> -1 sentinel
custom_funcs["timestamp"] = lambda col: pd.to_datetime(col, unit="s")  # epoch seconds -> datetime

In [None]:
# Apply the advertiser converters; columns without an entry in `custom_funcs` pass through unchanged.
default_func = lambda col: col

# Assign whole columns instead of `.loc[:, col] = ...`. Since pandas 2.0, `.loc[:, col]`
# writes into the existing column in place and keeps its dtype, so conversions such as
# "category" would silently not take effect. The copy ensures we never write into a view of `df`.
df_advertiser = df_advertiser.copy()
for col in df_advertiser.columns:
    df_advertiser[col] = custom_funcs.get(col, default_func)(df_advertiser[col])

In [None]:
# Exploratory check: ad numbers grouped per advertiser (displayed only, not stored).
(
    df[["adv_advno", "advertiser_userno"]]
    .astype(str)
    .groupby("advertiser_userno")
    .agg(",".join)
)

In [None]:
# Attach to every advertiser row the comma-separated ad numbers seen for that advertiser in
# this batch (duplicates are not removed). transform() broadcasts the joined string back to
# each original row, so it lines up with df_advertiser's pre-deduplication index.
ad_numbers = df[["adv_advno", "advertiser_userno"]].astype(str)
df_advertiser["advices"] = (
    ad_numbers
    .groupby("advertiser_userno")["adv_advno"]
    .transform(lambda ads: ",".join(ads))
)

In [None]:
# One row per advertiser: keep the first row seen for each advertiser_userno.
df_advertiser = df_advertiser.drop_duplicates(subset=["advertiser_userno"])

In [None]:
# Persist the deduplicated advertiser table.
advertiser_path = os.path.join(data_dir, "advertiser.parquet")
df_advertiser.to_parquet(advertiser_path, index=False)
print("advertiser Transformación terminada.")

# Advice

In [None]:
# Advice table: every non-advertiser column, plus advertiser_userno as the link back to the
# advertiser table.
advice_cols = [col for col in df.columns if not col.startswith("advertiser") or col == "advertiser_userno"]
# .copy() gives an independent frame, so the conversions and column rename below don't
# operate on a view of `df` (avoids SettingWithCopyWarning).
df_advice = df[advice_cols].copy()

In [None]:
# Declare the per-column converters for the advice table. Keys are the normalized CSV column
# names, before the "adv_" prefix is stripped; columns without an entry are left unchanged.
def _trade_method_ids(raw):
    """Return the comma-joined `identifier` of each trade method in a serialized list.

    `raw` is the string form of a list of dicts as stored in the CSV. It is parsed with
    ast.literal_eval, which accepts only Python literals; eval() would execute any
    expression that ended up in the scraped data.
    """
    return ",".join(method["identifier"] for method in ast.literal_eval(raw))


custom_funcs = {
    "adv_advno": lambda col: col.astype(str),
    "adv_classify": lambda col: col.astype("category"),
    "adv_tradetype": lambda col: col.astype("category"),
    "adv_asset": lambda col: col.astype("category"),
    "adv_fiatunit": lambda col: col.astype("category"),
    "adv_price": lambda col: col.astype(float),
    "adv_surplusamount": lambda col: col.astype(float),
    "adv_tradablequantity": lambda col: col.astype(float),
    "adv_maxsingletransamount": lambda col: col.astype(float),
    "adv_minsingletransamount": lambda col: col.astype(float),
    "adv_paytimelimit": lambda col: col.astype(int),
    "adv_takeradditionalkycrequired": lambda col: col.astype(bool),
    "adv_assetscale": lambda col: col.astype(int),
    "adv_fiatscale": lambda col: col.astype(int),
    "adv_pricescale": lambda col: col.astype(int),
    "adv_fiatsymbol": lambda col: col.astype("category"),
    "adv_istradable": lambda col: col.astype(bool),
    "adv_dynamicmaxsingletransamount": lambda col: col.astype(float),
    "adv_minsingletransquantity": lambda col: col.astype(float),
    "adv_maxsingletransquantity": lambda col: col.astype(float),
    "adv_dynamicmaxsingletransquantity": lambda col: col.astype(float),
    "adv_commissionrate": lambda col: col.astype(float),
    "adv_issafepayment": lambda col: col.astype(bool),

    # Keep only the payment-method identifiers, e.g. "[{'identifier': 'BANK', ...}]" -> "BANK".
    "adv_trademethods": lambda col: col.apply(_trade_method_ids),

    "advertiser_userno": lambda col: col.astype(str),

    "timestamp": lambda col: pd.to_datetime(col, unit="s"),  # epoch seconds -> datetime
    "source": lambda col: col.astype("category"),
}

In [None]:
# Apply the advice converters; columns without an entry in `custom_funcs` pass through unchanged.
default_func = lambda col: col

# Assign whole columns instead of `.loc[:, col] = ...`. Since pandas 2.0, `.loc[:, col]`
# writes into the existing column in place and keeps its dtype, so conversions such as
# "category" would silently not take effect. The copy ensures we never write into a view of `df`.
df_advice = df_advice.copy()
for col in df_advice.columns:
    df_advice[col] = custom_funcs.get(col, default_func)(df_advice[col])

In [None]:
# Drop the leading "adv_" from advice column names (e.g. adv_price -> price); other names are kept.
df_advice.columns = df_advice.columns.str.removeprefix("adv_")

In [None]:
# Fetch the currently published advice.parquet into data_dir (force=True overwrites any local
# copy) so the next cell can append this batch to it.
# NOTE(review): for large files the Kaggle API may save a zipped copy rather than the raw
# parquet — confirm the next cell really reads the freshly downloaded file.
advice_file_name = "advice.parquet"
api.dataset_download_file(kaggle["id"], advice_file_name, path=data_dir, force=True, quiet=False)

In [None]:
# Append this batch to the previously published advice table. Both frames get a fresh
# RangeIndex before concatenating (originally added to avoid an InvalidIndexError).
last_advice_path = os.path.join(data_dir, "advice.parquet")
df_last_advice = pd.read_parquet(last_advice_path).reset_index(drop=True)
df_advice = pd.concat(
    [df_last_advice, df_advice.reset_index(drop=True)],
    ignore_index=True,
)

In [None]:
# Persist the combined (previous + current batch) advice table, overwriting the downloaded copy.
advice_path = os.path.join(data_dir, "advice.parquet")
df_advice.to_parquet(advice_path, index=False)
print("advice Transformación terminada.")

# Trade Methods

In [None]:
# Columns needed for the trade-methods table: the ad number and its serialized method list.
trade_methods_cols = [col for col in df.columns if col in ("adv_advno", "adv_trademethods")]
df_advice_trade_info = df[trade_methods_cols]

In [None]:
# Parse each ad's serialized trade-method list and explode it to one row per (ad, method).
# ast.literal_eval only accepts Python literals; eval() would execute any expression that
# ended up in the scraped CSV.
df_advice_trade_table = df_advice_trade_info["adv_trademethods"].apply(ast.literal_eval).explode()

df_trade_methods_table = pd.json_normalize(df_advice_trade_table)

# json_normalize emits one row per exploded element, in the same order, so the owning ad
# number is attached positionally (.values) rather than by index label.
df_trade_methods_table["adv_advno"] = df_advice_trade_info.loc[df_advice_trade_table.index, "adv_advno"].values

df_trade_methods_table = df_trade_methods_table.drop_duplicates()

# One row per trade-method identifier, collecting every ad and display attribute seen for it.
df_trade_methods = df_trade_methods_table.groupby("identifier").agg({
    "adv_advno": set,
    "tradeMethodName": set,
    "tradeMethodShortName": set,
    "tradeMethodBgColor": set
}).reset_index()


def _join_values(cell):
    """Flatten an aggregated set into a sorted, comma-separated string.

    None/NaN members are dropped (json_normalize can fill missing keys with NaN, which
    would otherwise surface as the text "nan"). The result is sorted so the written
    parquet is identical across runs, because string-set iteration order changes between
    interpreter sessions. Non-set cells (the identifier column) pass through unchanged.
    """
    if not isinstance(cell, set):
        return cell
    return ",".join(sorted({str(v) for v in cell if pd.notna(v)}))


# DataFrame.map is the replacement for the deprecated DataFrame.applymap (pandas >= 2.1).
df_trade_methods = df_trade_methods.map(_join_values)

In [None]:
# Persist the per-identifier trade-methods table.
trade_methods_path = os.path.join(data_dir, "trade_methods.parquet")
df_trade_methods.to_parquet(trade_methods_path, index=False)
print("trade_methods Transformación terminada.")

# Dashboard Summary Dataset

Crea un dataset de resumen agregado para optimizar el dashboard.
Este archivo contendrá datos preprocesados por intervalos de tiempo (5 minutos, 1 hora y 1 día), lo que reduce drásticamente su tamaño y mejora el rendimiento.

In [9]:
# Reload the full advice table from disk and print a quick profile (size, time range,
# number of assets, trade sides) before building the summaries.
print("Cargando datos de advice para crear resumen...")
advice_path = os.path.join(data_dir, 'advice.parquet')
df_advice_full = pd.read_parquet(advice_path)

timestamps = df_advice_full['timestamp']
print(f"Datos cargados: {df_advice_full.shape}")
print(f"Rango de fechas: {timestamps.min()} a {timestamps.max()}")
print(f"Assets únicos: {df_advice_full['asset'].nunique()}")
print(f"Tipos de trade: {df_advice_full['tradetype'].unique()}")

Cargando datos de advice para crear resumen...
Datos cargados: (4523541, 89)
Rango de fechas: 2024-10-19 16:45:08 a 2025-11-12 14:11:05
Datos cargados: (4523541, 89)
Rango de fechas: 2024-10-19 16:45:08 a 2025-11-12 14:11:05
Assets únicos: 13
Tipos de trade: ['SELL' 'BUY']
Assets únicos: 13
Tipos de trade: ['SELL' 'BUY']


In [None]:
# Build a time-bucketed summary of the advice table.
def create_summary_dataset(df, freq='5min'):
    """
    Aggregate the advice data per time interval, asset, trade type and source.

    Parameters:
    - df: DataFrame with the full advice data. It must contain `timestamp`, `asset`,
      `tradetype`, `source`, `price`, `tradablequantity`, `surplusamount`,
      `maxsingletransamount`, `minsingletransamount` and `advno`; other columns are ignored.
    - freq: pandas offset alias for the bucket size (e.g. '5min', '15min', '1h', '1D').

    Returns:
    - DataFrame with one row per observed (interval, asset, tradetype, source) combination
      and flattened `<column>_<stat>` names. `advno_count` is exposed as `num_ads` and
      `price_count` as `num_transactions`.
      NOTE(review): `num_transactions` counts ad rows with a price, not executed trades.
    """
    print(f"Creando resumen con frecuencia: {freq}")

    group_keys = ['asset', 'tradetype', 'source']
    aggregations = {
        'price': ['mean', 'min', 'max', 'std', 'count'],
        'tradablequantity': ['sum', 'mean', 'min', 'max'],
        'surplusamount': ['sum', 'mean'],
        'maxsingletransamount': ['mean', 'max'],
        'minsingletransamount': ['mean', 'min'],
        'advno': 'count'  # number of ads in the bucket
    }

    # Copy only the columns the aggregation uses. The advice table is wide, and this
    # function runs once per frequency.
    df = df[['timestamp', *group_keys, *aggregations]].copy()
    # Accept any datetime resolution or timezone (parquet may round-trip as datetime64[us]);
    # parse only when the column is not already datetime-like.
    if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
        df['timestamp'] = pd.to_datetime(df['timestamp'])

    # observed=True: the key columns may be categoricals. With observed=False pandas emits the
    # full Cartesian product of every time bin and every category, i.e. mostly empty rows,
    # which is enormous at 5min granularity over a long date range.
    summary = df.groupby(
        [pd.Grouper(key='timestamp', freq=freq), *group_keys],
        observed=True,
    ).agg(aggregations).reset_index()

    # Flatten the (column, statistic) MultiIndex produced by agg.
    summary.columns = ['_'.join(col).strip('_') if col[1] else col[0]
                       for col in summary.columns.values]

    # Friendlier names for the two count columns.
    summary = summary.rename(columns={
        'advno_count': 'num_ads',
        'price_count': 'num_transactions'
    })

    return summary

# Build one summary per granularity: 5 min (detailed), 1 h (general) and 1 day (historical).
# Each line reports how many raw rows one summary row stands for on average.
print("\n=== CREANDO RESÚMENES AGREGADOS ===\n")

summaries_by_freq = {}
for freq in ("5min", "1h", "1D"):
    summary = create_summary_dataset(df_advice_full, freq=freq)
    summaries_by_freq[freq] = summary
    print(f"Resumen {freq}: {summary.shape} (reducción: {len(df_advice_full)/len(summary):.1f}x)")

summary_5min = summaries_by_freq["5min"]
summary_1h = summaries_by_freq["1h"]
summary_1d = summaries_by_freq["1D"]


=== CREANDO RESÚMENES AGREGADOS ===

Creando resumen con frecuencia: 5min


In [None]:
# Show a sample of the hourly summary and its schema, then the deep memory footprint of each
# summary next to the full advice table.
print("\n=== EJEMPLO DEL RESUMEN (1 hora) ===")
print("\nPrimeras filas:")
print(summary_1h.head())

print("\nColumnas del resumen:")
for column_name in summary_1h.columns:
    print(f"  - {column_name}")

print("\nTipos de datos:")
print(summary_1h.dtypes)


def _mb(frame):
    """Deep memory usage of `frame`, in MiB."""
    return frame.memory_usage(deep=True).sum() / 1024**2


print("\nEstadísticas de memoria:")
for label, frame in (
    ("Original", df_advice_full),
    ("Resumen 5min", summary_5min),
    ("Resumen 1h", summary_1h),
    ("Resumen 1D", summary_1d),
):
    print(f"  {label}: {_mb(frame):.1f} MB")

In [None]:
# Combine the three summaries into one dashboard table, tagging every row with the
# granularity it came from.
print("\n=== CREANDO RESUMEN COMBINADO PARA DASHBOARD ===\n")

# Tag each summary in place; the 1 h export cell below drops this column again.
for frame, label in ((summary_5min, '5min'), (summary_1h, '1h'), (summary_1d, '1D')):
    frame['frequency'] = label

dashboard_summary = pd.concat([summary_5min, summary_1h, summary_1d], ignore_index=True)

# Store the label columns as category to shrink memory and file size.
dashboard_summary = dashboard_summary.astype({
    'frequency': 'category',
    'asset': 'category',
    'tradetype': 'category',
    'source': 'category',
})

dashboard_summary = dashboard_summary.sort_values('timestamp').reset_index(drop=True)

print(f"Resumen combinado: {dashboard_summary.shape}")
print(f"Reducción total de tamaño: {len(df_advice_full)/len(dashboard_summary):.1f}x")
print(f"Memoria: {dashboard_summary.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
print(f"\nFrecuencias incluidas: {dashboard_summary['frequency'].value_counts()}")

In [None]:
# Write the combined dashboard summary plus a lighter hourly-only file, reporting each file's size.
output_file = os.path.join(data_dir, 'dashboard_summary.parquet')
dashboard_summary.to_parquet(output_file, index=False, compression='snappy')

print(f"\n✅ Resumen guardado en: {output_file}")
print(f"Tamaño del archivo: {os.path.getsize(output_file) / 1024**2:.2f} MB")

# Hourly-only alternative: the 1 h summary without its `frequency` tag.
output_file_1h = os.path.join(data_dir, 'dashboard_summary_1h.parquet')
summary_1h_clean = summary_1h.drop(columns='frequency')
summary_1h_clean.to_parquet(output_file_1h, index=False, compression='snappy')

print(f"✅ Resumen 1h guardado en: {output_file_1h}")
print(f"Tamaño del archivo: {os.path.getsize(output_file_1h) / 1024**2:.2f} MB")

banner = "=" * 60
print("\n" + banner)
print("RESUMEN DE TRANSFORMACIÓN COMPLETADO")
print(banner)