In [None]:
import os
import time
import requests
import pandas as pd
import numpy as np
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta, timezone

# -------------------- Configuration --------------------------------------------------
# Base output directory; main() creates one sub-folder per frequency label below it.
# NOTE(review): r"\temp" is a Windows-style path — on POSIX it resolves to a
# relative name under the current working directory; confirm the intended target.
SAVE_DIR = r"\temp"

# The collection window is derived once, at import time, from the current UTC date.
TODAY = datetime.now(timezone.utc).date()
END_DATE = TODAY - timedelta(days=1)                  # yesterday (UTC)
START_DATE = END_DATE - timedelta(days=730)           # ~2 years back
# Number of days the chunked fetch advances its "start" parameter per request.
WINDOW_DAYS = 360

# Robustness settings
# Minimum row count for a chunked result to be accepted; below this the
# chunked strategy falls back to the single "timespan" request.
MIN_GOOD_ROWS = 180
# When True, pull_timespan() aggregates non-daily series to daily via to_daily().
AGG_INTRADAY_TO_DAILY = False
AGG_METHOD = "mean"           # "mean" or "last"

# Frequency normalization toggles
FORCE_MINUTE_TO_15MIN = True  # snap minute series that are actually on 00/15/30/45 to 15min bins
COALESCE_HOURLY_TO_6H = True  # resample hourly series to clean 6-hour bins (UTC anchored)

# -------------------- API & Metrics --------------------------------------------------
# URL template for a single chart; {slug} is filled in by fetch_chart().
BASE = "https://api.blockchain.info/charts/{slug}"
# Headers sent with every request.
UA = {"User-Agent": "onchain-collector/2.6 (+academic use)"}

# Each table below maps an API chart slug (key) to the column name used for
# that series in the output frames (value).
NETWORK_ACTIVITY = {
    "n-unique-addresses": "unique_addresses_used",
    "n-transactions": "confirmed_tx_per_day",
    "transactions-per-second": "tx_rate_per_second",
    "output-volume": "output_value_per_day_btc",
    "mempool-count": "mempool_tx_count",
    "mempool-growth": "mempool_size_growth_bytes",
    "mempool-size": "mempool_size_bytes",
    "utxo-count": "utxo_count",
    "estimated-transaction-volume-usd": "estimated_tx_value_usd"
}
# NOTE(review): the recorded run shows 404 responses for all three of these
# slugs — confirm they are still served by the API.
MARKET_SIGNALS = {"mvrv": "mvrv", "nvt": "nvt", "nvts": "nvts"}
BLOCK_DETAILS = {
    "n-transactions-per-block": "avg_tx_per_block",
    "median-confirmation-time": "median_confirmation_time_min",
    "avg-confirmation-time": "avg_confirmation_time_min",
    "n-transactions-total": "total_tx_count"
}
MINING_INFO = {
    "hash-rate": "total_hash_rate_ths",
    "difficulty": "network_difficulty",
    "miners-revenue": "miners_revenue_usd",
    "transaction-fees": "total_tx_fees_btc",
    "fees-usd-per-transaction": "fees_usd_per_tx",
    "cost-per-transaction-percent": "cost_pct_of_tx_volume",
    "blocks-size": "blockchain_size",
    "avg-block-size": "avg_block_size"
}

# Flat slug -> column lookup over all groups, in group order; main() iterates this.
METRICS: Dict[str, str] = {}
for group in (NETWORK_ACTIVITY, MARKET_SIGNALS, BLOCK_DETAILS, MINING_INFO):
    METRICS.update(group)

# Slugs for which pull_metric() tries the single "timespan" request first
# and only then falls back to the chunked strategy.
PREFER_TIMESPAN = {"mvrv", "nvt", "nvts"}

