In [1]:
import pandas as pd
import numpy as np
from collections import defaultdict

In [2]:
# Directory that holds the raw CSV exports read by this notebook.
DATA_PATH = "/Users/gascalero/Documents/pipeline-rugpull/mlops-stack/airflow/plugins/data/raw"

# WETH is matched against each pool's token0/token1 columns.
# ETH_ADDRESS and DEAD_ADDRESS get special treatment in get_curve.
WETH         = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
ETH_ADDRESS  = "0x0000000000000000000000000000000000000000"
DEAD_ADDRESS = "0x000000000000000000000000000000000000dead"

WINDOW     = 6_646        # blocks ≈ 24 h of observation
BLOCKSTUDY = 13_220_488   # cutoff block used for labeling
INACTIVITY = 160_000      # blocks without activity = abandoned

# 1. CARGA DE DATOS

In [14]:
print("Cargando datos...")

# Raw inputs: one CSV per entity, all located under DATA_PATH.
df_pools     = pd.read_csv(DATA_PATH + "/pool_list_complete.csv")
df_events    = pd.read_csv(DATA_PATH + "/eventos_pool_sync_mint_burn.csv")
df_metadata  = pd.read_csv(DATA_PATH + "/token_metadata_complete.csv")
df_transfers = pd.read_csv(DATA_PATH + "/eventos_transfers_tokens.csv")

# Helper flags on the pools table: which side of each pair is WETH.
df_pools['weth_is_token0'] = df_pools['token0'].eq(WETH)
df_pools['weth_is_token1'] = df_pools['token1'].eq(WETH)

print(f"Pools:     {len(df_pools):,}")
print(f"Events:    {len(df_events):,}")
print(f"Metadata:  {len(df_metadata):,}")
print(f"Transfers: {len(df_transfers):,}")

Cargando datos...
Pools:     998
Events:    1,835,994
Metadata:  998
Transfers: 8,717,043


# 2. DEFINICIÓN DE FUNCIONES

## 2.1 Funciones de features

In [15]:
def get_pool_features(token_address, df_pools, df_events, eval_block):
    """Summarise the pool reserves of a token from its sync events before ``eval_block``.

    Parameters
    ----------
    token_address : value looked up in ``df_pools['token_address']``.
    df_pools : DataFrame with ``token_address``, ``pair_address``,
        ``weth_is_token0`` and ``token{0,1}_decimals`` columns.
    df_events : DataFrame with ``pair_address``, ``event_type``,
        ``block_number`` and ``amount{0,1}_or_reserve{0,1}_hex`` columns.
    eval_block : only events with ``block_number < eval_block`` are used.

    Returns
    -------
    dict
        ``{}`` when the token decimals are missing, when fewer than two sync
        events precede ``eval_block``, or when fewer than two of them have
        both reserves strictly positive. Otherwise:

        * ``n_syncs``   - number of sync events before ``eval_block``
          (including those later discarded for non-positive reserves),
        * ``WETH``      - last valid WETH-side reserve, divided by 1e18,
        * ``prices``    - last valid WETH reserve / token reserve,
        * ``liquidity`` - last valid WETH reserve * token reserve.

    Raises
    ------
    IndexError
        If ``token_address`` is not present in ``df_pools``.
    """
    pool_info = df_pools[df_pools['token_address'] == token_address].iloc[0]
    pair      = pool_info['pair_address']
    weth_pos  = 0 if pool_info['weth_is_token0'] else 1
    token_pos = 1 - weth_pos
    decimals  = pool_info[f'token{token_pos}_decimals']

    # Without decimals the token reserve cannot be scaled; every reserve would
    # be NaN and the function would end up returning {} anyway.
    if pd.isna(decimals):
        return {}

    # Use a plain Python int as divisor. The value read from the row is a NumPy
    # scalar: 10**int64 overflows past 18 decimals, and dividing a raw reserve
    # larger than 2**63 by an int64 can raise OverflowError on recent NumPy.
    token_unit = 10 ** int(decimals)

    syncs = df_events[
        (df_events['pair_address'] == pair) &
        (df_events['event_type']   == 'sync') &
        (df_events['block_number'] <  eval_block)
    ].sort_values('block_number')

    if len(syncs) < 2:
        return {}

    weth_r   = syncs[f'amount{weth_pos}_or_reserve{weth_pos}_hex'].apply(lambda x: int(x, 16) / 1e18)
    token_r  = syncs[f'amount{token_pos}_or_reserve{token_pos}_hex'].apply(lambda x: int(x, 16) / token_unit)

    # Keep only observations where both sides of the pool hold something.
    valid    = (weth_r > 0) & (token_r > 0)
    weth_r   = weth_r[valid].values
    token_r  = token_r[valid].values

    if len(weth_r) < 2:
        return {}

    return {
        'n_syncs'   : len(syncs),
        'WETH'      : weth_r[-1],
        'prices'    : weth_r[-1] / token_r[-1],
        'liquidity' : weth_r[-1] * token_r[-1],
    }

