In [6]:
import os
from datetime import datetime, timezone

import kagglehub
import polars as pl
from dotenv import load_dotenv
from pathlib import Path

# Download Parquet files

In [7]:
def sync_and_save_parquet(dataset_slug: str, target_dirname: str = "data") -> list[Path]:
    """Download a Kaggle dataset and copy its top-level Parquet files locally.

    Credentials are taken from the environment after loading a ``.env`` file
    (via ``load_dotenv``).

    Args:
        dataset_slug: Kaggle dataset identifier of the form ``"<owner>/<dataset>"``.
        target_dirname: Destination directory, relative to the current working
            directory. Created (including missing parents) if needed.

    Returns:
        Paths of the written files, in sorted filename order. Empty if the
        dataset contains no top-level ``*.parquet`` files.

    Raises:
        EnvironmentError: If ``KAGGLE_USERNAME`` or ``KAGGLE_KEY`` is not set.
    """
    # 1. Load credentials from .env
    load_dotenv()

    if not os.getenv("KAGGLE_USERNAME") or not os.getenv("KAGGLE_KEY"):
        raise EnvironmentError("KAGGLE_USERNAME or KAGGLE_KEY not found in .env file.")

    # 2. Define data path relative to where the code is running
    # (works in notebooks and .py scripts alike).
    local_data_dir = Path.cwd() / target_dirname
    # parents=True so nested targets such as "data/raw" also work.
    local_data_dir.mkdir(parents=True, exist_ok=True)

    # 3. Download the dataset (returns path to the kagglehub cache)
    print(f"Fetching latest data from Kaggle: {dataset_slug}...")
    cache_path = Path(kagglehub.dataset_download(dataset_slug))

    # 4. Find all top-level parquet files; sorted for a deterministic order.
    parquet_files = sorted(cache_path.glob("*.parquet"))

    if not parquet_files:
        print("No parquet files found in the dataset.")
        return []

    # 5. Save to the local data folder
    print(f"Saving {len(parquet_files)} files to {local_data_dir}...")
    saved_paths = []
    for file in parquet_files:
        destination = local_data_dir / file.name
        # Stream cache -> local file rather than collecting the whole table in memory.
        pl.scan_parquet(file).sink_parquet(destination)
        saved_paths.append(destination)
        print(f" -> Saved {file.name}")

    print(f"\nSync Complete. Files are located in: {local_data_dir}")
    return saved_paths

# Execution: let exceptions propagate. Catching and printing them would let
# "Run All" continue with stale or missing files in data/.
DATASET_SLUG = "braydenmcarthur/10x-crypto-ohlcv-2024-2025"

saved_paths = sync_and_save_parquet(DATASET_SLUG)

Fetching latest data from Kaggle: braydenmcarthur/10x-crypto-ohlcv-2024-2025...
Saving 10 files to /home/zoltesh/projects/intelligent_diversification/src/data...
 -> Saved XRP-USDC.parquet
 -> Saved SOL-USDC.parquet
 -> Saved BCH-USDC.parquet
 -> Saved ADA-USDC.parquet
 -> Saved AVAX-USDC.parquet
 -> Saved ETH-USDC.parquet
 -> Saved BTC-USDC.parquet
 -> Saved LTC-USDC.parquet
 -> Saved DOGE-USDC.parquet
 -> Saved LINK-USDC.parquet

Sync Complete. Files are located in: /home/zoltesh/projects/intelligent_diversification/src/data


# Check Total Record Count

In [8]:
# Count rows with a lazy len() so Polars can answer without materializing every
# file. Only *.parquet files are considered, so stray files in data/ (e.g.
# .DS_Store) cannot break the count.
num_records = sum(
    pl.scan_parquet(path).select(pl.len()).collect().item()
    for path in sorted(Path("data").glob("*.parquet"))
)
print(f"Total number of records: {num_records}")


Total number of records: 2104131


In [9]:
# --- Path Resolution ---
DATA_DIR = Path("data")
if not DATA_DIR.is_dir():
    raise FileNotFoundError("Could not locate data directory.")

