# Crypto Price Insights – Exploratory Notebook

This notebook walks through an API-driven crypto analytics workflow that powers the "Crypto Price Insights Dashboard" portfolio project. Follow the sections to fetch data, engineer features, surface insights, and prototype automation-ready artifacts.

## 1. Set Up Environment and API Parameters

We'll configure the Python environment, declare reusable helpers, and define the API endpoints used throughout the workflow.

In [None]:
import json
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split

# Filesystem layout: everything is resolved relative to the parent of the
# current working directory.
# NOTE(review): presumably the notebook is launched from a sub-folder of the
# project (e.g. notebooks/) so that the parent is the project root — confirm.
PROJECT_ROOT = Path.cwd().parent
DATA_DIR = PROJECT_ROOT / "data"
RAW_DIR = DATA_DIR / "raw"  # raw API payloads are written here
PROCESSED_DIR = DATA_DIR / "processed"  # engineered feature tables are written here
# Create both data directories up front; exist_ok makes re-running the cell safe.
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# CoinGecko REST configuration shared by the fetch helpers below.
COINGECKO_BASE_URL = "https://api.coingecko.com/api/v3"
# Endpoint path templates; "{coin_id}" is filled in with str.format by the caller.
ENDPOINTS = {
    "market_chart": "/coins/{coin_id}/market_chart",
}
# Default headers applied to every request made through the shared session.
HEADERS = {
    "accept": "application/json",
    "user-agent": "crypto-price-insights-notebook/1.0",
}


def build_market_params(coin_id: str, vs_currency: str = "usd", days: int = 30, interval: Optional[str] = None) -> Dict[str, Any]:
    """Assemble the query-string parameters for a market-chart request.

    Args:
        coin_id: Coin identifier. Accepted for signature symmetry with the
            fetch helper but not placed in the returned mapping (the caller
            embeds it in the URL path instead).
        vs_currency: Quote currency code.
        days: Number of days of history to request.
        interval: Optional sampling interval; omitted from the result when
            falsy (``None`` or an empty string).

    Returns:
        A new dict with ``vs_currency`` and ``days``, plus ``interval`` when
        one was supplied.
    """
    query: Dict[str, Any] = {"vs_currency": vs_currency, "days": days}
    if not interval:
        return query
    query["interval"] = interval
    return query


## 2. Fetch Market Data from CoinGecko API

We'll configure a resilient HTTP session, retrieve 30-day market data (price, volume, market cap), and persist raw responses for reproducibility.

In [None]:
def create_session() -> requests.Session:
    """Build a ``requests`` session that retries failed GET requests.

    The session carries the module-level ``HEADERS`` and uses one shared
    adapter for both HTTPS and HTTP. The retry policy allows up to five
    retries (overall, and per read/connect/status category) with a backoff
    factor of 1.0, and treats 429/500/502/503/504 responses as retryable.

    Returns:
        The configured session. The caller owns it and should close it.
    """
    retry_policy = Retry(
        total=5,
        read=5,
        connect=5,
        status=5,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET",),
        backoff_factor=1.0,
    )

    http = requests.Session()
    http.headers.update(HEADERS)

    # One adapter instance serves both schemes, mounted in the original order.
    resilient_adapter = HTTPAdapter(max_retries=retry_policy)
    for prefix in ("https://", "http://"):
        http.mount(prefix, resilient_adapter)
    return http


def fetch_market_chart(coin_id: str, vs_currency: str = "usd", days: int = 30, interval: Optional[str] = None) -> Dict[str, Any]:
    """Download a coin's market chart and persist the raw JSON payload.

    Args:
        coin_id: Coin identifier substituted into the endpoint path.
        vs_currency: Quote currency code.
        days: Number of days of history to request.
        interval: Optional sampling interval forwarded to the API.

    Returns:
        The decoded JSON payload.

    Raises:
        requests.HTTPError: If the API answers with a non-429 error status.
        RuntimeError: If every attempt was answered with HTTP 429.

    Side effects:
        Writes the payload to ``RAW_DIR/{coin_id}_market_chart_{unix_ts}.json``.
    """
    params = build_market_params(coin_id, vs_currency, days, interval)
    endpoint = ENDPOINTS["market_chart"].format(coin_id=coin_id)
    url = f"{COINGECKO_BASE_URL}{endpoint}"
    max_attempts = 5

    # The context manager closes the session (and its connection pool) on
    # every exit path; previously the session was never closed.
    with create_session() as session:
        for attempt in range(max_attempts):
            # NOTE(review): the session's adapter is also configured to retry
            # 429 responses, so this manual branch may rarely be reached —
            # confirm which layer should own rate-limit handling.
            response = session.get(url, params=params, timeout=30)
            if response.status_code == 429:
                if attempt == max_attempts - 1:
                    # No further attempt follows, so sleeping would only delay the error.
                    break
                wait_time = min(60, 2 ** attempt)
                logging.warning("Rate limited. Sleeping for %s seconds", wait_time)
                time.sleep(wait_time)
                continue
            response.raise_for_status()
            payload = response.json()
            timestamp = int(time.time())
            raw_path = RAW_DIR / f"{coin_id}_market_chart_{timestamp}.json"
            # Explicit encoding keeps the file identical across platforms.
            raw_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
            logging.info("Saved raw payload to %s", raw_path)
            return payload

    raise RuntimeError("Unable to fetch data after multiple attempts")


