In [None]:
%pip install yfinance

In [None]:
# Sample window (ISO dates) passed unchanged as `start` / `end` to the data loaders below.
START_D = "2022-01-01"
END_D   = "2025-12-18"

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf

pd.set_option("display.max_columns", 120)

In [None]:
from __future__ import annotations

from pathlib import Path
import hashlib
import re
from io import StringIO

import numpy as np
import pandas as pd

# Configuration
# Folder (relative to the working directory) where downloaded tables are cached;
# it is created when this cell runs if it does not exist yet.
CACHE_DIR = Path(".cache")
CACHE_DIR.mkdir(exist_ok=True)

#CACHE UTILITIES (PARQUET -> csv fallback)

def _hash_key(*parts: object) -> str:
    h = hashlib.sha256()
    for p in parts:
        h.update(str(p).encode("utf-8"))
        h.update(b"|")
    return h.hexdigest()[:24]
def _cache_paths(prefix: str, key: str) -> tuple[Path, Path]:
    """
    Return the (parquet, csv) cache file paths for a given prefix and key.

    Both files live in CACHE_DIR and share the stem '<prefix>_<key>'.
    """
    stem = f"{prefix}_{key}"
    return CACHE_DIR / f"{stem}.parquet", CACHE_DIR / f"{stem}.csv"

def _read_cache_df(prefix: str, key: str) -> pd.DataFrame | None:
    """
    Load a cached table, preferring the parquet file over the csv file.

    A candidate file is tried only if it exists; any error while reading it is
    swallowed and the next candidate is tried. Returns None when nothing could
    be read.
    """
    parquet_path, csv_path = _cache_paths(prefix, key)
    candidates = (
        (parquet_path, pd.read_parquet),
        (csv_path, pd.read_csv),
    )
    for path, reader in candidates:
        if not path.exists():
            continue
        try:
            return reader(path)
        except Exception:
            # Best effort: an unreadable cache file is treated like a missing one.
            continue
    return None

def _write_cache_df(df: pd.DataFrame, prefix: str, key: str) -> None:
    """
    Persist a table to the cache without ever raising.

    Parquet is attempted first; only if that write fails is csv attempted.
    If both fail the table is simply not cached.
    """
    parquet_path, csv_path = _cache_paths(prefix, key)
    try:
        df.to_parquet(parquet_path, index=False)
    except Exception:
        try:
            df.to_csv(csv_path, index=False)
        except Exception:
            # Caching is optional: a failed write is ignored on purpose.
            pass

# HTTP helpers

def http_get_text(url: str, timeout: int = 30, headers: dict | None = None) -> str:
    """
    GET ``url`` and return the response body as text.

    Default 'User-Agent' and 'Accept' headers are sent; entries in ``headers``
    are added on top and override the defaults on key clashes.

    Errors from ``requests`` (including the one raised by raise_for_status()
    for an error status) propagate; callers in this notebook wrap the call in
    try/except so the user never sees an exception.
    """
    import requests

    request_headers = {
        "User-Agent": "Mozilla/5.0 (compatible; FinanceCourse/1.0)",
        "Accept": "*/*",
    }
    request_headers.update(headers or {})

    response = requests.get(url, timeout=timeout, headers=request_headers)
    response.raise_for_status()
    return response.text
# Numeric + returns helpers
def parse_number(x) -> float:
    """
    Parse a number whose text may use ',' or '.' as the decimal mark.

    Examples:
      - '3,367'     -> 3.367      (single comma: decimal mark)
      - '1,234.56'  -> 1234.56    (comma thousands, dot decimal)
      - '1.234,56'  -> 1234.56    (dot thousands, comma decimal)
      - '1,234,567' -> 1234567.0  (repeated commas can only group thousands)

    Returns np.nan for None, blank text, 'nan' / 'none' (any case) and for
    anything float() still cannot convert after normalisation.
    """
    if x is None:
        return np.nan
    s = str(x).strip()
    if s == "" or s.lower() in {"nan", "none"}:
        return np.nan

    has_comma = "," in s
    has_dot = "." in s
    if has_comma and has_dot:
        # Whichever mark appears last is the decimal mark; the other one groups thousands.
        if s.rfind(",") > s.rfind("."):
            s = s.replace(".", "").replace(",", ".")
        else:
            s = s.replace(",", "")
    elif has_comma:
        # A lone comma is a decimal mark; several commas cannot be, so drop them.
        s = s.replace(",", ".") if s.count(",") == 1 else s.replace(",", "")

    try:
        return float(s)
    except ValueError:
        return np.nan