OUT_DIR = Path("data_cleaned")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# --- Constants & Config ---
START_MS = 1_704_067_200_000  # 2024-01-01 00:00:00 UTC (first bar, inclusive)
END_MS = 1_767_225_300_000    # 2025-12-31 23:55:00 UTC (last bar, inclusive)
INTERVAL = "5m"
INTERVAL_MS = 300_000         # INTERVAL expressed in milliseconds
# Derived from the bounds so it cannot drift from them (= 210_528 here).
EXPECTED_ROWS = (END_MS - START_MS) // INTERVAL_MS + 1
PRICE_COLS = ["open", "high", "low", "close"]
VOLUME_COL = ["volume"]
VALUE_COLS = PRICE_COLS + VOLUME_COL

# Create the master timestamp skeleton (naive datetimes holding UTC wall time)
START_DT = datetime.fromtimestamp(START_MS / 1_000, tz=timezone.utc).replace(tzinfo=None)
END_DT = datetime.fromtimestamp(END_MS / 1_000, tz=timezone.utc).replace(tzinfo=None)

full_index = pl.datetime_range(
    START_DT,
    END_DT,
    interval=INTERVAL,
    time_unit="ms",
    eager=True,
).alias("timestamp")

# Also catches INTERVAL and INTERVAL_MS disagreeing with each other.
assert len(full_index) == EXPECTED_ROWS, f"Index mismatch: {len(full_index)} vs {EXPECTED_ROWS}"

# --- Processing Loop ---
parquet_files = sorted(DATA_DIR.glob("*.parquet"))

for path in parquet_files:
    print(f"Processing {path.name}...")

    # 1. Load and Clean Raw Data
    df_raw = (
        pl.read_parquet(path)
        .with_columns([
            pl.col("timestamp").cast(pl.Datetime("ms")),
            *[pl.col(c).cast(pl.Float64) for c in VALUE_COLS],
        ])
        # Remove duplicates if any exist in the raw source
        .unique(subset="timestamp", keep="first")
    )

    # 2. Align to Master Index and Impute
    # A 'left' join gives every output file the exact same 'full_index'; raw rows
    # outside [START_MS, END_MS] are dropped. Join output order is not guaranteed,
    # so sort explicitly before the order-dependent fills below.
    df_imputed = (
        pl.DataFrame(full_index)
        .join(df_raw, on="timestamp", how="left")
        .sort("timestamp")
        .with_columns(
            # FORWARD FILL close (last known price); BACKWARD FILL covers missing
            # leading bars (note: those borrow the first later price).
            pl.col("close").fill_null(strategy="forward").fill_null(strategy="backward"),
            # ZERO FILL volume (no data usually means no trades)
            pl.col(VOLUME_COL).fill_null(0.0),
        )
        # Separate step so it sees the already-filled close: a bar with no trades
        # is flat at the last price, instead of copying the previous bar's
        # open/high/low (which would fabricate a price range).
        .with_columns(
            pl.col(["open", "high", "low"]).fill_null(pl.col("close")),
        )
    )

    # 3. Final Format & Integrity Check
    output = (
        df_imputed
        .with_columns(pl.col("timestamp").dt.timestamp("ms").cast(pl.Int64()))
        .select(["timestamp", *VALUE_COLS])
    )

    if output.height != EXPECTED_ROWS:
        raise ValueError(f"Height mismatch for {path.name}: {output.height}")
    # Nulls can only survive here if the file has no close price inside the window.
    null_total = output.null_count().sum_horizontal().item()
    if null_total:
        raise ValueError(f"{null_total} unfilled nulls in {path.name}")

    # 4. Save
    out_path = OUT_DIR / path.name
    output.write_parquet(out_path)
    print(f"Successfully saved to {out_path}")

print("\nAll files processed successfully.")