# -------------------- HTTP helper ----------------------------------------------------
def fetch_chart(slug: str, params: Dict[str, str]) -> Optional[Dict[str, Any]]:
    """Fetch one chart and return its decoded JSON payload.

    Args:
        slug: Chart identifier substituted into ``BASE``.
        params: Query-string parameters sent with the request.

    Returns:
        The decoded JSON body, or ``None`` when the chart is not found (404),
        the request fails, or the body is not valid JSON. Every failure is
        reported with ``print`` and never raised to the caller.
    """
    url = BASE.format(slug=slug)
    try:
        r = requests.get(url, params=params, headers=UA, timeout=30)
        if r.status_code == 404:
            print(f"[WARN] {slug}: 404 Not Found — skipping.")
            return None
        if r.status_code == 429:
            # Rate limited: wait once and retry once; a second failure is
            # surfaced by raise_for_status() below.
            print(f"[WARN] {slug}: 429 Too Many Requests — pausing 10s and retrying once.")
            time.sleep(10)
            r = requests.get(url, params=params, headers=UA, timeout=30)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError as e:
            # A non-JSON body (e.g. an HTML error page served with a 2xx status)
            # raises ValueError; depending on the requests version that error is
            # not a RequestException, so it is handled here explicitly instead
            # of aborting the whole collection run.
            print(f"[ERROR] {slug}: Response was not valid JSON: {e}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] {slug}: Network request failed: {e}")
        return None

# -------------------- Transform helpers ---------------------------------------------
def to_df_with_meta(payload: Dict[str, Any], value_col: str) -> Tuple[pd.DataFrame, str]:
    """Convert a chart payload into a two-column frame plus its reported period.

    Args:
        payload: Decoded API response; its "values" entry is read as a list of
            records carrying "x" (epoch seconds) and "y" (the metric value).
        value_col: Name given to the metric column in the result.

    Returns:
        ``(frame, period)`` where ``frame`` has columns ``["ts", value_col]``
        sorted by ``ts`` (UTC-aware timestamps) and ``period`` is the payload's
        "period" entry, or ``""`` when absent. With no values the frame is empty.
    """
    period = payload.get("period", "")
    points = payload.get("values", [])
    if not points:
        return pd.DataFrame(columns=["ts", value_col]), period

    frame = pd.DataFrame(points)
    # "x" holds epoch seconds; convert to timezone-aware UTC timestamps.
    frame = frame.assign(ts=pd.to_datetime(frame["x"], unit="s", utc=True))
    frame = frame.rename(columns={"y": value_col})
    ordered = frame[["ts", value_col]].sort_values("ts")
    return ordered, period

