### ETH Whale Activity ML Pipeline

- Setup & Configuration

In [10]:
import os
import time
import pickle
import requests
import warnings
import json
from datetime import datetime, timedelta

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, classification_report, confusion_matrix, make_scorer
)

try:
    from imblearn.over_sampling import SMOTE
    from imblearn.pipeline import Pipeline as ImbPipeline
    HAS_IMBLEARN = True
except ImportError:
    HAS_IMBLEARN = False
    print("‚ö†Ô∏è imbalanced-learn not installed")

try:
    from xgboost import XGBClassifier
    HAS_XGBOOST = True
except ImportError:
    HAS_XGBOOST = False
    print("‚ö†Ô∏è XGBoost not installed")

from dotenv import load_dotenv

warnings.filterwarnings('ignore')

‚ö†Ô∏è imbalanced-learn not installed


- Loading and Configuring Environmental Varriables

In [11]:
# Load API keys
load_dotenv()
DUNE_API_KEY = os.getenv("DUNE_WHALES_API")
COINGECKO_API_KEY = os.getenv("COINGECKO_API_KEY")

if not DUNE_API_KEY or not COINGECKO_API_KEY:
    raise ValueError("‚ùå Missing API keys in .env file")

# Configuration
QUERY_ID = "6184996"
REQUEST_DELAY = 0.5
OUTPUT_FILE = 'whale_prices_ml_ready.csv'
MODEL_FILE = 'models/eth_price_predictor.pkl'

print(" Configuration loaded")

 Configuration loaded


- Data Collection - Fetch Whale Data from Dune

In [13]:

CACHE_FILE = "dune_data_cache.json"


def fetch_dune_data_incremental(query_id, api_key):

    headers = {"x-dune-api-key": api_key}
    today = datetime.utcnow().date()

    # 1. Load cache if it exists
    
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r") as f:
            cache = json.load(f)

        df_cached = pd.DataFrame(cache["data"])
        df_cached["block_date"] = pd.to_datetime(df_cached["block_date"]).dt.date

        if "last_block_date" in cache:
            last_block_date = pd.to_datetime(cache["last_block_date"]).date()
        else:
            last_block_date = df_cached["block_date"].max()

        fetch_start_date = last_block_date + timedelta(days=1)

        if fetch_start_date >= today:
            print(f"‚úÖ Cache up-to-date (last block_date: {last_block_date})")
            return df_cached

        print(f"üîÑ Fetching new data: {fetch_start_date} ‚Üí {today}")

    else:
        print("üÜï No cache found. Fetching full history...")
        df_cached = pd.DataFrame()
        fetch_start_date = None

    # 2. Execute Dune query
    # (query must accept {{start_date}})
    
    payload = {}
    if fetch_start_date:
        payload = {
            "query_parameters": [
                {
                    "name": "start_date",
                    "type": "date",
                    "value": fetch_start_date.isoformat()
                }
            ]
        }

    execute = requests.post(
        f"https://api.dune.com/api/v1/query/{query_id}/execute",
        headers=headers,
        json=payload
    ).json()

    execution_id = execute.get("execution_id")
    if not execution_id:
        raise RuntimeError("Failed to execute Dune query")

        # 3. Poll until query completes
    
    while True:
        status = requests.get(
            f"https://api.dune.com/api/v1/execution/{execution_id}/status",
            headers=headers
        ).json()["state"]

        if status == "QUERY_STATE_COMPLETED":
            break
        if status == "QUERY_STATE_FAILED":
            raise RuntimeError("Dune query failed")

        time.sleep(10)
    
    # 4. Fetch results
    
    results = requests.get(
        f"https://api.dune.com/api/v1/execution/{execution_id}/results",
        headers=headers
    ).json()

    df_new = pd.DataFrame(results["result"]["rows"])

    if not df_new.empty:
        df_new["block_date"] = pd.to_datetime(df_new["block_date"]).dt.date
        df_new = df_new[df_new["block_date"] < today]

        # 5. Merge + save cache
    
    df_all = pd.concat([df_cached, df_new], ignore_index=True).drop_duplicates()

    last_block_date = df_all["block_date"].max()

    cache_payload = {
        "last_block_date": last_block_date.isoformat(),
        "data": json.loads(df_all.to_json(orient="records", date_format="iso"))
    }

    with open(CACHE_FILE, "w") as f:
        json.dump(cache_payload, f)

    print(f"‚úÖ Cache updated (last block_date: {last_block_date})")
    return df_all


