# River Linear Regression
# Import Dependencies
This notebook adapts the shared four-hour preprocessing template to train an online `river` linear regression model on Solana minute-ahead price deltas.

In [None]:
import os
import sys
import random
from pathlib import Path
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# `river` is an optional third-party dependency; fail early with an actionable
# message (chaining the original ImportError) instead of a bare traceback.
try:
    from river import compose, linear_model, optim, preprocessing
except ImportError as exc:
    raise ImportError("Install the 'river' package to run this notebook.") from exc

# Global plotting defaults applied to every figure drawn later in the notebook.
sns.set_style("darkgrid")
plt.rcParams["figure.figsize"] = (12, 6)


def set_seed(seed: int = 42) -> None:
    """Seed the stdlib and NumPy global RNGs and export ``PYTHONHASHSEED``.

    Args:
        seed: Value handed to ``random.seed`` and ``np.random.seed`` and
            written (as a string) to the ``PYTHONHASHSEED`` environment
            variable.
    """
    # The environment variable is only consulted when an interpreter starts,
    # so this mainly benefits processes launched from here.
    os.environ["PYTHONHASHSEED"] = str(seed)
    for seed_rng in (random.seed, np.random.seed):
        seed_rng(seed)


def find_project_root() -> Path:
    """Return the nearest directory at or above the CWD that holds ``src/mlProject``.

    The resolved current working directory is checked first, then each of its
    parents from closest to farthest.

    Raises:
        RuntimeError: If no directory on that path contains ``src/mlProject``.
    """
    here = Path.cwd().resolve()
    marker = Path("src") / "mlProject"
    root = next(
        (folder for folder in (here, *here.parents) if (folder / marker).exists()),
        None,
    )
    if root is None:
        raise RuntimeError("Unable to locate project root containing src/mlProject")
    return root


def ensure_pythonpath(root: Path) -> None:
    """Append ``<root>/src`` to ``sys.path`` unless it is already listed.

    Args:
        root: Project root whose ``src`` directory should be importable.
    """
    src_entry = str(root / "src")
    if src_entry in sys.path:
        return
    sys.path.append(src_entry)


def configure_environment(seed: int = 42) -> Path:
    """Seed the RNGs, locate the project root, and make ``src`` importable.

    Args:
        seed: Forwarded to ``set_seed``.

    Returns:
        The project root found by ``find_project_root``.

    Raises:
        RuntimeError: Propagated from ``find_project_root`` when no root is found.
    """
    set_seed(seed)
    root = find_project_root()
    ensure_pythonpath(root)
    return root


# Runs at cell execution: seeds RNGs, extends sys.path, and records the project
# root that later cells use to build artifact paths.
PROJECT_ROOT = configure_environment()
print(f"Project root: {PROJECT_ROOT}")

# Fetch Last Four Hours of 1-Minute Data
We request a 240-minute SOL/USD window of 1-minute bars from CryptoCompare so the online learner consumes the same information stream as the Streamlit service.

In [None]:
# Endpoint queried by fetch_minute_data for per-minute history.
CRYPTOCOMPARE_URL = "https://min-api.cryptocompare.com/data/v2/histominute"


def fetch_minute_data(symbol: str = "SOL", quote: str = "USD", minutes: int = 240) -> pd.DataFrame:
    """Download recent 1-minute bars from CryptoCompare as a UTC-indexed frame.

    Args:
        symbol: Base asset ticker; upper-cased and sent as ``fsym``.
        quote: Quote currency ticker; upper-cased and sent as ``tsym``.
        minutes: Value sent as the API ``limit`` parameter.

    Returns:
        A frame sorted by a tz-aware UTC ``datetime`` index with the columns
        ``price`` (API ``close``), ``volume`` (API ``volumefrom``) and
        ``market_cap`` (API ``volumeto``).

    Raises:
        requests.HTTPError: If the HTTP status indicates failure.
        RuntimeError: If the API reports a non-success response, or the
            response carries no bar rows.
    """
    params = {
        "fsym": symbol.upper(),
        "tsym": quote.upper(),
        "limit": minutes,
        "aggregate": 1,
    }
    response = requests.get(CRYPTOCOMPARE_URL, params=params, timeout=15)
    response.raise_for_status()
    payload = response.json()
    if payload.get("Response") != "Success":
        raise RuntimeError(f"CryptoCompare API error: {payload.get('Message')}")

    # A "Success" payload can still lack rows; fail with a clear message here
    # rather than with a KeyError on the missing "time" column further down.
    data_section = payload.get("Data")
    rows = data_section.get("Data") if isinstance(data_section, dict) else None
    if not rows:
        raise RuntimeError(
            f"CryptoCompare returned no minute bars for {params['fsym']}/{params['tsym']}"
        )

    frame = pd.DataFrame(rows)
    frame["datetime"] = pd.to_datetime(frame["time"], unit="s", utc=True)
    # NOTE(review): "market_cap" is just the API's `volumeto` field under the
    # column name the downstream feature list expects; presumably it is traded
    # volume rather than a real market capitalisation - confirm against the API docs.
    frame = frame.rename(
        columns={
            "close": "price",
            "volumefrom": "volume",
            "volumeto": "market_cap",
        }
    )
    frame = frame[["datetime", "price", "volume", "market_cap"]]
    return frame.set_index("datetime").sort_index()