# Notebook-level run configuration. `coin_id` and `vs_currency` are also read
# as globals by later cells (output file name, plot titles and axis labels).
coin_id = "bitcoin"
vs_currency = "usd"
lookback_days = 30
# Performs a live HTTP request and writes the raw JSON payload under RAW_DIR.
raw_payload = fetch_market_chart(coin_id=coin_id, vs_currency=vs_currency, days=lookback_days)
# Show the payload's top-level keys as the cell output.
list(raw_payload.keys())

## 3. Normalize and Clean Time Series Data

Next we convert the raw JSON into a tidy pandas DataFrame, align timestamps, remove duplicate timestamps, and fill gaps (forward-fill, then back-fill any leading gaps).

In [None]:
def normalise_market_payload(payload: Dict[str, Any]) -> pd.DataFrame:
    """Convert a market-chart payload into one time-indexed DataFrame.

    Args:
        payload: Mapping with ``prices``, ``market_caps`` and
            ``total_volumes`` entries, each a sequence of
            ``[timestamp_ms, value]`` pairs.

    Returns:
        A DataFrame with ``price``, ``market_cap`` and ``volume`` columns,
        indexed by timezone-naive UTC timestamps, sorted, de-duplicated and
        gap-filled (forward fill, then backward fill).

    Raises:
        ValueError: If any of the three series is missing or empty.
    """
    def _to_frame(key: str, value_name: str) -> pd.DataFrame:
        # Build a two-column frame for one series and parse epoch-ms timestamps.
        frame = pd.DataFrame(payload.get(key, []), columns=["timestamp", value_name])
        if frame.empty:
            raise ValueError(f"Payload missing expected key: {key}")
        frame["timestamp"] = pd.to_datetime(frame["timestamp"], unit="ms", utc=True)
        return frame

    price_frame = _to_frame("prices", "price")
    cap_frame = _to_frame("market_caps", "market_cap")
    volume_frame = _to_frame("total_volumes", "volume")

    # Outer joins keep timestamps that appear in only some of the series.
    merged = price_frame.merge(cap_frame, on="timestamp", how="outer").merge(volume_frame, on="timestamp", how="outer")
    merged = merged.sort_values("timestamp").drop_duplicates(subset=["timestamp"], keep="last")
    merged = merged.set_index("timestamp")
    merged = merged.tz_convert("UTC").tz_localize(None)

    # infer_freq raises ValueError when given fewer than three timestamps;
    # treat that like "no regular frequency" instead of failing on short payloads.
    try:
        inferred_freq = pd.infer_freq(merged.index[:10])
    except ValueError:
        inferred_freq = None

    if inferred_freq:
        full_range = pd.date_range(start=merged.index.min(), end=merged.index.max(), freq=inferred_freq)
        # Reindex onto the regular grid *plus* the observed timestamps so that
        # observations lying off the grid (e.g. a trailing intra-period point)
        # are kept rather than silently discarded.
        merged = merged.reindex(full_range.union(merged.index))

    merged = merged.ffill().bfill()
    return merged


# Normalise the fetched payload and preview the first rows as the cell output.
market_df = normalise_market_payload(raw_payload)
market_df.head()


## 4. Engineer Financial Features

We derive log returns, rolling averages, exponential moving averages, and realized volatility across configurable windows.

