# Crypto Index Construction Engine (Scalable Version)

A fully distributed **PySpark** analytics engine that constructs a monthly-rebalanced
*Smart Index* of cryptocurrencies.

## Features
- Scales to 2 000+ trading pairs
- Fully distributed Spark pipeline — **no Pandas anywhere**
- Robust error handling for Windows / local execution

## Pipeline Stages
| # | Stage | Technique |
|---|-------|-----------|
| 1 | Ingestion & Temporal Alignment | Spark SQL |
| 2 | Feature Engineering / Metrics | Spark Window Functions |
| 3 | Correlation Pruning | Spark MLlib (Pearson) |
| 3B | Dimensionality Reduction | PCA (Spark MLlib) |
| 4 | Asset Clustering | K-Means (Spark MLlib) |
| 4C | Return Prediction | Gradient Boosted Trees (Spark MLlib) |
| 5 | Portfolio Construction | Inverse-Volatility Weighting |

## Environment Setup

In [None]:
import findspark
try:
    findspark.init()
except Exception:
    pass

In [None]:
import os, sys, glob, csv
import numpy as np
import matplotlib
matplotlib.use("Agg")          # non-interactive backend (safe for servers)
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField, DoubleType, LongType, StringType
)
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.stat import Correlation
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

In [None]:
# ── Data source ──────────────────────────────────────────────────
DATA_SOURCE   = "crypto_data_parquet"   # Partitioned Parquet
CSV_FALLBACK  = "crypto_data_4h"        # CSV folder (fallback)
OUTPUT_FOLDER = "output"

# ── Index parameters ────────────────────────────────────────────
CORRELATION_THRESHOLD = 0.85
TOP_N_ASSETS          = 20
CANDLES_PER_DAY       = 6               # 4-hour candles
ANNUALIZATION_FACTOR  = np.sqrt(365 * CANDLES_PER_DAY)

## Create Spark Session

In [None]:
spark = (SparkSession.builder
         .appName("CryptoIndexEngine")
         .config("spark.driver.memory", "4g")
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.shuffle.partitions", "20")
         .getOrCreate())
spark.sparkContext.setLogLevel("ERROR")
print("Spark version:", spark.version)

## Stage 1 — Data Ingestion & Temporal Alignment

Read raw OHLCV candle data (Parquet preferred, CSV fallback) into a long
Spark DataFrame with columns `(timestamp, symbol, close)`.

In [None]:
# ── 1A  Read raw data ────────────────────────────────────────────
if os.path.exists(DATA_SOURCE) and os.listdir(DATA_SOURCE):
    print(f"Reading Parquet from: {DATA_SOURCE}")
    raw_df = spark.read.parquet(DATA_SOURCE)

elif os.path.exists(CSV_FALLBACK):
    print(f"Reading CSV from: {CSV_FALLBACK}")
    csv_files = glob.glob(os.path.join(CSV_FALLBACK, "*.csv"))
    if not csv_files:
        raise FileNotFoundError(f"No CSVs in {CSV_FALLBACK}")

    schema = StructType([
        StructField("datetime",        StringType(),  True),
        StructField("open",            DoubleType(),  True),
        StructField("high",            DoubleType(),  True),
        StructField("low",             DoubleType(),  True),
        StructField("close",           DoubleType(),  True),
        StructField("volume",          DoubleType(),  True),
        StructField("quote_volume",    DoubleType(),  True),
        StructField("trades",          LongType(),    True),
        StructField("taker_buy_base",  DoubleType(),  True),
        StructField("taker_buy_quote", DoubleType(),  True),
        StructField("ignore",          DoubleType(),  True),
    ])

    raw_df = spark.read.csv(csv_files, schema=schema, header=True)
    raw_df = raw_df.withColumn("_filename", F.input_file_name())
    raw_df = raw_df.withColumn(
        "symbol",
        F.regexp_extract(F.col("_filename"), r"([A-Z0-9]+)-\d+h-", 1),
    )
    raw_df = raw_df.withColumn(
        "timestamp",
        F.to_timestamp(F.col("datetime"), "yyyy-MM-dd HH:mm:ss"),
    )