Processing ADA-USDC.parquet...
Successfully saved to data_cleaned/ADA-USDC.parquet
Processing AVAX-USDC.parquet...
Successfully saved to data_cleaned/AVAX-USDC.parquet
Processing BCH-USDC.parquet...
Successfully saved to data_cleaned/BCH-USDC.parquet
Processing BTC-USDC.parquet...
Successfully saved to data_cleaned/BTC-USDC.parquet
Processing DOGE-USDC.parquet...
Successfully saved to data_cleaned/DOGE-USDC.parquet
Processing ETH-USDC.parquet...
Successfully saved to data_cleaned/ETH-USDC.parquet
Processing LINK-USDC.parquet...
Successfully saved to data_cleaned/LINK-USDC.parquet
Processing LTC-USDC.parquet...
Successfully saved to data_cleaned/LTC-USDC.parquet
Processing SOL-USDC.parquet...
Successfully saved to data_cleaned/SOL-USDC.parquet
Processing XRP-USDC.parquet...
Successfully saved to data_cleaned/XRP-USDC.parquet

All files processed successfully.


In [None]:
# --- Configuration ---
CLEANED_DIR = Path("data_cleaned")
EXPECTED_ROWS = 210_528
BAR_MS = 300_000  # required spacing between consecutive 5-minute bars, in ms

# Verify directory exists and has files. Raise instead of calling exit(): inside
# an IPython/Jupyter kernel exit() asks the kernel session to end rather than
# simply stopping this cell.
if not CLEANED_DIR.is_dir():
    raise FileNotFoundError(f"Directory '{CLEANED_DIR}' does not exist.")

parquet_files = sorted(CLEANED_DIR.glob("*.parquet"))
if not parquet_files:
    raise FileNotFoundError(f"No parquet files found in '{CLEANED_DIR.absolute()}'.")

# --- Validation Table ---
print(f"{'File':<20} | {'Imputed %':<10} | {'Nulls':<6} | {'Status'}")
print("-" * 55)

failed_files = []
for path in parquet_files:
    df = pl.read_parquet(path)

    # Check 1: Imputed bars, using volume == 0 as a proxy (the cleaning step
    # zero-fills volume on gaps; genuine zero-volume source bars count too).
    imputed_count = df.filter(pl.col("volume") == 0).height
    # Divide by the actual height so the % stays correct even if Check 4 fails.
    imputed_pct = (imputed_count / df.height) * 100 if df.height else 0.0

    # Check 2: Total Nulls
    null_count = df.null_count().sum_horizontal().item()

    # Check 3: Timestamp Gaps (consecutive bars must be exactly BAR_MS apart)
    # diff() yields null for the first row, so it is dropped
    has_gaps = not (df["timestamp"].diff().drop_nulls() == BAR_MS).all()

    # Check 4: Correct Row Count
    wrong_count = df.height != EXPECTED_ROWS

    status = "OK" if (null_count == 0 and not has_gaps and not wrong_count) else "ERROR"

    print(f"{path.name:<20} | {imputed_pct:>9.2f}% | {null_count:>6} | {status}")

    if status == "ERROR":
        failed_files.append(path.name)
        if null_count > 0:
            print(f"  --> [FAIL] {null_count} null values found.")
        if has_gaps:
            print("  --> [FAIL] Timestamp continuity broken.")
        if wrong_count:
            print(f"  --> [FAIL] Expected {EXPECTED_ROWS} rows, got {df.height}.")

# Fail the cell (and "Run All") rather than only printing ERROR rows.
if failed_files:
    raise ValueError(f"Validation failed for: {', '.join(failed_files)}")

File                 | Imputed %  | Nulls  | Status
-------------------------------------------------------
ADA-USDC.parquet     |      0.10% |      0 | OK
AVAX-USDC.parquet    |      0.12% |      0 | OK
BCH-USDC.parquet     |      0.10% |      0 | OK
BTC-USDC.parquet     |      0.08% |      0 | OK
DOGE-USDC.parquet    |      0.09% |      0 | OK
ETH-USDC.parquet     |      0.09% |      0 | OK
LINK-USDC.parquet    |      0.09% |      0 | OK
LTC-USDC.parquet     |      0.09% |      0 | OK
SOL-USDC.parquet     |      0.09% |      0 | OK
XRP-USDC.parquet     |      0.09% |      0 | OK