def pct_change(s: pd.Series) -> pd.Series:
    """
    Simple period-over-period returns of ``s``.

    Values are coerced to numeric first (unparseable entries become NaN),
    missing values are not forward-filled, and +/-inf results are replaced
    with NaN.
    """
    numeric = pd.to_numeric(s, errors="coerce")
    raw_returns = numeric.pct_change(fill_method=None)
    return raw_returns.replace([np.inf, -np.inf], np.nan)
# BCRP parsing

# Spanish month abbreviations -> English abbreviations, applied to date text before
# it is parsed with %b formats. Both "Set" and "Sep" map to "Sep"; abbreviations that
# are spelled the same in both languages map to themselves.
_SP2EN = {
    "Ene": "Jan", "Feb": "Feb", "Mar": "Mar", "Abr": "Apr", "May": "May", "Jun": "Jun",
    "Jul": "Jul", "Ago": "Aug", "Set": "Sep", "Sep": "Sep", "Oct": "Oct", "Nov": "Nov", "Dic": "Dec",
}

def _clean_bcrp_payload(txt: str) -> str:
    """
    BCRP sometimes returns "CSV" wrapped as HTML with <br> line breaks.
    Normalize to plain text with real newlines.
    """
    x = txt.replace("\r\n", "\n").replace("\r", "\n")
    x = x.replace("<br/>", "\n").replace("<br />", "\n").replace("<br>", "\n")
    x = re.sub(r"</?pre[^>]*>", "", x, flags=re.IGNORECASE)
    return x.strip()

def _detect_sep(header_line: str) -> str:
    return ";" if header_line.count(";") > header_line.count(",") else ","

def _parse_bcrp_date(s: pd.Series) -> pd.Series:
    """
    Parse the date formats commonly seen in BCRP tables:
      - Daily: 2022-01-03, 03Jan22, 03Ene22
      - Monthly: Jan22, Ene22, 2022-1, 2022-01
      - Yearly: 2022

    The text is stripped, dots are removed and Spanish month abbreviations are
    replaced with English ones. '%Y-%m-%d' is tried on every value; values
    still unparsed are then retried with each fallback format in turn, ending
    with pandas' own format inference. Values that never parse stay NaT.
    """
    normalized = s.astype(str).str.strip().str.replace(".", "", regex=False)
    for spanish, english in _SP2EN.items():
        normalized = normalized.str.replace(spanish, english, regex=False)

    parsed = pd.to_datetime(normalized, format="%Y-%m-%d", errors="coerce")

    # None = let pandas infer the format (last resort).
    fallback_formats = ("%d%b%y", "%b%y", "%Y-%m", "%Y", None)
    for fmt in fallback_formats:
        pending = parsed.isna()
        if not pending.any():
            break
        parsed.loc[pending] = pd.to_datetime(normalized[pending], format=fmt, errors="coerce")

    return parsed