else:
    raise FileNotFoundError("No data source found!")

# ── 1B  Select & cache ─────────────────────────────────────────
long_df = (
    raw_df
    .select(F.col("timestamp"), F.col("symbol"), F.col("close").cast("double"))
    .filter(F.col("close").isNotNull())
    .cache()
)

row_count = long_df.count()
symbols   = sorted([r.symbol for r in long_df.select("symbol").distinct().collect()])
print(f"Loaded {row_count:,} rows across {len(symbols)} assets.")

In [None]:
# ── 1C  Pivot to wide (timestamp × symbol) matrix ───────────────
master_matrix = (
    long_df
    .groupBy("timestamp")
    .pivot("symbol", symbols)
    .agg(F.first("close"))
    .orderBy("timestamp")
    .cache()
)
print(f"Master Matrix: {master_matrix.count()} rows × {len(symbols)} assets")

## Stage 2 — Feature Engineering & Metrics

For every asset we compute:
- **Log returns** (4-hour)
- **Annualised return** (scaled to 365 × 6 periods / year)
- **Annualised volatility**
- **Sharpe ratio** (assuming risk-free rate = 0)

In [None]:
ANN_PERIODS = 365 * CANDLES_PER_DAY          # periods per year

win = Window.partitionBy("symbol").orderBy("timestamp")

returns_df = (
    long_df
    .withColumn("prev_close", F.lag("close").over(win))
    .withColumn("log_ret", F.log(F.col("close") / F.col("prev_close")))
    .filter(F.col("log_ret").isNotNull())
)

metrics = (
    returns_df
    .groupBy("symbol")
    .agg(
        F.count("log_ret").alias("count"),
        F.sum("log_ret").alias("sum_log_ret"),
        F.stddev("log_ret").alias("std_log_ret"),
    )
    .filter(F.col("count") > 50)                    # minimum data points
    .withColumn("annual_ret",
                F.col("sum_log_ret") * (ANN_PERIODS / F.col("count")))
    .withColumn("annual_vol",
                F.col("std_log_ret") * np.sqrt(ANN_PERIODS))
    .withColumn("sharpe",
                F.col("annual_ret") / F.col("annual_vol"))
    .cache()
)

print("Per-asset metrics computed.  Top 5 by Sharpe:")
metrics.orderBy(F.desc("sharpe")).show(5)

## Stage 3 — Correlation Analysis  *(Spark MLlib — Pearson)*

We compute the Pearson correlation matrix on **log returns** (not raw prices)
so that two assets that trend together in *percentage moves* are correctly
identified as correlated.

Missing returns are imputed with **0** (equivalent to forward-filling the
price: $\log(P_t / P_{t-1}) = 0 \Rightarrow P_t = P_{t-1}$).

In [None]:
# ── Build returns matrix (timestamp × symbol) ───────────────────
ret_long = (
    long_df
    .withColumn("prev", F.lag("close").over(win))
    .withColumn("ret", F.log(F.col("close") / F.col("prev")))
    .filter(F.col("ret").isNotNull())
    .select("timestamp", "symbol", "ret")
)

wide_ret = (
    ret_long
    .groupBy("timestamp")
    .pivot("symbol", symbols)
    .agg(F.first("ret"))
    .fillna(0)                              # forward-fill imputation
)

valid_cols = [c for c in symbols if c in wide_ret.columns]

assembler = VectorAssembler(inputCols=valid_cols, outputCol="features")
vec_df    = assembler.transform(wide_ret).select("features")

print("Computing Pearson correlation matrix …")
corr_mat = Correlation.corr(vec_df, "features", "pearson").head()[0].toArray()
print(f"Correlation matrix shape: {corr_mat.shape}")

## Stage 3B — PCA  *(Spark MLlib — Dimensionality Reduction)*

Principal Component Analysis decomposes the returns into independent
market factors and tells us how many components explain most of the
variance.

In [None]:
N_COMPONENTS = 10

pca = PCA(
    k=min(N_COMPONENTS, len(valid_cols)),
    inputCol="features",
    outputCol="pca_features",
)
pca_model    = pca.fit(vec_df)
explained_var = pca_model.explainedVariance.toArray()
cumsum_var    = np.cumsum(explained_var)

