In [3]:
# Colab-only: mount Google Drive at /content/drive so files stored in Drive
# can be read and written through the local filesystem.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# scripts/forecast_tsla.py
# Predict Tesla prices with ARIMA and LSTM, evaluate on a chronological split.

import os
import math
import random
import pickle
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Import statsmodels for ARIMA
import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA as st_ARIMA # Renaming to avoid conflict

# -----------------------------
# Config
# -----------------------------
SPLIT_DATE = "2024-01-01"          # Train: < 2024-01-01, Test: >= 2024-01-01
LOOKBACK = 60                      # LSTM sequence length: past observations per input window
EPOCHS = 40                        # Upper bound on LSTM epochs; early stopping may end training sooner
BATCH_SIZE = 32                    # LSTM mini-batch size
LR = 1e-3                          # LSTM learning rate (Adam optimizer)
SEED = 42                          # Seed handed to set_seeds() for random / NumPy / TensorFlow

# -----------------------------
# Paths
# -----------------------------
# SCRIPT_DIR = Path(__file__).resolve().parent # __file__ is not defined in Colab notebook
# ROOT_DIR = SCRIPT_DIR.parent
# CLEANED_DIR = ROOT_DIR / "data" / "cleaned"

# Define paths relative to the Colab environment, assuming data is in Google Drive
DRIVE_DIR = Path('/content/drive/MyDrive')
CLEANED_DIR = DRIVE_DIR / "data" / "cleaned"  # Input: cleaned price CSVs
OUT_DIR = DRIVE_DIR / "TSLA_forecast_outputs" # Use a dedicated output directory in Drive
OUT_FORECASTS = OUT_DIR / "forecasts"
OUT_PLOTS = OUT_DIR / "plots"
OUT_REPORTS = OUT_DIR / "reports"
OUT_PREDICTIONS = OUT_DIR / "predictions"
OUT_MODELS = OUT_DIR / "models"

# Create every output directory up front. OUT_MODELS has to be part of this
# list: main() writes the pickled ARIMA model and the LSTM .h5 file into it,
# and open()/save() fail if the directory does not exist yet.
for p in [OUT_FORECASTS, OUT_PLOTS, OUT_REPORTS, OUT_PREDICTIONS, OUT_MODELS]:
    p.mkdir(parents=True, exist_ok=True)

# -----------------------------
# Utils
# -----------------------------
def set_seeds(seed=SEED):
    """Seed the Python, NumPy and (when installed) TensorFlow random generators.

    Args:
        seed: Integer seed applied to every generator.

    Also exports ``PYTHONHASHSEED``. The interpreter reads that variable at
    start-up, so it affects Python processes launched after this call rather
    than the one already running.
    """
    # Exported unconditionally: it used to live inside the TensorFlow
    # try-block and was therefore skipped whenever TensorFlow was missing.
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)

    try:
        import tensorflow as tf
    except ImportError:
        # TensorFlow is optional for seeding; the ARIMA path does not need it.
        return
    # Kept outside the try so a genuine TensorFlow failure is not hidden.
    tf.random.set_seed(seed)