def bcrp_series_csv(
    series_codes: list[str],
    start: str,
    end: str,
    lang: str = "ing",
) -> pd.DataFrame:
    """
    Fetch one or more series from the BCRPData API (CSV endpoint):
      https://estadisticas.bcrp.gob.pe/estadisticas/series/api/[codes]/csv/[start]/[end]/[lang]

    Returns a LONG DataFrame with columns:
      date, series_name, value
    sorted by series_name and date; rows whose date cannot be parsed are dropped.

    A non-empty cached result for the same URL is returned without any request.
    If the endpoint fails, prints a short message and returns an empty
    DataFrame with the same columns (no exceptions).
    """
    long_columns = ["date", "series_name", "value"]

    joined_codes = "-".join(series_codes)
    url = f"https://estadisticas.bcrp.gob.pe/estadisticas/series/api/{joined_codes}/csv/{start}/{end}/{lang}"

    cache_key = _hash_key("bcrp", url)
    cached = _read_cache_df("bcrp", cache_key)
    if cached is not None and len(cached) > 0:
        # The csv fallback stores dates as text, so always re-parse them.
        cached["date"] = pd.to_datetime(cached["date"], errors="coerce")
        return cached

    try:
        payload = _clean_bcrp_payload(http_get_text(url, timeout=30))

        rows = [line for line in payload.split("\n") if line.strip()]
        if len(rows) < 2:
            print("[BCRP] Endpoint returned no usable rows. Continuing...")
            return pd.DataFrame(columns=long_columns)

        table = pd.read_csv(StringIO("\n".join(rows)), sep=_detect_sep(rows[0]))

        n_rows, n_cols = table.shape
        if n_rows == 0 or n_cols < 2:
            print("[BCRP] Returned an empty table. Continuing...")
            return pd.DataFrame(columns=long_columns)

        # First column holds the dates; every other column is one series.
        date_col, *value_cols = table.columns

        tidy = table.melt(
            id_vars=[date_col],
            value_vars=value_cols,
            var_name="series_name",
            value_name="value_raw",
        ).rename(columns={date_col: "date"})

        tidy["date"] = _parse_bcrp_date(tidy["date"])
        tidy["value"] = tidy["value_raw"].map(parse_number)

        tidy = (
            tidy.drop(columns=["value_raw"])
            .dropna(subset=["date"])
            .sort_values(["series_name", "date"])
            .reset_index(drop=True)
        )

        _write_cache_df(tidy, "bcrp", cache_key)
        return tidy

    except Exception as e:
        print(f"[BCRP] Endpoint unavailable ({type(e).__name__}). Continuing...")
        return pd.DataFrame(columns=long_columns)
# Yahoo Finance (yfinance)

def yfinance_download(tickers: list[str], start: str, end: str) -> pd.DataFrame:
    """
    Download prices from Yahoo Finance via yfinance.

    Returns a LONG DataFrame with columns:
      date, ticker, close, volume, ret
    sorted by ticker and date, where ``ret`` is the per-ticker simple return of
    ``close`` (the first row of each ticker is NaN).

    Caching: results are cached by (tickers, start, end). A cached table is
    reused only if it contains every requested ticker, and a download that is
    missing some requested ticker is returned but NOT cached, so a transient
    failure for one ticker is not frozen into the cache.

    If the endpoint fails, prints a short message and returns an empty
    DataFrame with the same columns (no exceptions).
    """
    columns = ["date", "ticker", "close", "volume", "ret"]

    try:
        import yfinance as yf
    except Exception:
        print("[yfinance] yfinance not installed/importable. Continuing...")
        return pd.DataFrame(columns=columns)

    key = _hash_key("yfinance", " ".join(tickers), start, end)
    cached = _read_cache_df("yf", key)
    if (
        cached is not None
        and cached.shape[0] > 0
        and "ticker" in cached.columns
        and set(tickers) <= set(cached["ticker"])
    ):
        # The csv fallback stores dates as text, so always re-parse them.
        cached["date"] = pd.to_datetime(cached["date"], errors="coerce")
        return cached

    try:
        data = yf.download(tickers=tickers, start=start, end=end, auto_adjust=False, progress=False)
        if data is None or data.shape[0] == 0:
            print("[yfinance] Returned no rows. Continuing...")
            return pd.DataFrame(columns=columns)

        if isinstance(data.columns, pd.MultiIndex):
            # Use the column level named "Ticker" when the levels are named;
            # otherwise keep the positional assumption (tickers on level 1).
            level = "Ticker" if "Ticker" in data.columns.names else 1
            available = set(data.columns.get_level_values(level))

            frames = []
            for t in tickers:
                if t not in available:
                    continue
                sub = data.xs(t, axis=1, level=level).reset_index()
                sub = sub.rename(
                    columns={"Date": "date", "Datetime": "date", "Close": "close", "Volume": "volume"}
                )
                sub["ticker"] = t
                frames.append(sub[["date", "ticker", "close", "volume"]])

            if not frames:
                # Previously this fell through to a KeyError reported as "Endpoint unavailable".
                print("[yfinance] None of the requested tickers were returned. Continuing...")
                return pd.DataFrame(columns=columns)
            out = pd.concat(frames, ignore_index=True)
        else:
            out = data.reset_index().rename(
                columns={"Date": "date", "Datetime": "date", "Close": "close", "Volume": "volume"}
            )
            out["ticker"] = tickers[0]
            out = out[["date", "ticker", "close", "volume"]]

        out["date"] = pd.to_datetime(out["date"], errors="coerce")
        out["close"] = pd.to_numeric(out["close"], errors="coerce")
        out["volume"] = pd.to_numeric(out["volume"], errors="coerce")

        out = out.dropna(subset=["date", "close"]).sort_values(["ticker", "date"]).reset_index(drop=True)
        if out.empty:
            print("[yfinance] Returned no rows. Continuing...")
            return pd.DataFrame(columns=columns)

        # transform() returns a Series aligned to out's index, so no index
        # juggling is needed regardless of how groupby labels its output.
        out["ret"] = out.groupby("ticker")["close"].transform(pct_change)

        present = set(out["ticker"])
        missing = [t for t in tickers if t not in present]
        if missing:
            print(f"[yfinance] No usable rows for: {', '.join(missing)}. Result not cached.")
        else:
            _write_cache_df(out, "yf", key)
        return out

    except Exception as e:
        print(f"[yfinance] Endpoint unavailable ({type(e).__name__}). Continuing...")
        return pd.DataFrame(columns=columns)