In [16]:
def get_transfer_features(token_address, df_transfers, eval_block):
    """Count transfers and distinct participants of a token before ``eval_block``.

    Only rows of ``df_transfers`` whose ``token_address`` matches and whose
    ``block_number`` is strictly below ``eval_block`` are considered.

    Returns a dict with ``num_transactions`` (number of matching rows) and
    ``n_unique_addresses`` (distinct values across ``from_address`` and
    ``to_address`` of those rows).
    """
    is_token   = df_transfers['token_address'] == token_address
    is_earlier = df_transfers['block_number'] < eval_block
    window     = df_transfers[is_token & is_earlier]

    # Senders and receivers are pooled together before de-duplicating.
    participants = set(window['from_address']) | set(window['to_address'])

    return {
        'num_transactions'  : window.shape[0],
        'n_unique_addresses': len(participants),
    }

In [17]:
def get_curve(token_address, df_transfers, eval_block):
    """Compute a holder-concentration index for a token from its transfers.

    Transfers of ``token_address`` with ``block_number < eval_block`` are
    replayed in block order to rebuild per-address balances. Transfers from
    ``ETH_ADDRESS`` increase the running total supply and transfers to it
    decrease it; the balance kept for ``ETH_ADDRESS`` is reset to zero each
    time it is involved.

    Returns ``{'tx_curve': value}`` where ``value`` is the sum of squared
    balance shares (balance / total supply) over every address except
    ``ETH_ADDRESS`` and ``DEAD_ADDRESS``, or ``1.0`` when the reconstructed
    total supply is zero.
    """
    in_scope = (
        (df_transfers['token_address'] == token_address) &
        (df_transfers['block_number']  <  eval_block)
    )
    history = df_transfers[in_scope].sort_values('block_number')

    balances     = defaultdict(float)
    total_supply = 0.0

    # Walk the three relevant columns in lockstep, one transfer at a time.
    for sender, receiver, raw_value in zip(
        history['from_address'], history['to_address'], history['value']
    ):
        amount = float(raw_value)
        balances[sender]   -= amount
        balances[receiver] += amount
        if sender == ETH_ADDRESS:
            total_supply += amount
            balances[sender] = 0
        if receiver == ETH_ADDRESS:
            total_supply -= amount
            balances[receiver] = 0

    if total_supply == 0:
        return {'tx_curve': 1.0}

    excluded = (ETH_ADDRESS, DEAD_ADDRESS)
    hhi = sum(
        (balance / total_supply) ** 2
        for holder, balance in balances.items()
        if holder not in excluded
    )
    return {'tx_curve': hhi}