# Performs a live HTTP request with the default arguments (SOL / USD / 240).
raw_minute_df = fetch_minute_data()
# NOTE(review): both lines below are bare expressions; in a notebook cell only
# the last one is normally rendered, so the tail() preview is likely not shown.
raw_minute_df.tail()
raw_minute_df.shape

# Preprocess & Align Time Series
We fill tiny gaps and enforce a continuous UTC minute index before engineering indicators.

In [None]:
def align_minute_frame(frame: pd.DataFrame) -> pd.DataFrame:
    """Re-index a minute-bar frame onto a gap-free 1-minute UTC grid.

    Duplicate timestamps are collapsed (last occurrence wins). The grid ends at
    the latest timestamp and has as many points as there are de-duplicated
    rows, so the output length equals the input length; when the input has
    gaps, the oldest rows fall outside the grid and are dropped. Missing grid
    points are filled by time-weighted interpolation, then back/forward fill
    for any remaining edge NaNs.

    Args:
        frame: Frame indexed by a ``DatetimeIndex``.
            NOTE(review): the grid is built with ``tz="UTC"``, so the index is
            presumably expected to be tz-aware UTC - confirm for other callers.

    Returns:
        A new frame on the continuous 1-minute index. An empty input yields an
        empty copy.
    """
    cleaned = frame[~frame.index.duplicated(keep="last")]
    if len(cleaned) == 0:
        # index.max() is NaT for an empty index and date_range cannot anchor on
        # NaT, so there is nothing to align.
        return cleaned.copy()

    full_index = pd.date_range(
        end=cleaned.index.max(),
        periods=len(cleaned),
        freq="1min",
        tz="UTC",
    )
    aligned = cleaned.reindex(full_index)
    return aligned.interpolate(method="time").bfill().ffill()


# Continuous 1-minute frame consumed by the feature-engineering cell.
aligned_minute_df = align_minute_frame(raw_minute_df)
aligned_minute_df.head()

# Feature Engineering Pipeline
We invoke the existing indicator stack, craft minute-ahead labels, and compute residual deltas to stabilize the online optimization.

In [None]:
from mlProject.entity.config_entity import DataIngestionConfig
from mlProject.components.crypto_data_ingestion import CryptoDataIngestion

# Scratch directory handed to the ingestion config; created on disk if missing.
TEMP_ARTIFACT_DIR = PROJECT_ROOT / "artifacts" / "notebook_tmp"
TEMP_ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

# NOTE(review): source_url is empty and the zip/unzip paths point into the
# scratch directory; presumably the config only exists to satisfy the
# CryptoDataIngestion constructor and no download happens here - confirm.
ingestion_config = DataIngestionConfig(
    root_dir=TEMP_ARTIFACT_DIR,
    source_url="",
    local_data_file=TEMP_ARTIFACT_DIR / "minute_data.zip",
    unzip_dir=TEMP_ARTIFACT_DIR / "unzipped",
)

ingestor = CryptoDataIngestion(ingestion_config)
# A copy is passed in, so aligned_minute_df itself is not what the ingestor receives.
feature_enriched_df = ingestor.add_technical_indicators(aligned_minute_df.copy())
# Presumably adds the "target_price_1min" column read below - verify in mlProject.
feature_enriched_df = ingestor.create_prediction_targets(feature_enriched_df)
# Rows without a 1-minute target are dropped; every remaining NaN in any column
# is then replaced with 0.0.
feature_enriched_df = feature_enriched_df.dropna(subset=["target_price_1min"]).fillna(0.0)

# Label the model learns: the change from the current price to the 1-minute target.
feature_enriched_df["target_delta_1min"] = feature_enriched_df["target_price_1min"] - feature_enriched_df["price"]