# Usage
df_whales = fetch_dune_data_incremental(QUERY_ID, DUNE_API_KEY)

‚úÖ Cache up-to-date (last block_date: 2025-12-18)


In [14]:
df_whales


Unnamed: 0,avg_whale_tx_size_weth,block_date,deposit_tx_count,deposit_withdrawal_ratio,exchange_volume_ratio,max_whale_tx_size_weth,mega_whale_ratio,mega_whale_tx_count,mega_whale_volume_weth,net_flow_ma30,...,volume_vs_ma30,volume_vs_ma7,volume_zscore_30d,volume_zscore_7d,whale_exchange_deposits_weth,whale_exchange_withdrawals_weth,whale_net_exchange_flow_weth,whale_tx_count,whale_weth_transfer_velocity,withdrawal_tx_count
0,4314.097637,2025-12-18,0,0.0,0.0,14000.0000,0.9758,2674,1.935198e+07,39.2979,...,0.8776,1.1556,-0.2702,0.6848,0.0,0.0,0.0,4597,1.983191e+07,0
1,5209.070494,2025-12-17,0,0.0,0.0,36677.7456,0.9827,2395,2.081845e+07,37.6312,...,0.9406,1.3308,-0.1304,1.2320,0.0,0.0,0.0,4067,2.118529e+07,0
2,4990.620477,2025-12-16,0,0.0,0.0,16118.6764,0.9819,2444,2.016495e+07,37.6312,...,0.9169,1.4204,-0.1807,1.5560,0.0,0.0,0.0,4115,2.053640e+07,0
3,4189.753176,2025-12-15,0,0.0,0.0,14729.2075,0.9781,2504,1.856384e+07,37.6312,...,0.8379,1.4614,-0.3547,1.9395,0.0,0.0,0.0,4530,1.897958e+07,0
4,5209.769336,2025-12-14,0,0.0,0.0,12575.0000,0.9826,1777,1.411387e+07,37.6312,...,0.6299,1.2264,-0.8173,1.5061,0.0,0.0,0.0,2757,1.436333e+07,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1090,3341.463214,2022-12-24,0,0.0,0.0,14974.9156,0.9667,332,3.133297e+06,0.0000,...,0.8939,0.8939,-1.1900,-1.1900,0.0,0.0,0.0,970,3.241219e+06,0
1091,1996.218376,2022-12-23,0,0.0,0.0,42000.0000,0.9366,380,3.217648e+06,0.0000,...,0.9230,0.9230,-1.0284,-1.0284,0.0,0.0,0.0,1721,3.435492e+06,0
1092,1728.538351,2022-12-22,0,0.0,0.0,25838.3845,0.9303,429,3.356111e+06,0.0000,...,0.9449,0.9449,-0.8459,-0.8459,0.0,0.0,0.0,2087,3.607460e+06,0
1093,2268.029759,2022-12-21,0,0.0,0.0,23353.4997,0.9521,372,3.573848e+06,0.0000,...,0.9569,0.9569,-0.7071,-0.7071,0.0,0.0,0.0,1655,3.753589e+06,0


- Function for Price Data - Fetch ETH & BTC Prices

In [15]:

def fetch_coingecko_price(coin_id, from_date, to_date, api_key):
    """Fetch daily prices from CoinGecko Pro API"""
    print(f"\nüìà Fetching {coin_id.upper()} prices: {from_date} ‚Üí {to_date}")
    
    from_ts = int(pd.Timestamp(from_date).timestamp())
    to_ts = int(pd.Timestamp(to_date).timestamp())
    
    url = f"https://pro-api.coingecko.com/api/v3/coins/{coin_id}/market_chart/range"
    headers = {'accept': 'application/json', 'x-cg-pro-api-key': api_key}
    params = {'vs_currency': 'usd', 'from': from_ts, 'to': to_ts}
    
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        
        prices = data['prices']
        df = pd.DataFrame({'timestamp': [p[0] for p in prices], 'price': [p[1] for p in prices]})
        df['date'] = pd.to_datetime(df['timestamp'], unit='ms').dt.date
        df = df.groupby('date', as_index=False).agg({'price': 'last'})
        
        print(f"   ‚úÖ {len(df)} days | ${df['price'].min():.0f} - ${df['price'].max():.0f}")
        return df
    except Exception as e:
        print(f"   ‚ùå Error: {e}")
        return pd.DataFrame()

-  Fetching BTC and ETH Prices

In [16]:

PRICE_CACHE_DIR = "data/price_cache"
os.makedirs(PRICE_CACHE_DIR, exist_ok=True)

def load_cached_price(symbol):
    path = f"{PRICE_CACHE_DIR}/{symbol}.csv"
    if os.path.exists(path):
        df = pd.read_csv(path, parse_dates=["date"])
        return df
    return None

def save_price_cache(symbol, df):
    path = f"{PRICE_CACHE_DIR}/{symbol}.csv"
    df.sort_values("date").drop_duplicates("date").to_csv(path, index=False)

def get_price_data_incremental(symbol, cg_id, start_date, end_date):
    cached = load_cached_price(symbol)

    if cached is None:
        print(f" No cache found for {symbol}. Fetching full range...")
        fetch_start = start_date
        base_df = pd.DataFrame()
    else:
        last_cached_date = cached["date"].max()
        fetch_start = last_cached_date + timedelta(days=1)
        base_df = cached

        if fetch_start > end_date:
            print(f" {symbol} price cache already up-to-date.")
            return base_df

        print(f" Updating {symbol}: {fetch_start.date()} ‚Üí {end_date.date()}")

    new_data = fetch_coingecko_price(
        cg_id,
        fetch_start.strftime("%Y-%m-%d"),
        end_date.strftime("%Y-%m-%d"),
        COINGECKO_API_KEY
    )

    df_new = new_data.rename(columns={"price": f"{symbol}_price"})
    df_all = pd.concat([base_df, df_new], ignore_index=True)

    save_price_cache(symbol, df_all)

    time.sleep(REQUEST_DELAY)

    return df_all


In [17]:
# Determine date range (+ buffer for indicators)
min_date = pd.to_datetime(df_whales['block_date'].min()) - timedelta(days=100)
max_date = pd.to_datetime(df_whales['block_date'].max())

print(f" Price fetch range: {min_date.date()} ‚Üí {max_date.date()}")

df_eth = get_price_data_incremental(
    symbol="eth",
    cg_id="ethereum",
    start_date=min_date,
    end_date=max_date
)

df_btc = get_price_data_incremental(
    symbol="btc",
    cg_id="bitcoin",
    start_date=min_date,
    end_date=max_date
)

print("\n Price data ready (cached + updated)")


 Price fetch range: 2022-09-11 ‚Üí 2025-12-18
 eth price cache already up-to-date.
 btc price cache already up-to-date.

 Price data ready (cached + updated)


- Data Merging: Dune's Query and Prices from Coingecko

In [19]:
df_whales['block_date'] = pd.to_datetime(df_whales['block_date'])
df_eth['date'] = pd.to_datetime(df_eth['date'])
df_btc['date'] = pd.to_datetime(df_btc['date'])

# Merge ETH prices
df_merged = pd.merge(df_whales, df_eth, left_on='block_date', right_on='date', how='inner')
df_merged = df_merged.drop('date', axis=1)