In [18]:
def get_lp_features(token_address, eval_block, df_pools, df_events, df_metadata):
    """Liquidity-provider activity of a token's pool before ``eval_block``.

    Returns a dict with:

    * ``mints`` / ``burns`` - number of rows of ``df_events`` for the token's
      pair with ``block_number < eval_block`` and ``event_type`` equal to
      ``'mint'`` / ``'burn'``;
    * ``difference_token_pool`` - the pool's ``block_number`` in ``df_pools``
      minus the token's ``token_creation_block`` in ``df_metadata``.

    Raises ``IndexError`` if the token is missing from ``df_pools`` or
    ``df_metadata``.
    """
    pool_row   = df_pools.loc[df_pools['token_address'] == token_address].iloc[0]
    created_at = df_metadata.loc[
        df_metadata['token_address'] == token_address, 'token_creation_block'
    ].iloc[0]

    in_window = (
        (df_events['pair_address'] == pool_row['pair_address']) &
        (df_events['block_number'] <  eval_block)
    )
    event_types = df_events.loc[in_window, 'event_type']

    return {
        'mints'                : int((event_types == 'mint').sum()),
        'burns'                : int((event_types == 'burn').sum()),
        'difference_token_pool': pool_row['block_number'] - created_at,
    }

Función de agrupación para el cómputo de features por token

In [19]:
def compute_features(token_address, df_pools, df_events, df_transfers, df_metadata):
    """Assemble every feature group for one token into a single dict.

    The evaluation block is the block of the pair's first sync event plus
    ``WINDOW``; each feature helper only sees data strictly before it. The
    result merges, in this order, the outputs of ``get_pool_features``,
    ``get_transfer_features``, ``get_curve`` and ``get_lp_features``.
    """
    pair = df_pools.loc[df_pools['token_address'] == token_address].iloc[0]['pair_address']

    is_pair_sync = (
        (df_events['pair_address'] == pair) &
        (df_events['event_type']   == 'sync')
    )
    first_sync_block = df_events.loc[is_pair_sync, 'block_number'].min()
    eval_block = first_sync_block + WINDOW

    # Later groups win on key collisions, exactly like successive dict.update calls.
    return {
        **get_pool_features(token_address, df_pools, df_events, eval_block),
        **get_transfer_features(token_address, df_transfers, eval_block),
        **get_curve(token_address, df_transfers, eval_block),
        **get_lp_features(token_address, eval_block, df_pools, df_events, df_metadata),
    }

## 2.2. Funciones de heurísticas

In [20]:
def compute_drawdown(series):
    """Locate the largest absolute drop from a running maximum.

    Returns ``(drawdown, peak_idx, valley_idx)``. ``valley_idx`` is where the
    gap between the running maximum and the series is largest, and
    ``peak_idx`` is the position of the maximum before that valley (0 when the
    valley is the first element). ``drawdown`` is ``(valley - peak) / peak``,
    or ``0`` when the peak value is zero.
    """
    gap_to_running_max = np.maximum.accumulate(series) - series
    valley_idx = np.argmax(gap_to_running_max)

    peak_idx = 0
    if valley_idx > 0:
        peak_idx = np.argmax(series[:valley_idx])

    peak_val, valley_val = series[peak_idx], series[valley_idx]
    if peak_val == 0:
        return 0, peak_idx, valley_idx

    relative_drop = (valley_val - peak_val) / peak_val
    return relative_drop, peak_idx, valley_idx


In [21]:
def compute_recovery(series, peak_idx, valley_idx):
    """Fraction of a peak-to-valley drop regained by the last observation.

    Computes ``(series[-1] - valley) / (peak - valley)`` and returns ``0``
    when peak and valley hold the same value.
    """
    floor = series[valley_idx]
    fall  = series[peak_idx] - floor
    if fall == 0:
        return 0
    rebound = series[-1] - floor
    return rebound / fall