In [None]:
# Sample window (same values as the configuration cell at the top of the notebook).
START_D = "2022-01-01"
END_D   = "2025-12-18"

# Peru: four BCRP series, requested with lang="ing".
peru = bcrp_series_csv(
    series_codes=["PD04637PD", "PD04639PD", "PD04704XD", "PD04701XD"],
    start=START_D,
    end=END_D,
    lang="ing"
)

# USA: three ETFs from Yahoo Finance.
usa = yfinance_download(["SPY", "TLT", "GLD"], start=START_D, end=END_D)

print("Peru rows:", peru.shape[0], "| USA rows:", usa.shape[0])

# Reshape to wide (one column per series) and give the series short English names.
peru_wide = (
    peru.pivot_table(index="date", columns="series_name", values="value", aggfunc="last")
        .reset_index()
        .sort_values("date")
)
peru_wide.columns.name = None

rename_map = {}
for c in peru_wide.columns:
    cl = str(c).lower()
    is_fx = "tipo de cambio" in cl or "tc" in cl or "exchange" in cl
    # "interban" matches both the Spanish stem ("interbancario") and the English
    # word ("interbank"); the other branches already accept either language.
    if "interban" in cl and is_fx:
        rename_map[c] = "fx_interbank_buy"
    elif "sbs" in cl and is_fx:
        rename_map[c] = "fx_sbs_buy"
    elif "oro" in cl or "gold" in cl:
        rename_map[c] = "gold_london"
    elif "cobre" in cl or "copper" in cl:
        rename_map[c] = "copper_london"

peru_wide = peru_wide.rename(columns=rename_map)
peru_wide.head()

In [None]:
# Number of rows per ticker in the downloaded USA table.
usa["ticker"].value_counts()

In [None]:
# 3. Distributions
# Exercise 3.1 - Return distributions: Peru Gold vs US Gold ETF
# Goal: compare how daily returns are distributed for the BCRP reference gold
# price (Peru) and the GLD ETF (USA).
# Step 1: start from peru_wide and keep the gold series, sorted by date.
# pct_change() then gives the percentage return between consecutive rows.
peru_gold = (
    peru_wide.loc[:, ["date", "gold_london"]]
    .dropna()
    .sort_values("date")
    .assign(ret=lambda frame: frame["gold_london"].pct_change())
)

peru_gold.head()