print("Explained variance by component:")
for i in range(min(5, len(explained_var))):
    print(f"  PC{i+1}: {explained_var[i]*100:6.2f}%  "
          f"(cumulative: {cumsum_var[i]*100:6.2f}%)")
print(f"\nTotal explained by {len(explained_var)} components: "
      f"{cumsum_var[-1]*100:.2f}%")

## Stage 4 — K-Means Clustering  *(Spark MLlib)*

Cluster cryptocurrencies by their risk / return profiles (`annual_ret`,
`annual_vol`, `sharpe`).  Features are **standardised** before clustering.

From each cluster the highest-Sharpe asset is selected to form a
*diversified* portfolio.

In [None]:
N_CLUSTERS = 5

# ── Assemble & scale ────────────────────────────────────────────
km_assembler = VectorAssembler(
    inputCols=["annual_ret", "annual_vol", "sharpe"],
    outputCol="features_raw",
)
km_df = km_assembler.transform(metrics)

scaler = StandardScaler(
    inputCol="features_raw", outputCol="km_features",
    withStd=True, withMean=True,
)
km_df = scaler.fit(km_df).transform(km_df)

# ── Fit K-Means ─────────────────────────────────────────────────
kmeans       = KMeans(k=N_CLUSTERS, seed=42, featuresCol="km_features",
                      predictionCol="cluster")
kmeans_model = kmeans.fit(km_df)
clustered    = kmeans_model.transform(km_df).cache()

# ── Evaluate ─────────────────────────────────────────────────────
evaluator  = ClusteringEvaluator(featuresCol="km_features",
                                  predictionCol="cluster",
                                  metricName="silhouette")
silhouette = evaluator.evaluate(clustered)
print(f"Silhouette score: {silhouette:.3f}  (range −1 … 1, higher = better)")

# ── Cluster statistics ───────────────────────────────────────────
print("\nCluster statistics:")
(clustered
 .groupBy("cluster")
 .agg(
     F.count("symbol").alias("n"),
     F.round(F.avg("sharpe"), 4).alias("avg_sharpe"),
     F.round(F.avg("annual_vol"), 4).alias("avg_vol"),
     F.round(F.avg("annual_ret"), 4).alias("avg_ret"),
 )
 .orderBy("cluster")
 .show())

# ── Select cluster leaders ───────────────────────────────────────
rank_win = Window.partitionBy("cluster").orderBy(F.desc("sharpe"))
cluster_leaders = (
    clustered
    .withColumn("_rank", F.row_number().over(rank_win))
    .filter(F.col("_rank") == 1)
    .select("symbol", "cluster", "sharpe", "annual_ret", "annual_vol")
)
print(f"Selected {cluster_leaders.count()} cluster representatives:")
cluster_leaders.show()

## Stage 4C — Gradient Boosted Trees  *(Spark MLlib — Regression)*

Train a GBT regressor to predict the **next-period log return** from
technical features (lagged returns, moving-average ratio, rolling
volatility).

The model is evaluated on a held-out 20 % test split.

In [None]:
# ── Feature engineering ───────────────────────────────────────────
gbt_win = Window.partitionBy("symbol").orderBy("timestamp")

gbt_df = (
    long_df
    .withColumn("prev_close", F.lag("close").over(gbt_win))
    .withColumn("log_ret", F.log(F.col("close") / F.col("prev_close")))
    .withColumn("ret_lag1", F.lag("log_ret", 1).over(gbt_win))
    .withColumn("ret_lag2", F.lag("log_ret", 2).over(gbt_win))
    .withColumn("ret_lag3", F.lag("log_ret", 3).over(gbt_win))
    .withColumn("ma7",  F.avg("close").over(gbt_win.rowsBetween(-6, 0)))
    .withColumn("ma30", F.avg("close").over(gbt_win.rowsBetween(-29, 0)))
    .withColumn("ma_ratio", F.col("ma7") / F.col("ma30"))
    .withColumn("volatility_7d",
                F.stddev("log_ret").over(gbt_win.rowsBetween(-6, 0)))
    .withColumn("target", F.lead("log_ret", 1).over(gbt_win))
    .dropna()
)