In [22]:
def extract_label_features(token_address, df_pools, df_events, blockstudy):
    """Compute the heuristic inputs used to label one token.

    All sync events of the token's pair are converted to WETH and token
    reserves; observations where either reserve is not strictly positive are
    discarded. From the remaining series the function derives liquidity
    (reserve product) and price (WETH reserve / token reserve) and measures
    their maximum drawdown and subsequent recovery.

    Returns
    -------
    dict or None
        ``None`` when the token decimals are missing, or when fewer than five
        sync events (or fewer than five valid observations) exist. Otherwise a
        dict with:

        * ``token_address``, ``pair_address``;
        * ``inactive``      - 1 if the last valid sync is more than
          ``INACTIVITY`` blocks before ``blockstudy``, else 0;
        * ``late_creation`` - 1 if the first valid sync is fewer than
          ``INACTIVITY`` blocks before ``blockstudy``, else 0;
        * ``liq_mdd`` / ``liq_rc``     - drawdown and recovery of liquidity;
        * ``price_mdd`` / ``price_rc`` - drawdown and recovery of price.

    Raises
    ------
    IndexError
        If ``token_address`` is not present in ``df_pools``.
    """
    pool_info = df_pools[df_pools['token_address'] == token_address].iloc[0]
    pair      = pool_info['pair_address']
    weth_pos  = 0 if pool_info['weth_is_token0'] else 1
    token_pos = 1 - weth_pos
    decimals  = pool_info[f'token{token_pos}_decimals']

    # Without decimals every token reserve would be NaN and the token would be
    # rejected below anyway; return early instead of failing on int().
    if pd.isna(decimals):
        return None

    # Use a plain Python int as divisor. The value read from the row is a NumPy
    # scalar: 10**int64 overflows past 18 decimals, and dividing a raw reserve
    # larger than 2**63 by an int64 can raise OverflowError on recent NumPy.
    token_unit = 10 ** int(decimals)

    syncs = df_events[
        (df_events['pair_address'] == pair) &
        (df_events['event_type']   == 'sync')
    ].sort_values('block_number')

    if len(syncs) < 5:
        return None

    weth_r  = syncs[f'amount{weth_pos}_or_reserve{weth_pos}_hex'].apply(lambda x: int(x, 16) / 1e18).values
    token_r = syncs[f'amount{token_pos}_or_reserve{token_pos}_hex'].apply(lambda x: int(x, 16) / token_unit).values
    blocks  = syncs['block_number'].values

    # Keep only observations where both sides of the pool hold something.
    valid   = (weth_r > 0) & (token_r > 0)
    blocks  = blocks[valid]
    weth_r  = weth_r[valid]
    token_r = token_r[valid]

    if len(blocks) < 5:
        return None

    liquidity = weth_r * token_r
    prices    = weth_r / token_r

    liq_mdd, liq_peak, liq_valley       = compute_drawdown(liquidity)
    liq_rc                              = compute_recovery(liquidity, liq_peak, liq_valley)
    price_mdd, price_peak, price_valley = compute_drawdown(prices)
    price_rc                            = compute_recovery(prices, price_peak, price_valley)

    return {
        'token_address': token_address,
        'pair_address' : pair,
        'inactive'     : int(blockstudy - blocks[-1] > INACTIVITY),
        'late_creation': int(blockstudy - blocks[0]  < INACTIVITY),
        'liq_mdd'      : liq_mdd,
        'liq_rc'       : liq_rc,
        'price_mdd'    : price_mdd,
        'price_rc'     : price_rc,
    }