In [None]:
# 2. From usa, keep the GLD rows that have a return.
gld = (
    usa.loc[lambda frame: frame["ticker"].eq("GLD")]
    .copy()
    .dropna(subset=["ret"])
)

gld.head()

In [None]:
# 3. Plot two histograms (separate figures) with the same binning.
# Compute one set of bin edges from both return series so the comparison is fair.
all_rets = pd.concat([peru_gold["ret"], gld["ret"]]).dropna()

bins = np.histogram_bin_edges(all_rets, bins=40)

In [None]:
# Histogram 1 - Peru gold (shared bin edges)
fig, ax = plt.subplots(figsize=(7, 5))
ax.hist(peru_gold["ret"].dropna(), bins=bins, edgecolor="black")
ax.set_title("Distribución de Retornos — Oro Perú (BCRP)")
ax.set_xlabel("Retorno diario")
ax.set_ylabel("Frecuencia")
plt.show()

In [None]:
# Request GLD on its own (single-ticker download) and count the rows returned for it.
usa_gld = yfinance_download(
    ["GLD"],
    start=START_D,
    end=END_D
)

usa_gld["ticker"].value_counts()

In [None]:
# GLD returns taken from the dedicated GLD download made in the previous cell.
# (`usa_fixed` is only created further down in the notebook, so using it here
# raised NameError on a fresh Restart & Run All.)
gld = (
    usa_gld[usa_gld["ticker"] == "GLD"]
    .dropna(subset=["ret"])
    .sort_values("date")
    .copy()
)

gld["ret"].describe()

In [None]:
# Histogram 2 - GLD (USA), same bin edges as the Peru gold histogram
fig, ax = plt.subplots(figsize=(7, 5))
ax.hist(gld["ret"].dropna(), bins=bins, edgecolor="black")
ax.set_title("Distribución de Retornos — GLD (USA)")
ax.set_xlabel("Retorno diario")
ax.set_ylabel("Frecuencia")
plt.show()

In [None]:
# 4. Optional: overlay KDE for each distribution.
# Histogram + KDE - Peru gold
fig, ax = plt.subplots(figsize=(7, 5))

sns.histplot(peru_gold["ret"].dropna(), bins=bins, stat="count", kde=True, ax=ax)

ax.set_title("Distribución de Retornos — Oro Perú (BCRP)")
ax.set_xlabel("Retorno diario")
ax.set_ylabel("Frecuencia")
plt.show()

In [None]:
# Histogram + KDE - GLD (USA)
fig, ax = plt.subplots(figsize=(7, 5))

sns.histplot(gld["ret"].dropna(), bins=bins, stat="count", kde=True, ax=ax)

ax.set_title("Distribución de Retornos — GLD (USA)")
ax.set_xlabel("Retorno diario")
ax.set_ylabel("Frecuencia")
plt.show()

In [None]:
# Exercise 3.2 — Annotations (mark a key event on a series)
# Goal: find the day with the largest absolute return.
# Keep only the SPY rows from usa, then preview the rows that have a return.
spy = usa.loc[usa["ticker"] == "SPY"].copy()
spy.loc[:, ["date", "ret"]].dropna().head()

In [None]:
# Locate the row with the largest absolute return and read its date and return.
idx_extreme = spy["ret"].abs().idxmax()
extreme_row = spy.loc[idx_extreme]

extreme_date, extreme_ret = extreme_row["date"], extreme_row["ret"]

(extreme_date, extreme_ret)

In [None]:
# Line plot of SPY daily returns.
plt.figure(figsize=(10,5))
plt.plot(spy["date"], spy["ret"])
plt.title("Retornos diarios del SPY")
plt.xlabel("Fecha")
plt.ylabel("Retorno diario")
# Render explicitly, like every other plotting cell, so the cell does not
# end on (and display) the Text object returned by plt.ylabel().
plt.show()

In [None]:
# Make sure we work with a clean SPY series: SPY rows only, missing returns dropped.
spy_clean = usa.loc[usa["ticker"] == "SPY"].dropna(subset=["ret"]).copy()