In [None]:
def engineer_features(frame: pd.DataFrame, windows: tuple[int, ...] = (7, 14, 30)) -> pd.DataFrame:
    """Add return, trend and volatility columns to a price table and save it.

    Args:
        frame: Table with a ``price`` column; it is copied, not mutated.
        windows: Window lengths, counted in rows, for the rolling statistics.

    Returns:
        The enriched table with every row containing a missing value removed.

    Side effects:
        Writes the result to ``PROCESSED_DIR/{coin_id}_features.parquet``,
        where ``coin_id`` is the module-level variable, not a parameter.
    """
    enriched = frame.copy()
    close = enriched["price"]

    enriched["log_return"] = np.log(close).diff()
    enriched["simple_return"] = close.pct_change()

    # Volatility is scaled by sqrt(365).
    # NOTE(review): this presumes one row per day — confirm the sampling interval.
    annualiser = np.sqrt(365)
    for span in windows:
        enriched[f"rolling_mean_{span}"] = close.rolling(span).mean()
        enriched[f"ema_{span}"] = close.ewm(span=span, adjust=False).mean()
        enriched[f"realized_vol_{span}"] = enriched["log_return"].rolling(span).std() * annualiser

    # Rolling windows leave leading NaNs; drop every incomplete row.
    enriched = enriched.dropna()

    target_file = PROCESSED_DIR / f"{coin_id}_features.parquet"
    enriched.to_parquet(target_file)
    logging.info("Saved processed features to %s", target_file)
    return enriched


# Build the feature table (this also writes the parquet file) and preview it.
feature_df = engineer_features(market_df)
feature_df.head()


## 5. Summarize Statistical Metrics

We compute descriptive statistics, volatility estimates, and downside risk metrics to support storyline-ready insights.

In [None]:
def summarise_metrics(features: pd.DataFrame) -> tuple[pd.DataFrame, dict[str, float]]:
    """Compute descriptive statistics and headline risk metrics.

    Args:
        features: Table with ``price``, ``simple_return`` and ``log_return``
            columns.

    Returns:
        A pair ``(summary_table, summary_metrics)``: the transposed
        ``describe()`` output for the three columns, and a dict holding the
        mean log return, per-period and annualised volatility, downside
        risk, maximum drawdown and the latest price.
    """
    scale = np.sqrt(365)
    prices = features["price"]
    log_returns = features["log_return"]

    # Sample standard deviation (n - 1 denominator) of all log returns.
    avg = log_returns.mean()
    sigma = np.sqrt(((log_returns - avg) ** 2).sum() / (len(log_returns) - 1))

    # Same statistic restricted to negative returns; the denominator is
    # floored at 1 so zero or one losing period does not divide by zero.
    losses = log_returns[log_returns < 0]
    loss_dof = max(len(losses) - 1, 1)
    loss_sigma = np.sqrt(((losses - losses.mean()) ** 2).sum() / loss_dof)

    # Worst ratio of price to its running peak, expressed as a negative fraction.
    worst_drawdown = prices.div(prices.cummax()).min() - 1

    described = features[["price", "simple_return", "log_return"]].describe().T
    headline = {
        "mean_return": avg,
        "daily_volatility": sigma,
        "annualised_volatility": sigma * scale,
        "downside_risk": loss_sigma * scale,
        "max_drawdown": worst_drawdown,
        "latest_price": prices.iloc[-1],
    }
    return described, headline


# Compute the summary statistics and display both results as the cell output.
summary_table, summary_metrics = summarise_metrics(feature_df)
summary_table, summary_metrics


## 6. Visualize Price Dynamics

We'll plot price trends with overlays, inspect return distributions, and check autocorrelation for serial dependence.

In [None]:
# Apply a shared matplotlib style to every figure in this cell.
plt.style.use("seaborn-v0_8")

# Figure 1: price series overlaid with the three rolling means.
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(feature_df.index, feature_df["price"], label="Close", color="#1f77b4")
# These windows must match the rolling_mean_* columns created by engineer_features.
for window in (7, 14, 30):
    ax.plot(feature_df.index, feature_df[f"rolling_mean_{window}"], label=f"SMA {window}", linestyle="--")
ax.set_title(f"{coin_id.title()} price trend with rolling means")
ax.set_ylabel(f"Price ({vs_currency.upper()})")
ax.legend()
ax.grid(alpha=0.3)
fig.tight_layout()
plt.show()

# Figure 2: distribution of simple returns — normalised histogram plus a KDE curve.
# NOTE(review): pandas' KDE plot presumably needs SciPy installed — confirm.
fig, ax = plt.subplots(figsize=(10, 4))
feature_df["simple_return"].hist(ax=ax, bins=40, density=True, alpha=0.6, label="Histogram")
feature_df["simple_return"].plot(kind="kde", ax=ax, color="black", label="KDE")
ax.set_title("Daily return distribution")
ax.set_xlabel("Return")
ax.legend()
plt.show()

from pandas.plotting import autocorrelation_plot