# Column order here defines the feature order used by to_feature_dict.
# NOTE(review): several names carry hour/day suffixes (e.g. "price_change_7d",
# "high_14d") although the input is a short minute-level window - confirm what
# these columns hold for this data.
CLEAN_FEATURES = [
    "price", "volume", "market_cap",
    "sma_7", "sma_14", "sma_30",
    "ema_7", "ema_14",
    "macd", "macd_signal", "macd_histogram",
    "rsi",
    "bb_middle", "bb_upper", "bb_lower",
    "price_change_1h", "price_change_24h", "price_change_7d",
    "volume_sma", "volume_ratio",
    "volatility",
    "high_14d", "low_14d",
    "price_position",
]

# Parallel arrays sharing feature_enriched_df's row order.
features = feature_enriched_df[CLEAN_FEATURES].values
target_prices = feature_enriched_df["target_price_1min"].values
base_prices = feature_enriched_df["price"].values
target_deltas = feature_enriched_df["target_delta_1min"].values

print(f"Features shape: {features.shape}")
print(f"Target deltas shape: {target_deltas.shape}")

# Train River Linear Model
We iterate over the training window, updating a standard-scaler-plus-linear-regression pipeline via stochastic gradient descent, then score the held-out tail.

In [None]:
def build_river_model():
    """Create a fresh, untrained scaler + linear-regression pipeline.

    Returns:
        A ``compose.Pipeline`` that feeds ``preprocessing.StandardScaler``
        output into a ``linear_model.LinearRegression`` optimised by SGD with a
        decaying learning rate starting at 0.05.
    """
    # river keeps learning-rate schedules in ``optim.schedulers`` and ``SGD``
    # takes the rate (or a scheduler) as ``lr``; there is no
    # ``optim.learning_rate`` module. ``power=1.0`` makes the step
    # 0.05 / (t + 1), i.e. the inverse-time decay the original call named.
    schedule = optim.schedulers.InverseScaling(learning_rate=0.05, power=1.0)
    optimizer = optim.SGD(lr=schedule)
    regressor = linear_model.LinearRegression(optimizer=optimizer, intercept_init=0.0)
    return compose.Pipeline(preprocessing.StandardScaler(), regressor)

def to_feature_dict(row: np.ndarray) -> dict[str, float]:
    """Map one feature row to a ``{feature_name: float}`` dict.

    Names come from ``CLEAN_FEATURES`` and are paired positionally with the
    values of ``row``; pairing stops at the shorter of the two.

    Args:
        row: Feature values ordered like ``CLEAN_FEATURES``.

    Returns:
        A dict of plain Python floats keyed by feature name.
    """
    return dict(zip(CLEAN_FEATURES, map(float, row)))

# Chronological 80/20 split: the first rows train the model, the tail is held out.
split_index = int(len(features) * 0.8)
train_features = features[:split_index]
# Training labels are price deltas ...
train_targets = target_deltas[:split_index]
valid_features = features[split_index:]
# ... while validation compares absolute prices, rebuilt from base price + predicted delta.
valid_targets = target_prices[split_index:]
valid_base = base_prices[split_index:]

river_model = build_river_model()
# One learn_one call per training row, in row order.
for row_values, target in zip(train_features, train_targets, strict=False):
    river_model.learn_one(to_feature_dict(row_values), float(target))

if len(valid_features) == 0:
    print("Validation window empty; gather more data before scoring.")
else:
    # The model is not updated while scoring the held-out rows.
    delta_valid_pred = [
        river_model.predict_one(to_feature_dict(row_values))
        for row_values in valid_features
    ]
    price_valid_pred = valid_base + np.array(delta_valid_pred)
    # NOTE(review): metrics are computed on price levels, not deltas, so a
    # predictor that always outputs delta 0 may already score well - consider
    # comparing against that baseline.
    mae = mean_absolute_error(valid_targets, price_valid_pred)
    rmse = float(np.sqrt(mean_squared_error(valid_targets, price_valid_pred)))
    r2 = r2_score(valid_targets, price_valid_pred)
    print(f"Validation MAE : {mae:.6f}")
    print(f"Validation RMSE: {rmse:.6f}")
    print(f"Validation R^2 : {r2:.6f}")

# Evaluate Predictions
We compare the online model's reconstructed price path with the actual tail to ensure the incremental learner stays calibrated.

In [None]:
# Same guard as the scoring cell: price_valid_pred only exists when the
# validation window is non-empty.
if len(valid_features) > 0:
    # Timestamps of the held-out rows, aligned with valid_targets / price_valid_pred.
    valid_timestamps = feature_enriched_df.index[split_index:]
    plt.figure()
    plt.plot(valid_timestamps, valid_targets, label="Actual", linewidth=2)
    plt.plot(valid_timestamps, price_valid_pred, label="River Linear", linewidth=2)
    plt.title("River Linear Minute-Ahead Forecast")
    plt.xlabel("Timestamp")
    plt.ylabel("Price (USD)")
    plt.legend()
    plt.tight_layout()
    plt.show()