In [None]:
# Locate the extreme point: index label of the largest absolute return,
# then the date and return stored on that row.
idx = spy_clean["ret"].abs().idxmax()
extreme_date = spy_clean.at[idx, "date"]
extreme_ret = spy_clean.at[idx, "ret"]

In [None]:
# Plot the return series and annotate the extreme day.
fig, ax = plt.subplots()
ax.plot(spy_clean["date"], spy_clean["ret"], label="SPY retornos")

# The arrow points at the extreme observation; the label sits at 1.5x its
# return on the same date.
ax.annotate(
    f"Máx |ret|\n{extreme_date.date()}\n{extreme_ret:.2%}",
    xy=(extreme_date, extreme_ret),
    xytext=(extreme_date, extreme_ret * 1.5),
    arrowprops=dict(arrowstyle="->"),
)

ax.set_title("SPY — Retornos diarios con evento extremo")
ax.set_xlabel("Fecha")
ax.set_ylabel("Retorno")
ax.legend()
plt.show()

In [None]:
# 4.2 Exercise 4.2
# Run a simple regression of GLD returns on SPY returns.
# Single-ticker GLD request with the same arguments as the earlier GLD download;
# the last line counts the rows returned per ticker.
usa_gld = yfinance_download(
    ["GLD"],
    start=START_D,
    end=END_D
)

usa_gld["ticker"].value_counts()

In [None]:
# `usa` was itself requested with GLD, so appending the dedicated GLD download can
# repeat the same (ticker, date) rows. Keep one row per pair so that later merges
# on date do not multiply observations.
usa_fixed = (
    pd.concat([usa, usa_gld], ignore_index=True)
    .drop_duplicates(subset=["ticker", "date"], keep="first")
    .reset_index(drop=True)
)
usa_fixed["ticker"].value_counts()

In [None]:
# First build the return series to be aligned:
# one (date, return) frame per ticker, with the return column named after the ticker.
spy = usa_fixed.loc[usa_fixed["ticker"] == "SPY", ["date", "ret"]].rename(
    columns={"ret": "ret_SPY"}
)

gld = usa_fixed.loc[usa_fixed["ticker"] == "GLD", ["date", "ret"]].rename(
    columns={"ret": "ret_GLD"}
)

In [None]:
# Align by date: keep only dates present in both series and drop rows with a missing return.
ret_aligned = (
    pd.merge(spy, gld, on="date", how="inner")
    .dropna()
)

# Only a cell's last expression is displayed, so print the shape and leave
# head() last; before, the head() output was silently discarded.
print("ret_aligned shape:", ret_aligned.shape)
ret_aligned.head()

In [None]:
# Compute alpha and beta of the regression ret_GLD = alpha + beta * ret_SPY.
x = ret_aligned["ret_SPY"]
y = ret_aligned["ret_GLD"]

# Slope = sample covariance over sample variance of the regressor (both with ddof=1).
cov_xy = np.cov(x, y, ddof=1)[0, 1]
var_x = np.var(x, ddof=1)
beta = cov_xy / var_x
# Intercept: the fitted line passes through the point of means.
alpha = y.mean() - beta * x.mean()

(alpha, beta)

In [None]:
# Scatter of the aligned returns plus the estimated line.
fig, ax = plt.subplots(figsize=(7, 5))
ax.scatter(x, y, alpha=0.5, label="Datos")

# Evaluate the fitted line on 100 evenly spaced points across the observed SPY range.
x_line = np.linspace(x.min(), x.max(), 100)
y_line = alpha + beta * x_line

ax.plot(x_line, y_line, color="red", label="Recta OLS")

ax.set_title("GLD vs SPY — Retornos diarios")
ax.set_xlabel("SPY return")
ax.set_ylabel("GLD return")
ax.legend()
plt.show()

In [None]:
#INTERPRETACIÓN
#El coeficiente β mide la sensibilidad de los retornos de GLD frente a los retornos del mercado accionario estadounidense (SPY). 
#Un valor positivo de β indica que, en promedio, cuando SPY sube, GLD también tiende a subir. Sin embargo, dado que el valor de 
#β es menor que 1, la reacción de GLD es más moderada que la del mercado accionario.