# Merge BTC prices
df_merged = pd.merge(df_merged, df_btc, left_on='block_date', right_on='date', how='inner')
df_merged = df_merged.drop('date', axis=1)

print(f" Merged: {len(df_merged)} rows, {len(df_merged.columns)} columns")


 Merged: 1095 rows, 31 columns


- Feature Engineering from version 1

In [None]:
def add_price_features(df, price_col, prefix):
    """Add price-based ML features"""
    df = df.sort_values('block_date').reset_index(drop=True)
    
    # Returns
    df[f'{prefix}_daily_return'] = df[price_col].pct_change()
    df[f'{prefix}_log_return'] = np.log(df[price_col] / df[price_col].shift(1))
    
    # Moving averages
    df[f'{prefix}_ma7'] = df[price_col].rolling(7, min_periods=1).mean()
    df[f'{prefix}_ma30'] = df[price_col].rolling(30, min_periods=1).mean()
    
    # Momentum
    df[f'{prefix}_vs_ma7'] = df[price_col] / df[f'{prefix}_ma7']
    df[f'{prefix}_vs_ma30'] = df[price_col] / df[f'{prefix}_ma30']
    
    # Volatility
    df[f'{prefix}_vol7'] = df[f'{prefix}_daily_return'].rolling(7, min_periods=1).std()
    df[f'{prefix}_vol30'] = df[f'{prefix}_daily_return'].rolling(30, min_periods=1).std()
    
    # Returns
    df[f'{prefix}_ret7d'] = df[price_col].pct_change(7)
    df[f'{prefix}_ret30d'] = df[price_col].pct_change(30)
    
    # RSI
    returns = df[f'{prefix}_daily_return']
    gains = returns.where(returns > 0, 0).rolling(14, min_periods=1).mean()
    losses = -returns.where(returns < 0, 0).rolling(14, min_periods=1).mean()
    rs = gains / (losses + 1e-10)
    df[f'{prefix}_rsi'] = 100 - (100 / (1 + rs))
    
    # Lags
    for lag in [1, 3, 7]:
        df[f'{prefix}_ret_lag{lag}'] = df[f'{prefix}_daily_return'].shift(lag)
    
    return df


def add_correlation_features(df):
    """Add ETH-BTC correlation features"""
    df['eth_btc_ratio'] = df['eth_price'] / df['btc_price']
    df['eth_btc_ratio_ma7'] = df['eth_btc_ratio'].rolling(7, min_periods=1).mean()
    df['eth_btc_corr_30d'] = df['eth_daily_return'].rolling(30, min_periods=20).corr(df['btc_daily_return'])
    df['eth_outperformance'] = df['eth_daily_return'] - df['btc_daily_return']
    return df

def create_target(df):
    """Create target: next day price direction"""
    df['next_day_return'] = df['eth_price'].pct_change().shift(-1)
    df['next_day_price_direction'] = (df['next_day_return'] > 0).astype(int)
    return df

# %%
print("\n‚öôÔ∏è Engineering features...")

df_merged = add_price_features(df_merged, 'eth_price', 'eth')
df_merged = add_price_features(df_merged, 'btc_price', 'btc')
df_merged = add_correlation_features(df_merged)
df_merged = create_target(df_merged)

print(f"‚úÖ Features created: {len(df_merged.columns)} total columns")

# Drop rows with NaN target
df_final = df_merged.dropna(subset=['next_day_price_direction']).copy()

print(f"‚úÖ Final dataset: {len(df_final)} rows")
print(f"   Dropped {len(df_merged) - len(df_final)} rows (NaN target)")

# Save
df_final.to_csv(OUTPUT_FILE, index=False)
print(f"üíæ Saved: {OUTPUT_FILE}")



‚öôÔ∏è Engineering features...
‚úÖ Features created: 65 total columns
‚úÖ Final dataset: 1095 rows
   Dropped 0 rows (NaN target)
üíæ Saved: whale_prices_ml_ready.csv