In [23]:
def assign_labels(df_label_features, inactive_transfers):
    """Apply the fraud heuristics and return one row per flagged token.

    Parameters
    ----------
    df_label_features : DataFrame indexed by token address with columns
        ``pair_address``, ``inactive``, ``late_creation``, ``liq_mdd``,
        ``liq_rc``, ``price_mdd`` and ``price_rc``.
    inactive_transfers : Series aligned on the same index; 1 marks tokens
        whose transfers are inactive. Tokens missing from it are treated as
        not inactive.

    Returns
    -------
    DataFrame
        Columns ``token_address``, ``pair_address``, ``label`` (always 1) and
        ``fraud_type``. Liquidity-stealing rows come first, then dumping rows.
        The columns are present even when no token is flagged, so callers can
        always select ``fraud_type`` / ``token_address``.

    A token is eligible only if both its pool and its transfers are inactive
    and it was not created late. Among eligible tokens:

    * ``liquidity_stealing`` - ``liq_mdd == -1.0`` and ``liq_rc <= 0.2``;
    * ``dumping`` - ``liq_mdd == 0``, ``price_mdd`` in [-1.0, -0.9] and
      ``price_rc`` in [0, 0.01].
    """
    output_columns = ['token_address', 'pair_address', 'label', 'fraud_type']

    df = df_label_features.copy()
    df['transfer_inactive'] = inactive_transfers
    df['fully_inactive']    = (df['inactive'] == 1) & (df['transfer_inactive'] == 1)

    eligible = df[(df['fully_inactive']) & (df['late_creation'] == 0)]

    # Type 1: liquidity stealing
    liquidity_stealing = eligible[
        (eligible['liq_mdd'] == -1.0) &
        (eligible['liq_rc'] <= 0.2)
    ]

    # Type 2: dumping
    dumping = eligible[
        (eligible['liq_mdd'] == 0) &
        (eligible['price_mdd'].between(-1.0, -0.9)) &
        (eligible['price_rc'].between(0, 0.01))
    ]

    records = []
    for fraud_type, flagged in (
        ('liquidity_stealing', liquidity_stealing),
        ('dumping', dumping),
    ):
        for token, pair in flagged['pair_address'].items():
            records.append({'token_address': token, 'pair_address': pair,
                            'label': 1, 'fraud_type': fraud_type})

    # Explicit columns keep the schema stable when nothing was flagged.
    return pd.DataFrame(records, columns=output_columns)

In [24]:
def build_labels(df_pools, df_events, df_transfers, blockstudy):
    """Run the labeling heuristics over every token listed in ``df_pools``.

    Label features are extracted per token (tokens for which
    ``extract_label_features`` returns nothing are skipped), transfer
    inactivity is derived from each token's last transfer block, and both are
    handed to ``assign_labels``, whose result is returned.
    """
    extracted = (
        extract_label_features(token, df_pools, df_events, blockstudy)
        for token in df_pools['token_address']
    )
    label_rows = [row for row in extracted if row]
    df_lf = pd.DataFrame(label_rows).set_index('token_address')

    # A token's transfers are inactive when its last transfer is more than
    # INACTIVITY blocks before the study block.
    last_transfer_block = df_transfers.groupby('token_address')['block_number'].max()
    inactive_transfers  = (blockstudy - last_transfer_block > INACTIVITY).astype(int)

    return assign_labels(df_lf, inactive_transfers)

# 3. GENERACIÓN DEL DATASET

In [25]:
# 1. Labeling — fraudulent tokens
df_labels = build_labels(df_pools, df_events, df_transfers, BLOCKSTUDY)
print(f"Tokens fraude (label=0): {len(df_labels)}")
print(df_labels['fraud_type'].value_counts())

# 2. Every token gets label=1 / fraud_type='none' unless the heuristics flagged
#    it, in which case it gets label=0 and its detected fraud type.
fraud_type_by_token = df_labels.set_index('token_address')['fraud_type']
fraud_idx = df_pools['token_address'].isin(df_labels['token_address'])
df_all_labels = df_pools[['token_address', 'pair_address']].assign(
    label=(~fraud_idx).astype('int64'),
    fraud_type=lambda d: d['token_address'].map(fraud_type_by_token).where(fraud_idx, 'none'),
)

print(f"\nDistribuci√≥n labels:")
print(df_all_labels['label'].value_counts())

# 3. Features for every token; a token whose computation raises is reported and skipped.
feature_list = []
for token in df_all_labels['token_address']:
    try:
        f = compute_features(token, df_pools, df_events, df_transfers, df_metadata)
    except Exception as e:
        print(f"Error en {token}: {e}")
    else:
        if f:
            f['token_address'] = token
            feature_list.append(f)

df_features = pd.DataFrame(feature_list).set_index('token_address')

# 4. Final dataset: features joined with their labels on token_address
labels_by_token = df_all_labels.set_index('token_address')[['label', 'fraud_type']]
df_dataset = df_features.join(labels_by_token, how='inner')

print(f"\nDataset final: {df_dataset.shape}")
print(df_dataset['label'].value_counts())
df_dataset.head()