def find_tsla_csv(cleaned_dir: Path) -> Path:
    """Locate the TSLA price CSV inside ``cleaned_dir``.

    Resolution order:
      1. The first CSV (alphabetically) whose file name contains "tsla",
         case-insensitively.
      2. The first CSV that has a ticker column (any capitalisation) holding
         "TSLA" within its first five rows.
      3. The first CSV in the directory, after printing a warning.

    Args:
        cleaned_dir: Directory to search (non-recursive, ``*.csv`` only).

    Returns:
        Path of the selected CSV file.

    Raises:
        FileNotFoundError: If the directory contains no CSV files.
    """
    # glob() order depends on the filesystem; sort so the pick is reproducible.
    candidates = sorted(cleaned_dir.glob("*.csv"))

    # Prefer filenames containing 'TSLA'
    tsla_like = [p for p in candidates if "tsla" in p.stem.lower()]
    if tsla_like:
        return tsla_like[0]

    # Fallback: try to look into files and check ticker column if present
    for p in candidates:
        try:
            head = pd.read_csv(p, nrows=5)
        except (OSError, ValueError):
            # Unreadable, empty or malformed file (pandas parser errors and
            # decode errors are ValueError subclasses): try the next one.
            continue
        # Read the column under its actual name. The match is case-insensitive,
        # so indexing with a hard-coded "Ticker" would miss e.g. "ticker".
        ticker_cols = [c for c in head.columns if str(c).lower() == "ticker"]
        if ticker_cols and "TSLA" in head[ticker_cols[0]].astype(str).unique():
            return p

    # Final fallback: first CSV (warn)
    if candidates:
        print(f"⚠️ TSLA file not found by name. Using: {candidates[0].name}")
        return candidates[0]
    raise FileNotFoundError(f"No CSV files found in {cleaned_dir}")

def load_tsla_series(csv_path: Path) -> pd.DataFrame:
    """Read a price CSV and return a date-sorted ``Date`` / ``Price`` frame.

    The price is taken from ``Adj Close`` when that column exists, otherwise
    from ``Close``. Rows whose date or price cannot be parsed are dropped.

    Args:
        csv_path: CSV file with a ``Date`` column and a price column.

    Returns:
        DataFrame with exactly the columns ``Date`` and ``Price``.

    Raises:
        ValueError: If neither ``Adj Close`` nor ``Close`` is present.
    """
    raw = pd.read_csv(csv_path)

    # Unparseable dates become NaT and are discarded before ordering by date.
    raw["Date"] = pd.to_datetime(raw["Date"], errors="coerce")
    ordered = raw.dropna(subset=["Date"]).sort_values("Date").reset_index(drop=True)

    if "Adj Close" in ordered.columns:
        source_col = "Adj Close"
    elif "Close" in ordered.columns:
        source_col = "Close"
    else:
        raise ValueError(f"{csv_path.name}: neither 'Adj Close' nor 'Close' present.")

    # Non-numeric prices become NaN and those rows are discarded as well.
    ordered[source_col] = pd.to_numeric(ordered[source_col], errors="coerce")
    priced = ordered.dropna(subset=[source_col]).rename(columns={source_col: "Price"})
    return priced[["Date", "Price"]]

def train_test_split_chrono(df: pd.DataFrame, split_date: str):
    """Split ``df`` chronologically on its ``Date`` column.

    Args:
        df: Frame containing a ``Date`` column.
        split_date: Cut-off; rows dated strictly before it go to the training
            set, rows on or after it go to the test set.

    Returns:
        Tuple ``(train, test)`` of independent copies.

    Raises:
        ValueError: If either side of the split is empty.
    """
    dates = df["Date"]
    before = df.loc[dates < split_date].copy()
    on_or_after = df.loc[dates >= split_date].copy()

    if before.empty or on_or_after.empty:
        raise ValueError(f"Split resulted in empty sets. Check SPLIT_DATE={split_date}.")
    return before, on_or_after

