# Crypto Forecast (Colab-ready)

A Conv1D + stacked-LSTM forecaster trained on log-returns. It pulls data from a live API (`API_BASE_URL`, e.g., an ngrok tunnel) and falls back to an uploaded CSV when the API is unavailable. It is designed for sparse crypto snapshots: an optional source filter, automatic shrinking of the window when history is short, small default window sizes, learning-rate scheduling, regularization, and clean exits when there is too little data.

In [None]:
!pip -q install --upgrade requests pandas numpy matplotlib seaborn plotly tensorflow

In [None]:
import os
import json
import requests
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# ---- Runtime configuration (every env-backed value can be overridden) ----
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000")
CSV_PATH = os.environ.get("CSV_PATH", "/content/market_dataset.csv")
# Symbol to model, read from the environment and upper-cased; defaults to BTC
# (e.g. set TARGET_SYMBOL=DOGE for a second run).
TARGET_SYMBOL = os.environ.get("TARGET_SYMBOL", "BTC").upper()
# Restrict the data to a single source to cut down on cross-source noise.
TARGET_SOURCE = os.environ.get("TARGET_SOURCE", "CoinPaprika")
# Deliberately small defaults for sparse data; raise them when more history exists.
WINDOW_SIZE = int(os.environ.get("WINDOW_SIZE", "12"))
HORIZON = int(os.environ.get("HORIZON", "1"))
EPOCHS = int(os.environ.get("EPOCHS", "30"))
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "32"))
# Hold-out fractions for the validation and test splits (not env-configurable).
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15
def load_dataset() -> pd.DataFrame:
    """Return market rows sorted by ``as_of``, from the API or else the CSV.

    The ``/markets`` endpoint under ``API_BASE_URL`` is tried first. Any
    failure there (or an empty response) is reported and the function falls
    back to the CSV at ``CSV_PATH``.

    Returns:
        A DataFrame ordered by its ``as_of`` column.

    Raises:
        SystemExit: if the API yields nothing usable and no CSV file exists.
    """
    try:
        response = requests.get(
            f"{API_BASE_URL}/markets",
            params={"limit": 1000},
            timeout=15,
        )
        response.raise_for_status()
        frame = pd.DataFrame(response.json())
        if not frame.empty:
            frame["as_of"] = pd.to_datetime(frame["as_of"])
            return frame.sort_values("as_of")
    except Exception as exc:
        # Best-effort: any API problem simply routes us to the CSV fallback.
        print(f"API fetch failed, will try CSV: {exc}")

    if not os.path.exists(CSV_PATH):
        raise SystemExit("No data found. Export CSV locally or ensure the API is reachable.")

    return pd.read_csv(CSV_PATH, parse_dates=["as_of"]).sort_values("as_of")


df = load_dataset()

# When a source is configured, keep only its rows to limit cross-source noise
if TARGET_SOURCE:
    df = df.loc[df["source"].eq(TARGET_SOURCE)]

df.head()

In [None]:
# Filter symbol and compute log returns
df_sym = df[df["symbol"].str.upper() == TARGET_SYMBOL].copy()
df_sym = df_sym.sort_values("as_of")
if df_sym.empty:
    raise SystemExit(f"Symbol {TARGET_SYMBOL} not found in dataset (after source filter: {TARGET_SOURCE}).")

# log(p_t) - log(p_{t-1}); the first row has no predecessor, so its NaN is dropped
df_sym["log_return"] = np.log(df_sym["price"].astype(float)).diff()
df_sym = df_sym.dropna(subset=["log_return"])
returns = df_sym["log_return"].to_numpy()

# Adjust window dynamically if data is sparse
min_needed = WINDOW_SIZE + HORIZON + 1
if len(returns) < min_needed:
    # Largest window that still leaves two (input, target) samples.
    largest_fit = len(returns) - HORIZON - 1
    # Check feasibility BEFORE applying any floor; otherwise the floor hides
    # the "not enough data" case and this exit can never trigger.
    if largest_fit <= 0:
        raise SystemExit(
            f"Not enough data to build any window. Have {len(returns)} points; "
            f"need at least {HORIZON + 2}. Run more ingests or use CSV with more history."
        )
    # Prefer at least 4 steps of context, but never pick a window that
    # (a) leaves no room for a single sample, or (b) exceeds the configured size.
    suggested = min(max(4, largest_fit), len(returns) - HORIZON, WINDOW_SIZE)
    if suggested < WINDOW_SIZE:
        print(f"Data is sparse. Reducing WINDOW_SIZE from {WINDOW_SIZE} to {suggested}.")
        WINDOW_SIZE = suggested

def make_windows(series: np.ndarray, window: int, horizon: int):
    """Slice *series* into overlapping (input, target) pairs.

    Sample ``i`` uses ``series[i : i + window]`` as input and the following
    ``horizon`` values as target. Consecutive samples are shifted by one step.

    Args:
        series: 1-D sequence of values.
        window: number of steps in each input slice.
        horizon: number of steps in each target slice.

    Returns:
        A tuple ``(inputs, targets)`` of NumPy arrays. Both are empty when the
        series is too short to produce a single sample.
    """
    n_samples = len(series) - window - horizon + 1  # may be <= 0 -> no samples
    inputs = [series[start : start + window] for start in range(n_samples)]
    targets = [
        series[start + window : start + window + horizon]
        for start in range(n_samples)
    ]
    return np.array(inputs), np.array(targets)