# Technical Indicators

In [15]:
# Supported bar timeframes mapped to their length in minutes.
tf_map = {'5m': 5, '15m': 15, '30m': 30, '1h': 60}

## RSI

In [None]:
def add_rsi(df: pl.DataFrame, tf: str, period: int) -> pl.DataFrame:
    """Append an RSI of ``close`` as a new column named ``rsi_{tf}_{period}``.

    The RSI is computed on ``df`` exactly as given: ``tf`` is validated and used
    only to name the output column, so ``df`` must already be sampled at ``tf``
    (no resampling happens here). Rows are assumed to be in time order.

    Average gain/loss use Wilder's recursion via an exponential mean with
    ``alpha = 1 / period`` and ``adjust=False``, seeded from the first price
    change. Implementations that seed with a simple average of the first
    ``period`` changes may differ slightly in the earliest values.

    Args:
        df: Frame containing a numeric ``close`` column.
        tf: Timeframe label; one of ``'5m'``, ``'15m'``, ``'30m'``, ``'1h'``.
        period: Look-back length in bars; must be positive.

    Returns:
        ``df`` with the RSI column added. The first ``period`` rows are null
        (warm-up); the value is 100 wherever the average loss is zero.

    Raises:
        ValueError: If ``tf`` is unsupported or ``period`` is not positive.
    """
    # Same table as the notebook-level ``tf_map``; defined locally so the function
    # does not depend on another cell having been run.
    supported_minutes = {'5m': 5, '15m': 15, '30m': 30, '1h': 60}
    if tf not in supported_minutes:
        raise ValueError(f"Unsupported timeframe '{tf}'")
    if period <= 0:
        raise ValueError("period must be a positive integer")

    change = pl.col("close").diff()

    # clip() keeps the leading null produced by diff(). A when/otherwise would turn
    # it into a fake 0.0 change that counts toward min_samples, emitting RSI one
    # bar early and biased toward zero movement.
    gains = change.clip(lower_bound=0.0)
    losses = (-change).clip(lower_bound=0.0)

    # Wilder's smoothing (alpha = 1/N)
    alpha = 1.0 / period
    avg_gain = gains.ewm_mean(alpha=alpha, adjust=False, min_samples=period)
    avg_loss = losses.ewm_mean(alpha=alpha, adjust=False, min_samples=period)

    # avg_loss == 0 would make RS infinite; RSI's limit is then 100. During
    # warm-up both averages are null, the condition is null, and the otherwise
    # branch evaluates to null, so warm-up rows stay null.
    rsi_expr = (
        pl.when(avg_loss == 0)
        .then(100.0)
        .otherwise(100.0 - (100.0 / (1.0 + avg_gain / avg_loss)))
    )

    return df.with_columns(rsi_expr.alias(f"rsi_{tf}_{period}"))

In [29]:
# Load the series explicitly instead of relying on `df` leaked from the
# validation loop above (it happened to hold the last file, XRP-USDC).
rsi_input = pl.read_parquet(Path("data_cleaned") / "XRP-USDC.parquet")
new_df = add_rsi(df=rsi_input, tf='5m', period=14)

In [30]:
# Show the most recent five bars together with the new RSI column.
new_df.tail(5)

timestamp,open,high,low,close,volume,rsi_5m_14
i64,f64,f64,f64,f64,f64,f64
1767224100000,1.8413,1.8414,1.8396,1.8403,296177.813697,57.750103
1767224400000,1.8403,1.8407,1.8387,1.8397,331597.905883,56.018254
1767224700000,1.8397,1.8398,1.838,1.838,317690.771665,51.322082
1767225000000,1.838,1.8398,1.8376,1.8398,311577.070954,55.569311
1767225300000,1.8398,1.8403,1.8389,1.839,171277.063977,53.341684