def metrics(y_true, y_pred):
    """Compute regression error metrics between actual and predicted values.

    Both inputs are converted to flat float arrays first, so a column vector
    such as ``(n, 1)`` is compared element-wise with an ``(n,)`` array instead
    of silently broadcasting into an ``(n, n)`` error matrix.

    Args:
        y_true: Actual values (array-like).
        y_pred: Predicted values (array-like) with the same number of elements.

    Returns:
        Dict with keys ``"MAE"``, ``"RMSE"``, ``"MAPE%"``, ``"sMAPE%"``, ``"R2"``.
        ``MAPE%`` is averaged over observations whose actual value is non-zero
        (NaN if there are none). ``sMAPE%`` counts a term as 0 when both the
        actual and the predicted value are 0. ``R2`` is NaN when the actual
        values have zero variance.

    Raises:
        ValueError: If the inputs are empty or differ in length.
    """
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    if y_true.size == 0:
        raise ValueError("metrics() requires at least one observation.")
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"Length mismatch: y_true has {y_true.size} values, y_pred has {y_pred.size}."
        )

    err = y_true - y_pred
    abs_err = np.abs(err)
    mae = np.mean(abs_err)
    rmse = math.sqrt(np.mean(err ** 2))

    # MAPE is undefined where the actual value is 0; skip those observations
    # rather than letting a division by zero turn the whole metric into inf.
    nonzero = y_true != 0
    if nonzero.any():
        mape = np.mean(abs_err[nonzero] / np.abs(y_true[nonzero])) * 100
    else:
        mape = np.nan

    # sMAPE: a 0/0 term (actual and prediction both 0) is a perfect hit -> 0.
    denom = np.abs(y_true) + np.abs(y_pred)
    terms = np.divide(2 * abs_err, denom, out=np.zeros_like(denom), where=denom != 0)
    smape = 100 * np.mean(terms)

    # R^2
    ss_res = np.sum(err ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    r2 = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
    return {"MAE": mae, "RMSE": rmse, "MAPE%": mape, "sMAPE%": smape, "R2": r2}

# -----------------------------
# ARIMA (using statsmodels)
# -----------------------------
def arima_forecast(train: pd.Series, horizon: int, order=(5, 1, 0)):
    """Fit an ARIMA model on ``train`` and forecast ``horizon`` steps ahead.

    Args:
        train: Training observations in chronological order.
        horizon: Number of steps to forecast beyond the end of ``train``.
        order: ARIMA ``(p, d, q)`` order. Defaults to the simple ``(5, 1, 0)``
            starting point; no automatic order selection is performed (that
            would need e.g. pmdarima, which this script deliberately avoids).

    Returns:
        Tuple ``(predictions, fitted_model)`` where ``predictions`` is a NumPy
        array of length ``horizon`` and ``fitted_model`` is the fitted
        statsmodels results object.
    """
    model_fit = st_ARIMA(train, order=order).fit()
    # forecast() yields only the out-of-sample steps that follow the training
    # data, so nothing has to be trimmed from the result.
    preds = model_fit.forecast(steps=horizon)
    # np.asarray handles both a Series and a plain ndarray result, whereas
    # `.values` exists only on the former.
    return np.asarray(preds), model_fit

# -----------------------------
# LSTM
# -----------------------------
def make_sequences(arr, lookback):
    """Build sliding-window samples from the first column of a 2-D array.

    Sample ``k`` consists of the ``lookback`` consecutive values
    ``arr[k:k + lookback, 0]`` and its target is the next value
    ``arr[k + lookback, 0]``.

    Args:
        arr: 2-D array-like of shape ``(n, features)``; only column 0 is used.
        lookback: Window length, must be at least 1.

    Returns:
        Tuple ``(X, y)`` with shapes ``(samples, lookback, 1)`` and
        ``(samples,)`` where ``samples = max(n - lookback, 0)``. The shapes
        hold even when there are no samples.

    Raises:
        ValueError: If ``lookback`` is smaller than 1.
    """
    if lookback < 1:
        raise ValueError(f"lookback must be >= 1, got {lookback}.")

    column = np.asarray(arr)[:, 0]
    n_samples = len(column) - lookback
    if n_samples <= 0:
        # Too short for a single window: return correctly-shaped empty arrays
        # (a bare np.array([]) would lose the lookback dimension).
        return (
            np.empty((0, lookback, 1), dtype=column.dtype),
            np.empty((0,), dtype=column.dtype),
        )

    X = np.stack([column[end - lookback:end] for end in range(lookback, len(column))])
    y = column[lookback:].copy()
    return X[..., np.newaxis], y  # shape: (samples, lookback, 1), (samples,)

def lstm_forecast(train_prices: pd.Series, test_prices: pd.Series, lookback=LOOKBACK):
    """Train a two-layer LSTM on ``train_prices`` and predict every test price.

    Prices are min-max scaled with a scaler fitted on the training data only.
    Each test prediction is one-step-ahead: its input window holds the
    ``lookback`` actual prices that precede that day (the last training prices
    for the first test days, earlier actual test prices afterwards).

    Args:
        train_prices: Training prices in chronological order.
        test_prices: Test prices in chronological order.
        lookback: Number of past observations per input window.

    Returns:
        Tuple ``(y_pred, model, hist)``: predictions in the original price
        scale (one per test observation), the trained Keras model, and the
        Keras training history.

    Raises:
        SystemExit: If TensorFlow or scikit-learn is not installed.
        ValueError: If ``train_prices`` has no more than ``lookback`` values,
            i.e. not even one training window can be built.
    """
    try:
        import tensorflow as tf
        from tensorflow.keras import Sequential
        from tensorflow.keras.layers import LSTM, Dense, Dropout
        from tensorflow.keras.callbacks import EarlyStopping
        from sklearn.preprocessing import MinMaxScaler
    except ImportError as err:
        # Chain the original error so the missing module stays visible.
        raise SystemExit(
            "Please install TensorFlow and scikit-learn: pip install tensorflow scikit-learn"
        ) from err

    # Fail early with a clear message instead of an obscure Keras error on an
    # empty training set.
    if len(train_prices) <= lookback:
        raise ValueError(
            f"Need more than lookback={lookback} training observations, "
            f"got {len(train_prices)}."
        )

    # Scale (fit on train only)
    scaler = MinMaxScaler(feature_range=(0, 1))
    train_scaled = scaler.fit_transform(train_prices.values.reshape(-1, 1))
    test_scaled = scaler.transform(test_prices.values.reshape(-1, 1))

    # Build sequences
    X_train, y_train = make_sequences(train_scaled, lookback)

    # For test, prepend the last lookback points from training to construct windows
    combo = np.vstack([train_scaled[-lookback:], test_scaled])
    # The scaled test targets are not needed: evaluation happens on the
    # original price scale by the caller.
    X_test, _ = make_sequences(combo, lookback)

    # Model
    tf.keras.backend.clear_session()
    set_seeds(SEED)
    model = Sequential([
        LSTM(64, return_sequences=True, input_shape=(lookback, 1)),
        Dropout(0.2),
        LSTM(32),
        Dense(1)
    ])
    opt = tf.keras.optimizers.Adam(learning_rate=LR)
    model.compile(optimizer=opt, loss="mse")

    # Stop once val_loss has not improved for 6 epochs and keep the best weights.
    es = EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True)
    hist = model.fit(
        X_train, y_train,
        validation_split=0.1,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=[es],
        verbose=1
    )

    # Predict (scaled), then invert scaling
    y_pred_scaled = model.predict(X_test, verbose=0)
    y_pred = scaler.inverse_transform(y_pred_scaled).ravel()

    # Align to test dates (one prediction per test day)
    return y_pred, model, hist