FEATURE_COLS = ["ret_lag1", "ret_lag2", "ret_lag3", "ma_ratio", "volatility_7d"]
print(f"Engineered {len(FEATURE_COLS)} features: {FEATURE_COLS}")

gbt_assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="gbt_features")
gbt_df = gbt_assembler.transform(gbt_df).select("symbol", "gbt_features", "target")

train, test = gbt_df.randomSplit([0.8, 0.2], seed=42)
print(f"Train: {train.count():,} rows  |  Test: {test.count():,} rows")

In [None]:
# ── Train GBT ────────────────────────────────────────────────────
gbt = GBTRegressor(featuresCol="gbt_features", labelCol="target",
                   maxIter=20, maxDepth=5, seed=42)
gbt_model = gbt.fit(train)

predictions = gbt_model.transform(test)
rmse = RegressionEvaluator(
    labelCol="target", predictionCol="prediction", metricName="rmse"
).evaluate(predictions)
print(f"GBT test RMSE: {rmse:.6f}")

# ── Feature importance (pure Python — no Pandas) ────────────────
importance = gbt_model.featureImportances.toArray()
feat_imp   = sorted(zip(FEATURE_COLS, importance.tolist()),
                    key=lambda x: -x[1])

print("\nFeature importance:")
for feat, imp in feat_imp:
    print(f"  {feat:20s}  {imp:.4f}")

## Stage 5 — Portfolio Construction

### Correlation-based tournament filter
1. For every pair whose |ρ| > threshold, **drop** the asset with the
   lower Sharpe ratio.
2. From the survivors, keep the top-N by Sharpe.
3. Assign **inverse-volatility weights** (lower-vol → higher weight).

In [None]:
# ── Collect metrics to Python dicts (small data — at most ~2 000 rows) ──
_rows     = metrics.select("symbol", "sharpe", "annual_vol").collect()
sharpe_map = {r["symbol"]: float(r["sharpe"])     for r in _rows}
vol_map    = {r["symbol"]: float(r["annual_vol"]) for r in _rows}

# ── Correlation tournament ──────────────────────────────────────
idx_to_sym = {i: s for i, s in enumerate(valid_cols)}
keep    = set(valid_cols)
dropped = set()

n_rows, n_cols = corr_mat.shape
for i in range(n_rows):
    for j in range(i + 1, n_cols):
        if abs(corr_mat[i, j]) > CORRELATION_THRESHOLD:
            s1, s2 = idx_to_sym[i], idx_to_sym[j]
            if s1 in dropped or s2 in dropped:
                continue
            sh1 = sharpe_map.get(s1)
            sh2 = sharpe_map.get(s2)
            if sh1 is None or sh2 is None:
                continue
            loser = s1 if sh1 < sh2 else s2
            dropped.add(loser)
            keep.discard(loser)

print(f"Correlation filter dropped {len(dropped)} assets, "
      f"{len(keep)} remain.")

# ── Build final portfolio in Spark ───────────────────────────────
keep_list = list(keep)
portfolio = (
    metrics
    .filter(F.col("symbol").isin(keep_list))
    .orderBy(F.desc("sharpe"))
    .limit(TOP_N_ASSETS)
    .withColumn("inv_vol", F.lit(1.0) / F.col("annual_vol"))
)

_total_inv_vol = portfolio.agg(F.sum("inv_vol")).collect()[0][0]

portfolio = portfolio.withColumn(
    "weight", F.col("inv_vol") / F.lit(_total_inv_vol)
).cache()

print(f"\nFinal portfolio — top {TOP_N_ASSETS} assets:")
portfolio.select("symbol", "weight", "sharpe", "annual_vol").show(TOP_N_ASSETS, truncate=False)

## Visualizations

In [None]:
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

plt.figure(figsize=(10, 10))
sns.heatmap(corr_mat, cmap="coolwarm", center=0)
plt.title("Correlation Matrix (Log Returns)")
plt.tight_layout()
plt.savefig(f"{OUTPUT_FOLDER}/correlation_matrix.png", dpi=150)
plt.show()