def trim_to_window(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows whose ``ts`` date lies in ``[START_DATE, END_DATE]``.

    Both bounds are inclusive. An empty frame is returned unchanged.
    """
    if df.empty:
        return df
    row_dates = df["ts"].dt.date
    in_window = (row_dates >= START_DATE) & (row_dates <= END_DATE)
    return df[in_window]

def dedupe_on_index(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with a repeated index value, keeping the last occurrence.

    A frame whose index is already unique is returned as the same object.
    """
    if not df.index.has_duplicates:
        return df
    is_last_occurrence = ~df.index.duplicated(keep="last")
    return df[is_last_occurrence]

def to_daily(df: pd.DataFrame, value_col: str, how: str = "mean") -> pd.DataFrame:
    """Aggregate a ``ts``-column frame into one row per calendar-day bin.

    Args:
        df: Frame with a ``ts`` timestamp column.
        value_col: Metric column; days where it is missing are dropped.
        how: ``"mean"`` averages each day; any other value takes the last
            observation of the day.

    Returns:
        Frame indexed by ``date`` with empty days removed.
    """
    daily_bins = df.set_index("ts").sort_index().resample("D")
    if how == "mean":
        daily = daily_bins.mean()
    else:
        daily = daily_bins.last()
    daily.index.name = "date"

    # Keep only days where the metric exists; without the metric column,
    # fall back to dropping rows that are entirely empty.
    if value_col in daily.columns:
        daily = daily.dropna(subset=[value_col])
    else:
        daily = daily.dropna(how="all")
    return dedupe_on_index(daily)

def to_period_index(df: pd.DataFrame, period: str, value_col: str) -> pd.DataFrame:
    """Move the ``ts`` column into a sorted, de-duplicated index.

    For ``period == "day"`` timestamps are normalized to midnight and the
    index is named ``date``; for any other period the raw ``ts`` values become
    the index. ``value_col`` is accepted for signature symmetry and not used.
    An empty frame is returned unchanged.
    """
    if df.empty:
        return df

    if period != "day":
        # Intraday (or unknown) series keep their exact timestamps.
        return dedupe_on_index(df.set_index("ts").sort_index())

    daily = df.copy()
    daily["date"] = daily["ts"].dt.normalize()
    daily = daily.drop(columns=["ts"]).set_index("date").sort_index()
    return dedupe_on_index(daily)

# ------------ Frequency normalization (fix minute & 6-hour issues) -------------------
def _mode_step_seconds(idx: pd.DatetimeIndex) -> Optional[int]:
    """Return the most frequent gap between consecutive timestamps, in seconds.

    Returns ``None`` when the index has fewer than three entries or yields no
    gaps. If the mode cannot be computed, the median gap is used instead.
    """
    if len(idx) < 3:
        return None

    gaps = pd.Series(idx).diff().dropna()
    gap_seconds = gaps.dt.total_seconds().astype(int)
    if gap_seconds.empty:
        return None

    # Prefer the modal delta; mode() lists ties in ascending order, so the
    # smallest of equally frequent gaps is chosen.
    try:
        most_common = gap_seconds.mode()
        return int(most_common.iloc[0])
    except Exception:
        return int(gap_seconds.median())

def _rule_from_step(step_s: int) -> str:
    """Translate a step in seconds into a resample rule string.

    Any whole multiple of a day maps to ``"D"``; otherwise the coarsest unit
    that divides the step evenly is used: ``"<n>H"``, ``"<n>min"`` or ``"<n>S"``.
    """
    if not step_s % 86400:
        return "D"

    hours, leftover = divmod(step_s, 3600)
    if leftover == 0:
        return f"{hours}H"

    minutes, leftover = divmod(step_s, 60)
    if leftover == 0:
        return f"{minutes}min"

    return f"{step_s}S"

def _resample(sdf: pd.DataFrame, rule: str, keep_cols: Optional[List[str]] = None) -> pd.DataFrame:
    """Resample a datetime-indexed frame onto a regular grid.

    Args:
        sdf: Frame with a ``DatetimeIndex``.
        rule: Offset alias such as ``"15min"``, ``"6h"`` or ``"D"``. Legacy
            upper-case ``"<n>H"`` / ``"<n>S"`` spellings are accepted too.
        keep_cols: If given, only these columns are kept in the result.

    Returns:
        The resampled frame, aggregated with the module-level ``AGG_METHOD``
        ("mean", otherwise "last"), with all-empty bins removed. The index is
        named ``date`` for the daily rule and ``ts_utc`` otherwise.
    """
    agg = "mean" if AGG_METHOD == "mean" else "last"

    # Recent pandas releases deprecate the upper-case hour/second aliases
    # ("H", "S") in favour of "h" / "s", which older releases also accept.
    # Only plain "<digits><unit>" rules are rewritten, so multi-letter aliases
    # ending in "S" (e.g. "MS") are left alone.
    count, unit = rule[:-1], rule[-1:]
    if unit in ("H", "S") and (not count or count.isdigit()):
        rule = count + unit.lower()

    # Bins are right-closed and right-labelled, anchored at midnight of the
    # first day so every series of a given rule lands on the same grid.
    resampler = sdf.resample(rule, label="right", closed="right", origin="start_day")
    out = getattr(resampler, agg)()
    if keep_cols:
        out = out[keep_cols]
    out.index.name = "date" if rule.upper() == "D" else "ts_utc"
    return out.dropna(how="all")

def normalize_frequency(df: pd.DataFrame, reported_period: str, value_col: str) -> Tuple[pd.DataFrame, str]:
    """
    Ensure a clean, regular grid:
      - 'day' stays daily
      - 'minute' on 00/15/30/45 => resample to 15min (if FORCE_MINUTE_TO_15MIN)
      - 'hour' => coalesce to 6h if COALESCE_HOURLY_TO_6H
      - Otherwise, use modal step across the series

    Args:
        df: Series frame, normally already indexed by timestamp; a frame that
            still carries a 'ts' column is indexed on it here.
        reported_period: Period string reported by the API (may be empty).
        value_col: Metric column to keep after resampling, when present.

    Returns (normalized_df, label_for_folder). An empty frame is returned
    unchanged with label `reported_period` or "unknown".
    """
    if df.empty:
        return df, reported_period or "unknown"

    # Daily already normalized
    if reported_period == "day" and (df.index.name == "date"):
        return df, "day"

    # Ensure DateTimeIndex
    sdf = df.copy()
    if not isinstance(sdf.index, pd.DatetimeIndex):
        # Expect 'ts' was the index for intraday
        if "ts" in sdf.columns:
            sdf = sdf.set_index("ts").sort_index()
        else:
            sdf.index = pd.to_datetime(sdf.index, utc=True)

    idx = sdf.index
    # Restrict the output to the metric column when it is present.
    keep_cols = [value_col] if value_col in sdf.columns else None

    # Detect quarter-hour pattern
    is_quarter_marks = (idx.minute % 15 == 0).all() and (idx.second == 0).all()

    if reported_period == "minute" and FORCE_MINUTE_TO_15MIN and is_quarter_marks:
        return _resample(sdf, "15min", keep_cols=keep_cols), "15min"

    if reported_period == "hour" and COALESCE_HOURLY_TO_6H:
        # Lower-case "h": the upper-case hour alias is deprecated in recent
        # pandas releases and emits a FutureWarning from resample().
        return _resample(sdf, "6h", keep_cols=keep_cols), "6h"

    # Fallback: infer modal step and resample to that exact rule
    step_s = _mode_step_seconds(idx)
    if step_s is None:
        # nothing to normalize; keep as-is, label by reported_period
        return sdf, (reported_period or "intraday")
    rule = _rule_from_step(step_s)
    cleaned = _resample(sdf, rule, keep_cols=keep_cols)

    # Pretty label, compared case-insensitively so both "1H" and "1h" map the
    # same way: daily -> "day", one hour -> "hour", anything else -> the
    # lower-cased rule (e.g. "2h", "5min", "45s").
    rule_lc = rule.lower()
    if rule_lc == "d":
        label = "day"
    elif rule_lc == "1h":
        label = "hour"
    else:
        label = rule_lc

    return cleaned, label

# -------------------- Fetch strategies ----------------------------------------------
def pull_timespan(slug: str, col: str) -> Tuple[pd.DataFrame, str]:
    """Fetch a metric with one two-year "timespan" request.

    Args:
        slug: Chart identifier.
        col: Column name for the metric in the result.

    Returns:
        ``(frame, label)``. ``frame`` is empty (single column ``col``) when
        the request fails or no rows fall inside the target window; ``label``
        is ``""`` on a failed request, otherwise the frequency label.
    """
    query = {"timespan": "2years", "format": "json", "sampled": "false"}
    payload = fetch_chart(slug, query)
    if payload is None:
        return pd.DataFrame(columns=[col]), ""

    raw_df, period = to_df_with_meta(payload, col)
    windowed = trim_to_window(raw_df)
    if windowed.empty:
        return pd.DataFrame(columns=[col]), period

    shown_period = period or '?'

    # Optional collapse of intraday series to one row per day.
    if AGG_INTRADAY_TO_DAILY and period != "day":
        daily = to_daily(windowed, col, how=AGG_METHOD)
        print(f"[INFO] {slug}: timespan OK | period={shown_period} aggregated→day | rows={len(daily)}")
        return daily, "day"

    indexed = to_period_index(windowed, period, col)
    # Put intraday series onto a regular grid.
    normalized, label = normalize_frequency(indexed, period, col)
    print(f"[INFO] {slug}: timespan OK | reported={shown_period} | saved_as={label} | rows={len(normalized)}")
    return normalized, label

def pull_chunked_then_timespan(slug: str, col: str) -> Tuple[pd.DataFrame, str]:
    """Fetch a metric in ``WINDOW_DAYS``-spaced requests, then fall back.

    Requests start at ``START_DATE`` and advance by ``WINDOW_DAYS`` until past
    ``END_DATE``. A failed request aborts with an empty result. If no request
    yields rows, or fewer than ``MIN_GOOD_ROWS`` rows remain inside the target
    window, the single-request ``pull_timespan`` strategy is used instead.

    Returns:
        ``(frame, label)`` with the same meaning as ``pull_timespan``.
    """
    frames: List[pd.DataFrame] = []
    periods: List[str] = []
    stride = timedelta(days=WINDOW_DAYS)
    chunk_start = START_DATE

    while chunk_start <= END_DATE:
        query = {"start": chunk_start.strftime("%Y-%m-%d"), "format": "json", "sampled": "false"}
        payload = fetch_chart(slug, query)
        if payload is None:
            return pd.DataFrame(columns=[col]), ""

        part_df, period = to_df_with_meta(payload, col)
        if not part_df.empty:
            frames.append(part_df)
            periods.append(period or "")

        chunk_start = chunk_start + stride
        # Short pause between consecutive chunk requests.
        time.sleep(0.35)

    if not frames:
        return pull_timespan(slug, col)

    df_all = trim_to_window(pd.concat(frames, axis=0).sort_values("ts"))

    if len(df_all) < MIN_GOOD_ROWS:
        print(f"[INFO] {slug}: chunked short/mixed (periods={set(periods)}, rows={len(df_all)}). Falling back to timespan.")
        return pull_timespan(slug, col)

    # The period reported by the first non-empty chunk drives indexing and
    # normalization for the whole concatenated series.
    reported = periods[0] if periods else ""
    shown = periods[0] if periods else '?'
    base = to_period_index(df_all, reported, col)
    base, label = normalize_frequency(base, reported, col)
    print(f"[INFO] {slug}: chunked OK | reported={shown} | saved_as={label} | rows={len(base)}")
    return base, label

def pull_metric(slug: str, col: str) -> Tuple[pd.DataFrame, str]:
    """Fetch one metric using the strategy order appropriate for its slug.

    Slugs in ``PREFER_TIMESPAN`` try the single "timespan" request first and
    use the chunked strategy only if that yields nothing; every other slug
    goes straight to the chunked strategy.
    """
    if slug in PREFER_TIMESPAN:
        df, period = pull_timespan(slug, col)
        if not df.empty:
            return df, period
    return pull_chunked_then_timespan(slug, col)

# -------------------- Main -----------------------------------------------------------
def main(save_per_metric: bool = False) -> Dict[str, pd.DataFrame]:
    """Collect every metric in ``METRICS`` and write one merged file set per frequency.

    Args:
        save_per_metric: When True, additionally write each metric to its own
            CSV inside the sub-folder of its frequency label.

    Returns:
        Mapping of frequency label (e.g. "day", "15min", "6h") to the merged
        frame of all metrics collected at that frequency.

    Raises:
        RuntimeError: If no metric returned any data.
    """
    print(f"Target window (UTC): {START_DATE} → {END_DATE}")
    print(f"Base save directory: '{os.path.abspath(SAVE_DIR)}'")

    # Frames grouped by the frequency label they were normalized to.
    by_label: Dict[str, List[pd.DataFrame]] = {}

    for slug, col in METRICS.items():
        df, label = pull_metric(slug, col)
        if df.empty or not label:
            print(f"[WARN] {slug:>30s} ({col}): no data — skipped")
            continue

        if save_per_metric:
            period_dir = os.path.join(SAVE_DIR, label)
            os.makedirs(period_dir, exist_ok=True)
            # NOTE(review): for an unnamed index, reset_index() presumably
            # yields a column called "index", so this rename would not apply — confirm.
            idx_name = df.index.name or "ts_utc"
            tmp = df.reset_index().rename(columns={idx_name: "timestamp_or_date"})
            filename = f"{slug}_from_{START_DATE}_to_{END_DATE}.csv"
            filepath = os.path.join(period_dir, filename)
            tmp.to_csv(filepath, index=False, float_format="%.10g")

        # Make sure the first column carries the metric's output name before merging.
        by_label.setdefault(label, []).append(df.rename(columns={df.columns[0]: col}))
        print(f"[OK] {slug:>30s} | saved_as={label:>6s} | rows={len(df)}")
        # Pause between metrics.
        time.sleep(0.4)

    if not by_label:
        raise RuntimeError("No non-empty series collected. Check network or slugs.")

    merged_by_label: Dict[str, pd.DataFrame] = {}
    for label, frames in by_label.items():
        period_dir = os.path.join(SAVE_DIR, label)
        os.makedirs(period_dir, exist_ok=True)

        # Outer-join all metrics of this frequency on their index; rows with
        # no value in any column are dropped.
        merged = pd.concat(frames, axis=1, join="outer", sort=True).dropna(how="all")
        merged_by_label[label] = merged

        base_filename = f"btc_merged_from_{START_DATE}_to_{END_DATE}"
        csv_path = os.path.join(period_dir, f"{base_filename}.csv")
        pq_path = os.path.join(period_dir, f"{base_filename}.parquet")

        index_label = "date" if label == "day" else "ts_utc"
        merged.to_csv(csv_path, float_format="%.10g", index_label=index_label)

        # Parquet output is best-effort: a failure is reported but does not
        # stop the run, and the CSV has already been written.
        try:
            merged.to_parquet(pq_path)
        except Exception as e:
            print(f"[WARN] Could not save Parquet for {label} data: {e}")

        print(f"[SAVE] Merged {label} data to: {csv_path} | cols={len(merged.columns)} | rows={len(merged)}")

    return merged_by_label

# Script / notebook entry point: run the full collection and keep the merged
# frames in `merged_sets` for interactive inspection.
if __name__ == "__main__":
    merged_sets = main(save_per_metric=False)


Target window (UTC): 2023-09-28 → 2025-09-27
Base save directory: '/home/azureuser/maper_trader/C:\Users\yoonc\Jupyter Notebook'
[INFO] n-unique-addresses: chunked OK | reported=day | saved_as=day | rows=731
[OK]             n-unique-addresses | saved_as=   day | rows=731
[INFO] n-transactions: chunked OK | reported=day | saved_as=day | rows=731
[OK]                 n-transactions | saved_as=   day | rows=731
[INFO] transactions-per-second: chunked OK | reported=minute | saved_as=15min | rows=2017
[OK]        transactions-per-second | saved_as= 15min | rows=2017
[INFO] output-volume: chunked OK | reported=day | saved_as=day | rows=731
[OK]                  output-volume | saved_as=   day | rows=731
[INFO] mempool-count: chunked OK | reported=minute | saved_as=15min | rows=2019
[OK]                  mempool-count | saved_as= 15min | rows=2019
[INFO] mempool-growth: chunked OK | reported=minute | saved_as=15min | rows=2017
[OK]                 mempool-growth | saved_as= 15min | rows=2017

  resampler = sdf.resample(rule, label="right", closed="right", origin="start_day")


[INFO] utxo-count: chunked OK | reported=hour | saved_as=6h | rows=2924
[OK]                     utxo-count | saved_as=    6h | rows=2924
[INFO] estimated-transaction-volume-usd: chunked OK | reported=day | saved_as=day | rows=727
[OK] estimated-transaction-volume-usd | saved_as=   day | rows=727
[WARN] mvrv: 404 Not Found — skipping.
[WARN] mvrv: 404 Not Found — skipping.
[WARN]                           mvrv (mvrv): no data — skipped
[WARN] nvt: 404 Not Found — skipping.
[WARN] nvt: 404 Not Found — skipping.
[WARN]                            nvt (nvt): no data — skipped
[WARN] nvts: 404 Not Found — skipping.
[WARN] nvts: 404 Not Found — skipping.
[WARN]                           nvts (nvts): no data — skipped
[INFO] n-transactions-per-block: chunked OK | reported=day | saved_as=day | rows=731
[OK]       n-transactions-per-block | saved_as=   day | rows=731
[INFO] median-confirmation-time: chunked OK | reported=day | saved_as=day | rows=731
[OK]       median-confirmation-time | saved_