# -----------------------------
# Plotting
# -----------------------------
def plot_forecasts(df, train, test, arima_pred, lstm_pred):
    """Plot the full price history with both forecasts over the test period.

    Args:
        df: Full frame with ``Date`` and ``Price`` columns (the actual series).
        train: Training frame (not used by the plot itself).
        test: Test frame; its ``Date`` column is the x-axis of both forecasts.
        arima_pred: ARIMA predictions, one per test row.
        lstm_pred: LSTM predictions, one per test row.

    The figure is written to ``OUT_PLOTS / "TSLA_actual_vs_forecasts.png"``.
    """
    fig, ax = plt.subplots(figsize=(12, 6))

    # Actual series plus a marker at the train/test boundary.
    ax.plot(df["Date"], df["Price"], label="Actual", linewidth=1.2)
    ax.axvline(pd.Timestamp(SPLIT_DATE), linestyle="--", label="Split", alpha=0.7)

    # Both forecasts share the test dates as their x-axis.
    forecast_dates = test["Date"].values
    for series, series_label in ((arima_pred, "ARIMA Forecast"), (lstm_pred, "LSTM Forecast")):
        ax.plot(forecast_dates, series, label=series_label)

    ax.set_title("TSLA — Actual vs Forecasts (ARIMA & LSTM)")
    ax.set_xlabel("Date")
    ax.set_ylabel("Price")
    ax.legend()
    ax.grid(True)

    target = OUT_PLOTS / "TSLA_actual_vs_forecasts.png"
    fig.tight_layout()
    fig.savefig(target)
    plt.close(fig)
    print(f"📈 Saved: {target}")