In [None]:
_km_rows = clustered.select("annual_vol", "sharpe", "cluster").collect()
_vols     = [float(r["annual_vol"]) for r in _km_rows]
_sharpes  = [float(r["sharpe"])     for r in _km_rows]
_clusters = [int(r["cluster"])      for r in _km_rows]

plt.figure(figsize=(10, 8))
sc = plt.scatter(_vols, _sharpes, c=_clusters, cmap="viridis", s=100, alpha=0.6)
plt.xlabel("Annualised Volatility")
plt.ylabel("Sharpe Ratio")
plt.title("K-Means Clustering — Risk vs Return Profile")
plt.colorbar(sc, label="Cluster")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f"{OUTPUT_FOLDER}/kmeans_clusters_viz.png", dpi=150)
plt.show()

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

axes[0].bar(range(1, len(explained_var) + 1), explained_var)
axes[0].set_xlabel("Principal Component")
axes[0].set_ylabel("Explained Variance")
axes[0].set_title("PCA — Variance per Component")
axes[0].grid(True, alpha=0.3)

axes[1].plot(range(1, len(cumsum_var) + 1), cumsum_var * 100, marker="o")
axes[1].axhline(y=90, color="r", linestyle="--", label="90 % threshold")
axes[1].set_xlabel("Number of Components")
axes[1].set_ylabel("Cumulative Explained Variance (%)")
axes[1].set_title("PCA — Cumulative Variance")
axes[1].grid(True, alpha=0.3)
axes[1].legend()

plt.tight_layout()
plt.savefig(f"{OUTPUT_FOLDER}/pca_scree_plot.png", dpi=150)
plt.show()

In [None]:
_feats = [f for f, _ in feat_imp]
_imps  = [v for _, v in feat_imp]

plt.figure(figsize=(8, 5))
plt.barh(_feats, _imps)
plt.xlabel("Importance")
plt.ylabel("Feature")
plt.title(f"GBT Feature Importance  (RMSE = {rmse:.6f})")
plt.grid(True, alpha=0.3)
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig(f"{OUTPUT_FOLDER}/gbt_feature_importance.png", dpi=150)
plt.show()

## Save Results

In [None]:
def _spark_to_csv(sdf, path, columns=None):
    # Collect a (small) Spark DataFrame and write a single CSV via Python.
    cols = columns or sdf.columns
    rows = sdf.select(*cols).collect()
    with open(path, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(cols)
        for r in rows:
            writer.writerow([r[c] for c in cols])
    print(f"  ✓ {path}")

print("Saving outputs …")

# Portfolio
_spark_to_csv(portfolio, f"{OUTPUT_FOLDER}/smart_index.csv",
              ["symbol", "weight", "sharpe", "annual_vol"])

# K-Means clusters
_spark_to_csv(
    clustered.select("symbol", "cluster", "sharpe", "annual_ret", "annual_vol"),
    f"{OUTPUT_FOLDER}/kmeans_clusters.csv",
)

# K-Means portfolio (cluster leaders)
_spark_to_csv(cluster_leaders, f"{OUTPUT_FOLDER}/kmeans_portfolio.csv")

# PCA components
with open(f"{OUTPUT_FOLDER}/pca_components.csv", "w", newline="") as fh:
    writer = csv.writer(fh)
    writer.writerow(["component", "explained_variance", "cumulative_variance"])
    for i in range(len(explained_var)):
        writer.writerow([f"PC{i+1}", explained_var[i], cumsum_var[i]])
print(f"  ✓ {OUTPUT_FOLDER}/pca_components.csv")

# GBT feature importance
with open(f"{OUTPUT_FOLDER}/gbt_feature_importance.csv", "w", newline="") as fh:
    writer = csv.writer(fh)
    writer.writerow(["feature", "importance"])
    for feat, imp in feat_imp:
        writer.writerow([feat, imp])
print(f"  ✓ {OUTPUT_FOLDER}/gbt_feature_importance.csv")

print("\nAll outputs saved to", OUTPUT_FOLDER)

## Cleanup

In [None]:
spark.stop()
print("Spark session stopped.")