Tokens fraude (label=0): 673
fraud_type
liquidity_stealing    643
dumping                30
Name: count, dtype: int64

Distribuci√≥n labels:
label
0    673
1    325
Name: count, dtype: int64

Dataset final: (998, 12)
label
0    673
1    325
Name: count, dtype: int64


Unnamed: 0_level_0,n_syncs,WETH,prices,liquidity,num_transactions,n_unique_addresses,tx_curve,mints,burns,difference_token_pool,label,fraud_type
token_address,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
0xebf919584021075d3f7bf3d6cf1c6dc318221eff,9.0,3.7000000000000003e-17,0.001332997,1.0270089999999999e-30,10,7,1.0,1,1,22,0,liquidity_stealing
0x219865b49bea3a1638084cd1e8c6c87e36de308f,7.0,50.93746,8.5525e-08,30337620000.0,39,20,0.414217,6,0,110761,1,none
0xfa235907a2705f3b55b222b29bb4637549fd3d28,13.0,5.355241,5.730951e-12,5004161000000.0,26,12,0.880001,1,0,140,1,none
0xcccdb294b52fc00051bf694fc798efee33bc0358,12.0,6.583061,7.875194e-12,5502937000000.0,13,13,0.702163,1,0,11,0,liquidity_stealing
0x81835d805d3b4baeaf31655ee62fa6b7cafdb599,9.0,1e-18,1.023583e-09,9.769608e-28,11,6,0.999996,5,1,178,0,liquidity_stealing


In [30]:
# Cell 8 — Drop tokens without enough features at the 24-hour cutoff after pool creation.
# They may have > 5 syncs overall, but only after those first 24 hours.
df_dataset = df_dataset.dropna(
    subset=['n_syncs', 'WETH', 'prices', 'liquidity'],
    how='any',
)

print(f"Dataset final: {df_dataset.shape}")
print(df_dataset['label'].value_counts())

Dataset final: (975, 12)
label
0    668
1    307
Name: count, dtype: int64


In [31]:
# Write the filtered, labeled dataset to CSV. No index argument is passed, so the
# token_address index is written as the first column.
# NOTE(review): absolute, user-specific path — presumably should be made configurable; confirm.
df_dataset.to_csv("/Users/gascalero/Documents/pipeline-rugpull/pipeline/data/labeled/dataset_labeled.csv")

# 4. MLFLOW

In [None]:
import mlflow
import mlflow.xgboost
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report

# Send runs to the MLflow tracking server on localhost:5000 and group them
# under the "rug-pull-detection" experiment.
# NOTE(review): assumes a tracking server is already running at this URI — confirm.
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("rug-pull-detection")

In [59]:
# Model inputs: the ten engineered columns of df_dataset, in a fixed order.
FEATURES = [
    'n_syncs', 'WETH', 'prices', 'liquidity',
    'num_transactions', 'n_unique_addresses',
    'tx_curve', 'mints', 'burns', 'difference_token_pool',
]

X, y = df_dataset[FEATURES], df_dataset['label']

# Stratified 80/20 hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    stratify=y,
    test_size=0.2,
    random_state=42,
)

print(f"Train: {X_train.shape} | Test: {X_test.shape}")
print(f"Train label dist:\n{y_train.value_counts()}")

Train: (780, 10) | Test: (195, 10)
Train label dist:
label
0    534
1    246
Name: count, dtype: int64


In [63]:
import os  # needed by os.makedirs below; it was never imported, so a fresh kernel raised NameError

from sklearn.model_selection import StratifiedKFold, cross_val_score

# Single source of truth for the fold count that is both logged and used.
N_CV_FOLDS = 5

params = {
    "n_estimators"    : 100,
    "random_state"    : 42,
    # Ratio of label-0 rows to label-1 rows in the training split.
    "scale_pos_weight": (y_train == 0).sum() / (y_train == 1).sum(),
    "max_depth"       : 6,       # midpoint of the range [3, 10]
    "subsample"       : 0.8,     # within [0.5, 1]
    "learning_rate"   : 0.1,     # classic value within [1e-5, 1]
    "gamma"           : 1e-2,    # conservative within [1e-8, 1e2]
    "reg_lambda"      : 1.0,     # XGBoost default, within the range
    "reg_alpha"       : 1e-2,    # light L1 regularization
}