def plot_lstm_training(history):
    """Plot the LSTM training (and, when recorded, validation) loss per epoch.

    Args:
        history: Object exposing a ``history`` mapping with a ``"loss"`` entry
            and optionally a ``"val_loss"`` entry.

    Returns silently without plotting when the loss values cannot be read.
    The figure is written to ``OUT_PLOTS / "TSLA_lstm_training_loss.png"``.
    """
    try:
        records = history.history
        train_curve = records["loss"]
        val_curve = records.get("val_loss")
    except Exception:
        # Best effort: nothing usable to draw.
        return

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(train_curve, label="Train Loss")
    if val_curve is not None:
        ax.plot(val_curve, label="Val Loss")

    ax.set_title("LSTM Training Loss")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("MSE")
    ax.legend()
    ax.grid(True)

    target = OUT_PLOTS / "TSLA_lstm_training_loss.png"
    fig.tight_layout()
    fig.savefig(target)
    plt.close(fig)
    print(f"🧪 Saved: {target}")

# -----------------------------
# Main
# -----------------------------
def main():
    """Run the full TSLA forecasting pipeline.

    Steps: locate and load the cleaned TSLA CSV, split it chronologically at
    SPLIT_DATE, produce ARIMA and LSTM forecasts for the test period, then
    save the predictions (CSV), both fitted models, two plots and a Markdown
    metrics report under the output directories.
    """
    import warnings
    warnings.filterwarnings("ignore")
    set_seeds(SEED)

    tsla_csv = find_tsla_csv(CLEANED_DIR)
    print(f"📂 Using file: {tsla_csv}")

    df = load_tsla_series(tsla_csv)
    train, test = train_test_split_chrono(df, SPLIT_DATE)
    print(f"Train: {train['Date'].iloc[0].date()} → {train['Date'].iloc[-1].date()} | "
          f"Test: {test['Date'].iloc[0].date()} → {test['Date'].iloc[-1].date()} "
          f"({len(test)} days)")

    # ---------- ARIMA ----------
    arima_pred, arima_model = arima_forecast(train["Price"], horizon=len(test))
    print(f"ARIMA Predictions type: {type(arima_pred)}, shape: {arima_pred.shape if hasattr(arima_pred, 'shape') else 'N/A'}")
    print(f"ARIMA Model type: {type(arima_model)}")
    arima_metrics = metrics(test["Price"].values, arima_pred)

    # ---------- LSTM ----------
    lstm_pred, lstm_model, hist = lstm_forecast(train["Price"], test["Price"], lookback=LOOKBACK)
    lstm_metrics = metrics(test["Price"].values, lstm_pred)

    # ---------- Save predictions ----------
    pred_df = pd.DataFrame({
        "Date": test["Date"].values,
        "Actual": test["Price"].values,
        "ARIMA_Pred": arima_pred,
        "LSTM_Pred": lstm_pred
    })
    out_pred = OUT_PREDICTIONS / "TSLA_predictions.csv"
    pred_df.to_csv(out_pred, index=False)
    print(f"💾 Saved predictions: {out_pred}")

    # ---------- Save models ----------
    # Make sure the models directory exists before writing into it; open()
    # and model.save() fail on a missing directory.
    OUT_MODELS.mkdir(parents=True, exist_ok=True)

    arima_model_path = OUT_MODELS / "tsla_arima.pkl"
    with open(arima_model_path, 'wb') as f:
        pickle.dump(arima_model, f)
    print(f"💾 Saved ARIMA model: {arima_model_path}")

    lstm_model_path = OUT_MODELS / "tsla_lstm.h5"
    lstm_model.save(lstm_model_path)
    print(f"💾 Saved LSTM model: {lstm_model_path}")

    # ---------- Plots ----------
    plot_forecasts(df, train, test, arima_pred, lstm_pred)
    plot_lstm_training(hist)

    # ---------- Metrics report ----------
    md = f"""# TSLA Forecast — ARIMA vs LSTM

**Train period:** {train['Date'].iloc[0].date()} → {train['Date'].iloc[-1].date()}
**Test period:** {test['Date'].iloc[0].date()} → {test['Date'].iloc[-1].date()}
**Horizon:** {len(test)} trading days

## Test Metrics
### ARIMA
- MAE:   {arima_metrics['MAE']:.4f}
- RMSE:  {arima_metrics['RMSE']:.4f}
- MAPE:  {arima_metrics['MAPE%']:.2f}%
- sMAPE: {arima_metrics['sMAPE%']:.2f}%
- R²:    {arima_metrics['R2']:.4f}

### LSTM
- MAE:   {lstm_metrics['MAE']:.4f}
- RMSE:  {lstm_metrics['RMSE']:.4f}
- MAPE:  {lstm_metrics['MAPE%']:.2f}%
- sMAPE: {lstm_metrics['sMAPE%']:.2f}%
- R²:    {lstm_metrics['R2']:.4f}

## Notes
- Data split is **chronological**; no shuffling.
- ARIMA provides interpretability (orders, residual diagnostics), while LSTM can capture **non-linear** patterns at the cost of more data & tuning.
- Consider forecasting **returns** and integrating other features (volume, macro, sector ETF) if you want to push accuracy further.
"""
    report_path = OUT_REPORTS / "TSLA_forecast_metrics.md"
    report_path.write_text(md, encoding="utf-8")
    print(f"📝 Saved metrics report: {report_path}")