X, y = make_windows(returns, WINDOW_SIZE, HORIZON)
X = X[..., np.newaxis]  # append a trailing axis of size 1

n = X.shape[0]
if not n:
    raise SystemExit(
        f"Still no windows after adjusting window size. Have {len(returns)} return points. "
        "Run more ingests or lower WINDOW_SIZE further."
    )

# Split in window order (no shuffling): train first, then validation, then test
val_n, test_n = int(n * VAL_SPLIT), int(n * TEST_SPLIT)
train_n = n - val_n - test_n
if train_n < 1:
    # Guarantee at least one training sample, shrinking the hold-outs to fit
    train_n = 1
    val_n = max(0, n - train_n - test_n)
    test_n = max(0, n - train_n - val_n)

val_end = train_n + val_n
X_train, X_val, X_test = X[:train_n], X[train_n:val_end], X[val_end:]
y_train, y_val, y_test = y[:train_n], y[train_n:val_end], y[val_end:]

# Standardize inputs with statistics from the training split only
MU = X_train.mean()
SIGMA = X_train.std() + 1e-8  # epsilon avoids division by zero
X_train, X_val, X_test = ((part - MU) / SIGMA for part in (X_train, X_val, X_test))

print(f"Train windows: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

In [None]:
# Fix TensorFlow's random seed for this run.
tf.random.set_seed(7)

# Conv1D front-end followed by two stacked LSTMs and a small dense head.
# The final Dense layer emits one value per forecast step (HORIZON).
model = tf.keras.Sequential(
    [
        tf.keras.layers.Input(shape=(WINDOW_SIZE, 1)),
        tf.keras.layers.Conv1D(
            filters=32,
            kernel_size=3,
            padding="causal",
            activation="relu",
            kernel_regularizer=tf.keras.regularizers.l2(1e-5),
        ),
        tf.keras.layers.SpatialDropout1D(0.1),
        tf.keras.layers.LSTM(64, return_sequences=True, kernel_regularizer=tf.keras.regularizers.l2(1e-5)),
        tf.keras.layers.Dropout(0.15),
        tf.keras.layers.LSTM(32, kernel_regularizer=tf.keras.regularizers.l2(1e-5)),
        tf.keras.layers.Dense(16, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(1e-5)),
        tf.keras.layers.Dense(HORIZON),
    ]
)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3, clipnorm=1.0)
model.compile(optimizer=optimizer, loss="mse", metrics=["mae"])
model.summary()

val_data = (X_val, y_val) if len(X_val) > 0 else None
# On tiny datasets the validation split can be empty, in which case Keras never
# reports "val_loss" and callbacks monitoring it would silently do nothing.
# Fall back to the training loss so early stopping and LR decay still work.
monitor_metric = "val_loss" if val_data is not None else "loss"

callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor=monitor_metric, patience=8, min_delta=1e-5, restore_best_weights=True
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor=monitor_metric, patience=3, factor=0.5, min_lr=1e-5, verbose=1
    ),
]

history = model.fit(
    X_train,
    y_train,
    validation_data=val_data,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    verbose=2,
    callbacks=callbacks,
)

# Evaluate on the held-out test windows when there are any.
test_metrics = None
if len(X_test) > 0:
    test_loss, test_mae = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test MAE: {test_mae:.6f}")
    test_metrics = {"loss": float(test_loss), "mae": float(test_mae)}
else:
    print("No test set available (dataset too small); skipping test evaluation.")

model_path = f"/content/{TARGET_SYMBOL}_model.keras"
meta_path = f"/content/{TARGET_SYMBOL}_meta.json"

model.save(model_path)
# Store the window/horizon and input scaling stats (mu, sigma) next to the model.
meta = {
    "symbol": TARGET_SYMBOL,
    "window": WINDOW_SIZE,
    "horizon": HORIZON,
    "mu": float(MU),
    "sigma": float(SIGMA),
    "test": test_metrics,
    "source": TARGET_SOURCE,
}
with open(meta_path, "w", encoding="utf-8") as f:
    json.dump(meta, f, indent=2)

print(f"Saved model to {model_path}")
print(f"Saved metadata to {meta_path}")

In [None]:
# Plot predicted vs. actual values on the test windows, if there are any
if len(X_test) > 0:
    preds = np.squeeze(model.predict(X_test, verbose=0))
    truth = np.squeeze(y_test)

    plt.figure(figsize=(10, 4))
    plt.plot(truth, label="Actual")
    plt.plot(preds, label="Predicted")
    plt.title(f"{TARGET_SYMBOL} returns — horizon {HORIZON}")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
else:
    print("No test set available; skipping prediction plot.")