with mlflow.start_run(run_name="xgboost-baseline"):

    mlflow.log_params(params)
    mlflow.log_param("window_blocks", WINDOW)
    mlflow.log_param("blockstudy",    BLOCKSTUDY)
    mlflow.log_param("train_size",    len(X_train))
    mlflow.log_param("test_size",     len(X_test))
    mlflow.log_param("cv_folds",      N_CV_FOLDS)

    model = xgb.XGBClassifier(**params)

    # Stratified K-fold cross-validation on the training split only.
    cv      = StratifiedKFold(n_splits=N_CV_FOLDS, shuffle=True, random_state=42)
    cv_aucs = cross_val_score(model, X_train, y_train, cv=cv, scoring='roc_auc')

    mlflow.log_metric("cv_auc_mean", cv_aucs.mean())
    mlflow.log_metric("cv_auc_std",  cv_aucs.std())
    print(f"CV AUC: {cv_aucs.mean():.4f} ± {cv_aucs.std():.4f}")

    # Fit on the whole training split and evaluate on the held-out test split.
    model.fit(X_train, y_train)
    y_pred  = model.predict(X_test)
    y_proba = model.predict_proba(X_test)[:, 1]
    auc     = roc_auc_score(y_test, y_proba)

    mlflow.log_metric("test_auc", auc)
    print(f"Test AUC: {auc:.4f}")
    print(classification_report(y_test, y_pred))

    # NOTE(review): the model is only written to a local folder; it is not
    # logged to the MLflow run as an artifact — confirm whether that is intended.
    os.makedirs("./mlflow/artifacts", exist_ok=True)
    model.save_model("./mlflow/artifacts/xgboost_rugpull.json")
    print("Modelo guardado en ./mlflow/artifacts/xgboost_rugpull.json")

CV AUC: 0.9353 ¬± 0.0069
Test AUC: 0.9259
              precision    recall  f1-score   support

           0       0.97      0.87      0.92       134
           1       0.76      0.95      0.85        61

    accuracy                           0.89       195
   macro avg       0.87      0.91      0.88       195
weighted avg       0.91      0.89      0.90       195

Modelo guardado en ./mlflow/artifacts/xgboost_rugpull.json
üèÉ View run xgboost-baseline at: http://localhost:5000/#/experiments/977939711845047620/runs/2b615e76f2b34d06bab26d9dbd1a2d75
üß™ View experiment at: http://localhost:5000/#/experiments/977939711845047620


Desde una perspectiva constructiva para tu tesis:
Valida tu pipeline end-to-end
Obtener AUC 0.93 con un baseline simple confirma que todo el proceso funciona correctamente — desde la extracción en BigQuery, pasando por el feature engineering, el labeling y hasta el entrenamiento. Si hubiera errores en alguna etapa, las métricas serían malas.
Justifica el uso de XGBoost
El resultado respalda la elección del algoritmo. Mazorra también usó XGBoost y obtuvo resultados similares, lo que refuerza que tu implementación es comparable con la literatura.
Establece un baseline sólido para experimentos futuros
Ahora tienes un punto de referencia claro. Cualquier experimento posterior — cambiar la ventana de observación, ajustar umbrales de labeling, agregar features — se puede comparar contra este 0.93. Eso es exactamente lo que MLflow está registrando.
Responde directamente RQ4
Tu cuarta pregunta de investigación pregunta si el pipeline permite una evaluación comparable con la literatura. AUC 0.93 vs 0.95 de Mazorra con optimización responde afirmativamente con evidencia empírica.
Abre la puerta al análisis de sensibilidad
Como tienes el pipeline reproducible, puedes variar parámetros como WINDOW, INACTIVITY o los umbrales de las heurísticas y ver cómo cambian las métricas — que es exactamente lo que pide RQ3 sobre sensibilidad del labeling. Sonnet 4.6