# Script entry point: verify the core dependencies, then run the pipeline.
if __name__ == "__main__":
    # Soft dependency checks with helpful messages.
    # NOTE(review): pandas and statsmodels are already imported at the top of
    # this file, so a missing package raises there before this guard is
    # reached. Confirm whether this check is still wanted.
    try:
        import pandas as pd  # noqa
        import statsmodels   # noqa
    except ImportError:
        # Print the install hint, then re-raise so the failure stays visible.
        print("Please install requirements: pip install pandas statsmodels tensorflow scikit-learn matplotlib")
        raise
    main()

📂 Using file: /content/drive/MyDrive/data/cleaned/TSLA.csv
Train: 2015-07-01 → 2023-12-29 | Test: 2024-01-02 → 2025-07-30 (395 days)
ARIMA Predictions type: <class 'numpy.ndarray'>, shape: (395,)
ARIMA Model type: <class 'statsmodels.tsa.arima.model.ARIMAResultsWrapper'>
Epoch 1/40
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 54ms/step - loss: 0.0267 - val_loss: 0.0018
Epoch 2/40
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 48ms/step - loss: 0.0013 - val_loss: 0.0017
Epoch 3/40
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 60ms/step - loss: 0.0012 - val_loss: 0.0015
Epoch 4/40
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 47ms/step - loss: 0.0011 - val_loss: 0.0014
Epoch 5/40
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 46ms/step - loss: 0.0011 - val_loss: 0.0013
Epoch 6/40
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 53ms/step - loss: 0.0010 - val_loss: 0.0012
Epoch 7/40
[