# Figure 3: autocorrelation of simple returns across lags.
fig, ax = plt.subplots(figsize=(10, 4))
autocorrelation_plot(feature_df["simple_return"], ax=ax)
ax.set_title("Autocorrelation of daily returns")
plt.show()


## 7. Prototype Directional Classifier

We label next-day direction, split into train/test sets, fit a baseline logistic regression, and evaluate performance.

In [None]:
# Label each row with the direction of the *next* observation's price:
# 1 when the next price is higher, 0 otherwise.
classifier_df = feature_df.copy()
_next_price = classifier_df["price"].shift(-1)
classifier_df["target"] = (_next_price > classifier_df["price"]).astype(int)
# The final row has no next price. Comparing NaN yields False, which astype(int)
# turns into a label of 0, so dropna() alone would keep that row with a bogus
# label. Remove rows without a known next price explicitly.
classifier_df = classifier_df[_next_price.notna()].dropna()

# Predictors: every engineered return, trend and volatility column.
feature_columns = [
    "simple_return",
    "log_return",
    "rolling_mean_7",
    "rolling_mean_14",
    "rolling_mean_30",
    "ema_7",
    "ema_14",
    "ema_30",
    "realized_vol_7",
    "realized_vol_14",
    "realized_vol_30",
]

X = classifier_df[feature_columns]
y = classifier_df["target"]

# shuffle=False keeps chronological order, so the test set is strictly later
# than the training set (no look-ahead from random shuffling).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, shuffle=False)
model = LogisticRegression(max_iter=500)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

baseline_accuracy = accuracy_score(y_test, y_pred)
print(f"Logistic regression accuracy: {baseline_accuracy:.3f}")
print(classification_report(y_test, y_pred))


## 8. Schedule Automated Data Refresh

We'll sketch a CLI entrypoint that orchestrates fetch, transform, and reporting runs, and outline scheduling options for cron and Windows Task Scheduler.

In [None]:
from textwrap import dedent

# Destination of the generated command-line entry point.
cli_path = PROJECT_ROOT / "scripts" / "run_pipeline.py"
# Source of the generated script. It imports run_pipeline from the
# crypto_dashboard package under src/, which is not defined in this notebook.
cli_code = dedent(
    """
    import argparse
    import logging
    import sys
    from pathlib import Path

    PROJECT_ROOT = Path(__file__).resolve().parents[1]
    SRC_DIR = PROJECT_ROOT / "src"
    if str(SRC_DIR) not in sys.path:
        sys.path.insert(0, str(SRC_DIR))

    from crypto_dashboard.pipeline import run_pipeline

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    def build_parser() -> argparse.ArgumentParser:
        parser = argparse.ArgumentParser(description="Run the crypto price insights pipeline")
        parser.add_argument("--coin-id", default="bitcoin", help="CoinGecko coin identifier")
        parser.add_argument("--vs-currency", default="usd", help="Quote currency")
        parser.add_argument("--days", type=int, default=90, help="Number of days of history")
        parser.add_argument(
            "--export",
            type=Path,
            default=PROJECT_ROOT / "data" / "processed" / "latest_enriched.csv",
            help="CSV export path",
        )
        return parser

    def main() -> None:
        args = build_parser().parse_args()
        logging.info("Running pipeline for %s/%s over %s days", args.coin_id, args.vs_currency, args.days)
        results = run_pipeline(
            coin_id=args.coin_id,
            vs_currency=args.vs_currency,
            days=args.days,
            export_path=args.export,
        )
        logging.info("Exported enriched dataset to %s", args.export)
        logging.info("Computed metrics: %s", results["metrics"])

    if __name__ == "__main__":
        main()
    """
)
# The scripts/ directory is not created anywhere else in this notebook, so
# create it first; otherwise write_text raises FileNotFoundError on a fresh
# checkout. An explicit encoding keeps the file identical across platforms.
cli_path.parent.mkdir(parents=True, exist_ok=True)
cli_path.write_text(cli_code, encoding="utf-8")
cli_path

### Scheduling tips

- **Cron (Linux/macOS)**: `0 6 * * * /usr/bin/env PYTHONPATH="/path/to/project/src" /usr/bin/python /path/to/project/scripts/run_pipeline.py --coin-id bitcoin --days 90`
- **Windows Task Scheduler**: Create a *Basic Task*, point to `powershell.exe`, and use an argument like `-Command "Set-Location 'C:\path\to\project'; $env:PYTHONPATH='src'; python .\scripts\run_pipeline.py --coin-id bitcoin --days 90"`.
- Capture logs by redirecting stdout/stderr to a log file for later inspection, e.g. append `>> logs/pipeline.log 2>&1`.
