<a href="https://colab.research.google.com/github/Jaksta1/Monte-Carlo-method-for-option-prices/blob/main/mgr_skrypt_v02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
"""
Finalny skrypt: wycena opcji amerykańskiej (Ehrlichman-Henderson style)
- QMC (qmcpy Sobol/Halton)
- Extended MARS z LDA i GCV
- LSM + martingale control variate
- Poprawne dyskontowanie (używa dt)
"""
import numpy as np
from scipy.stats import norm
from qmcpy import Sobol, Halton
from typing import Callable, Tuple, Optional
from typing import Callable, Tuple, Optional, List, Dict
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import warnings
warnings.filterwarnings("ignore")


# ---------------------------
# Pomocnicze funkcje
# ---------------------------

def _next_pow2(x: int) -> int:
    if x <= 0:
        return 1
    return 1 << int(np.ceil(np.log2(x)))


# ---------------------------
# Symulacja ścieżek QMC (z prefetch i dopełnieniem)
# ---------------------------

def simulate_paths(
    S0: np.ndarray,
    r: float,
    sigma: np.ndarray,
    rho: np.ndarray,
    n_steps: int,
    T: float,
    N: int,
    qmc_sampler,
    seed: Optional[int] = None
) -> np.ndarray:
    """
    Generuje N ścieżek d-wymiarowego Black-Scholesa z korelacjami.
    """
    d = len(S0)
    dt = T / n_steps
    paths = np.zeros((N, n_steps + 1, d))
    paths[:, 0, :] = S0

    # Cholesky korelacji
    L = np.linalg.cholesky(rho)

    # Generuj próbki dla wszystkich kroków czasowych naraz
    total_dim = n_steps * d
    # Generujemy N próbek o wymiarze (n_steps * d)
    u = qmc_sampler.gen_samples(n_min=0, n_max=N)  # (N, total_dim)
    if u.shape[1] != total_dim:
        # Jeśli wymiar próbek jest nieprawidłowy, generujemy jeszcze raz z właściwym wymiarem
        qmc_sampler = Sobol(dimension=total_dim, randomize='Owen', seed=seed)
        u = qmc_sampler.gen_samples(n_min=0, n_max=N)

    # Zapobiegaj skrajnym wartościom
    eps = 1e-12
    u = np.clip(u, eps, 1 - eps)

    # Przekształć na normalne
    z = norm.ppf(u)

    # Przekształć na właściwy kształt (N, n_steps, d)
    z = z.reshape(N, n_steps, d)

    # Aplikuj korelację
    for t in range(n_steps):
        z[:, t, :] = z[:, t, :] @ L.T

    # Symuluj ścieżki
    drift = (r - 0.5 * sigma**2) * dt
    diffusion = sigma * np.sqrt(dt)

    for t in range(n_steps):
        paths[:, t+1] = paths[:, t] * np.exp(
            drift + diffusion * z[:, t]
        )

    return paths


# ---------------------------
# Extended MARS z LDA + GCV
# ---------------------------

class ExtendedMARS:
    """
    Extended MARS (Multivariate Adaptive Regression Splines) z LDA i GCV
    """
    def __init__(self, max_terms: int = 20, min_obs_per_knot: int = 20, penalty: float = 3.0):
        self.max_terms = max_terms
        self.min_obs_per_knot = min_obs_per_knot
        self.base_penalty = penalty

        # Parametry modelu
        self.a0: float = 0.0
        self.alphas: List[float] = []
        self.directions: List[np.ndarray] = []
        self.knots: List[float] = []
        self.signs: List[int] = []

        # Diagnostyka
        self.gcv_scores: List[float] = []
        self.r2_scores: List[float] = []

    def _get_lda_directions(self, X: np.ndarray, y: np.ndarray, n_splits: int = 5) -> List[np.ndarray]:
        """Generuje kierunki LDA używając wielu punktów podziału danych"""
        directions = []
        splits = np.linspace(0, 100, n_splits+2)[1:-1]

        for percentile in splits:
            try:
                threshold = np.percentile(y, percentile)
                labels = (y > threshold).astype(int)

                if len(np.unique(labels)) > 1:
                    lda = LinearDiscriminantAnalysis(n_components=min(X.shape[1], 1))
                    lda.fit(X, labels)

                    v = lda.scalings_[:, 0].copy()
                    normv = np.linalg.norm(v)
                    if normv > 0:
                        v /= normv
                        if not any(np.allclose(v, d) for d in directions):
                            directions.append(v)
            except Exception as e:
                warnings.warn(f"LDA nie powiodło się dla percentyla {percentile}: {str(e)}")
                continue

        return directions

    def _gcv(self, y: np.ndarray, yhat: np.ndarray, num_params: int,
             adaptive_penalty: bool = True) -> float:
        """Oblicza score GCV z adaptacyjną karą"""
        n = len(y)
        rss = np.sum((y - yhat)**2)

        if adaptive_penalty:
            penalty = self.base_penalty + 0.1 * np.log(num_params + 1)
        else:
            penalty = self.base_penalty

        p = max(1, num_params) + 1
        denom = (1 - penalty * p / n)**2

        if denom <= 0:
            return np.inf

        gcv_score = (rss / n) / denom
        complexity_penalty = np.log(p) * rss / n

        return gcv_score + complexity_penalty

    def _generate_knots(self, proj: np.ndarray) -> np.ndarray:
        """Generuje kandydackie węzły"""
        percentiles = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95]
        knots = np.percentile(proj, percentiles)

        valid_knots = []
        for k in knots:
            if (proj <= k).sum() >= self.min_obs_per_knot and \
               (proj > k).sum() >= self.min_obs_per_knot:
                valid_knots.append(k)

        return np.array(valid_knots)

    def _check_linear_independence(self, new_term: np.ndarray,
                                 current_terms: List[np.ndarray]) -> bool:
        """Sprawdza liniową niezależność"""
        if not current_terms:
            return True

        for term in current_terms:
            corr = np.corrcoef(new_term, term)[0, 1]
            if abs(corr) > 0.99:
                return False
        return True

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'ExtendedMARS':
        """Dopasowuje model MARS do danych"""
        n, d = X.shape
        X = np.asarray(X)
        y = np.asarray(y)

        self.a0 = float(np.mean(y))
        residual = y - self.a0

        directions = [np.eye(d)[i] for i in range(d)]
        lda_directions = self._get_lda_directions(X, y)
        directions.extend(lda_directions)

        directions = [d for i, d in enumerate(directions)
                     if not any(np.allclose(d, p) for p in directions[:i])]

        terms = [np.ones(n)]
        preds = [np.full_like(y, self.a0)]
        self.gcv_scores = [self._gcv(y, preds[0], 1)]

        for term_idx in range(self.max_terms):
            best_gcv = np.inf
            best_config = None

            for a in directions:
                proj = X @ a
                knots = self._generate_knots(proj)

                for k in knots:
                    for sign in [1, -1]:
                        h = np.maximum(sign * (proj - k), 0.0)

                        if self._check_linear_independence(h, terms[1:]):
                            temp_terms = terms + [h]
                            design = np.column_stack(temp_terms)

                            try:
                                coef = np.linalg.lstsq(design, y, rcond=None)[0]
                                pred = design @ coef
                                gcv = self._gcv(y, pred, len(temp_terms), True)

                                if gcv < best_gcv:
                                    best_gcv = gcv
                                    best_config = (a, k, sign, h, coef)
                            except np.linalg.LinAlgError:
                                continue

            if best_config is None or best_gcv >= self.gcv_scores[-1]:
                break

            a, k, sign, h, coef = best_config
            self.directions.append(a)
            self.knots.append(k)
            self.signs.append(sign)
            terms.append(h)

            self.a0 = float(coef[0])
            self.alphas = list(coef[1:])

            preds.append(terms[-1])
            self.gcv_scores.append(best_gcv)

            y_pred = self.predict(X)
            r2 = 1 - np.sum((y - y_pred)**2) / np.sum((y - np.mean(y))**2)
            self.r2_scores.append(r2)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Przewiduje wartości dla nowych danych"""
        X = np.asarray(X)
        n = X.shape[0]
        pred = np.full(n, self.a0)

        for i, (direction, knot, sign, alpha) in enumerate(zip(
            self.directions, self.knots, self.signs, self.alphas)):
            proj = X @ direction
            term = np.maximum(sign * (proj - knot), 0.0)
            pred += alpha * term

        return pred

    def conditional_expectation(self, X_prev: np.ndarray, r: float,
                              sigma_vec: np.ndarray, rho_corr: np.ndarray,
                              dt: float, n_mc: int = 100) -> np.ndarray:
        """Oblicza warunkową wartość oczekiwaną"""
        n_paths = X_prev.shape[0]
        result = np.zeros(n_paths)

        drift = (r - 0.5 * sigma_vec**2) * dt
        diffusion = sigma_vec * np.sqrt(dt)

        for _ in range(n_mc):
            Z = np.random.standard_normal((n_paths, len(sigma_vec)))
            if len(sigma_vec) > 1:
                Z = Z @ np.linalg.cholesky(rho_corr).T

            X_next = X_prev * np.exp(drift + diffusion * Z)
            pred = self.predict(X_next)
            pred = np.clip(pred, -1e6, 1e6)  # Zabezpieczenie przed ekstremalnymi wartościami
            result += pred

        return result / n_mc


# ---------------------------
# Główna funkcja wyceny
# ---------------------------

def price_american_option(
    payoff_fn: Callable[[np.ndarray], float],
    d: int,
    n_steps: int,
    r: float,
    sigma: float,
    S0: float,
    K: float,
    N1: int = 2048,
    N2: int = 4096,
    qmc_method: str = 'sobol_scrambled',
    seed: Optional[int] = None,
    max_mars_terms: int = 15,
    rho_corr: Optional[np.ndarray] = None,
    T: float = 1.0  # Czas do wygaśnięcia w latach
) -> Tuple[float, float, float]:
    """
    Zwraca (lower_bound, upper_bound, point_estimate)

    Parameters:
    -----------
    n_steps : int
        Liczba kroków czasowych
    T : float
        Czas do wygaśnięcia w latach (domyślnie 1.0)
    """
    dt = T / n_steps

    if rho_corr is None:
        rho_corr = np.eye(d)
    else:
        assert rho_corr.shape == (d, d)

    S0_vec = np.full(d, S0)
    sigma_vec = np.full(d, sigma)

    # Inicjalizacja QMC samplera z właściwym wymiarem
    total_dim = n_steps * d
    if qmc_method == 'halton':
        qmc1 = Halton(dimension=total_dim, seed=seed)
    elif qmc_method == 'sobol':
        qmc1 = Sobol(dimension=total_dim, seed=seed)
    elif qmc_method == 'sobol_scrambled':
        qmc1 = Sobol(dimension=total_dim, randomize='Owen', seed=seed)
    else:
        raise ValueError("Nieznana metoda QMC")

    # Faza I: Dopasowanie strategii i MARS
    paths1 = simulate_paths(S0_vec, r, sigma_vec, rho_corr, n_steps, T, N1, qmc1, seed)
    Y = np.array([payoff_fn(paths1[n, -1, :]) for n in range(N1)])
    cv = np.zeros(N1)

    # Inicjalizacja modeli
    mars_models = [None] * (n_steps + 1)
    mars_models[-1] = ExtendedMARS(max_terms=1)
    mars_models[-1].a0 = float(np.mean(Y))
    mars_models[-1].alphas = []
    mars_models[-1].directions = []
    mars_models[-1].knots = []

    # Regresje LSM
    alpha_ls = [None] * n_steps

    # Backward induction
    for t in range(n_steps - 1, -1, -1):
        X_next = paths1[:, t + 1, :]
        mars = ExtendedMARS(max_terms=max_mars_terms)
        mars.fit(X_next, Y)
        mars_models[t + 1] = mars

        # Oblicz martyngał kontrolny
        X_curr = paths1[:, t, :]
        exp_mars = mars.conditional_expectation(X_curr, r, sigma_vec, rho_corr, dt)
        cv += mars.predict(X_next) - exp_mars

        # Znajdź opcje ITM
        exercise_value = np.array([payoff_fn(paths1[n, t, :]) for n in range(N1)])
        itm = exercise_value > 0

        if not np.any(itm):
            alpha_ls[t] = np.zeros(1 + 2 * d)
            Y *= np.exp(-r * dt)
            cv *= np.exp(-r * dt)
            continue

        # Oblicz wartość kontynuacji dla ITM
        X_itm = paths1[itm, t, :]
        phi = np.hstack([np.ones((X_itm.shape[0], 1)), X_itm, X_itm ** 2])
        Y_reg = Y[itm] - cv[itm]  # Użyj martyngału jako zmiennej kontrolnej

        try:
            alpha_t, *_ = np.linalg.lstsq(phi, Y_reg, rcond=None)
            alpha_t = alpha_t.ravel()
        except np.linalg.LinAlgError:
            alpha_t = np.zeros(phi.shape[1])

        alpha_ls[t] = alpha_t

        # Aktualizacja wartości
        for n in range(N1):
            if itm[n]:
                cont = float(alpha_t @ np.hstack([1.0, paths1[n, t, :], paths1[n, t, :] ** 2]))
                if exercise_value[n] > cont:
                    Y[n] = exercise_value[n]
                    cv[n] = 0.0
                else:
                    Y[n] *= np.exp(-r * dt)
                    cv[n] *= np.exp(-r * dt)
            else:
                Y[n] *= np.exp(-r * dt)
                cv[n] *= np.exp(-r * dt)

    # Faza II: Estymacja granic
    # Inicjalizacja QMC samplera
    if qmc_method == 'halton':
        qmc2 = Halton(dimension=total_dim, seed=(seed + 1) if seed is not None else None)
    elif qmc_method == 'sobol':
        qmc2 = Sobol(dimension=total_dim, seed=(seed + 1) if seed is not None else None)
    elif qmc_method == 'sobol_scrambled':
        qmc2 = Sobol(dimension=total_dim, randomize='Owen', seed=(seed + 1) if seed is not None else None)
    else:
        raise ValueError("Nieznana metoda QMC")
    paths2 = simulate_paths(S0_vec, r, sigma_vec, rho_corr, n_steps, T, N2, qmc2, seed)

    # Oblicz martyngał kontrolny dla drugiej fazy
    pi = np.zeros((N2, n_steps + 1))
    for t in range(1, n_steps + 1):
        X_t = paths2[:, t, :]
        X_prev = paths2[:, t - 1, :]

        h_t = mars_models[t].predict(X_t)
        exp_h = mars_models[t].conditional_expectation(X_prev, r, sigma_vec, rho_corr, dt)

        # Bardziej stabilne obliczanie przyrostu martyngału
        increment = np.exp(-r * (t-1) * dt) * (h_t - exp_h)
        increment = np.clip(increment, -1e6, 1e6)  # Zabezpieczenie przed ekstremalnymi wartościami
        pi[:, t] = pi[:, t-1] + increment

    # Znajdź optymalne czasy wykonania
    tau = np.full(N2, n_steps)
    for n in range(N2):
        for t in range(n_steps):
            exercise_value = payoff_fn(paths2[n, t, :])
            if exercise_value > 0:
                alpha_t = alpha_ls[t]
                if alpha_t is not None:
                    cont = float(alpha_t @ np.hstack([1.0, paths2[n, t, :], paths2[n, t, :] ** 2]))
                    if exercise_value > cont:
                        tau[n] = t
                        break

    # Oblicz dolne ograniczenie z martyngałem kontrolnym
    disc_payoff = np.exp(-r * tau * dt) * np.array([
        payoff_fn(paths2[n, tau[n], :]) for n in range(N2)
    ])
    lower = np.mean(disc_payoff - pi[np.arange(N2), tau])

    # Oblicz górne ograniczenie używając martyngału
    upper_vals = []
    for n in range(N2):
        vals = [
            np.exp(-r * t * dt) * payoff_fn(paths2[n, t, :]) - pi[n, t]
            for t in range(n_steps + 1)
        ]
        upper_vals.append(max(vals))
    upper = np.mean(upper_vals)

    point = 0.5 * (lower + upper)

    return lower, upper, point


# ---------------------------
# Przykład użycia (test)
# ---------------------------
import os
if __name__ == "__main__":
    # Parametry testowe
    d = 1
    r = 0.05        # 5% stopa procentowa
    sigma = 0.2     # 20% zmienność
    S0 = 100.0      # Cena początkowa
    K = 100.0       # Cena wykonania
    T = 1.0         # Czas do wygaśnięcia (1 rok)
    rho = np.eye(d)

    # Funkcja wypłaty dla opcji put
    payoff = lambda x: max(K - x[0], 0.0)

    # Przygotuj nazwę pliku z aktualną datą i czasem
    from datetime import datetime
    timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    filename = f"american_option_results_{timestamp}.txt"

    # Zapisz parametry do pliku
    with open(filename, 'w') as f:
        f.write(f"Data i czas (UTC): {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Użytkownik: {os.getenv('USERNAME')}\n")
        f.write("\nParametry symulacji:\n")
        f.write(f"S0 = {S0}\n")
        f.write(f"K = {K}\n")
        f.write(f"r = {r}\n")
        f.write(f"sigma = {sigma}\n")
        f.write(f"T = {T}\n")
        f.write("Typ opcji: PUT\n")
        f.write("\nWyniki:\n")
        f.write("-" * 50 + "\n")

    # Test dla różnych liczb kroków czasowych
    n_steps_list = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1000, 1500, 2000, 3000, 5000]

    for n_steps in n_steps_list:
        print(f"\nLiczba kroków czasowych: {n_steps}")

        # Adaptacyjna liczba symulacji (zawsze potęga 2)
        N1_base = 8192
        N2_base = 4096
        # N1 = next_power_of_2(N1_base * (1 + n_steps // 32))
        # N2 = next_power_of_2(N2_base * (1 + n_steps // 32))

        # Wycena opcji
        lb, ub, pe = price_american_option(
            payoff_fn=payoff,
            d=d,
            n_steps=n_steps,
            T=T,
            r=r,
            sigma=sigma,
            S0=S0,
            K=K,
            N1=N1_base,
            N2=N2_base,
            qmc_method='sobol_scrambled',
            seed=42,
            max_mars_terms=20,
            rho_corr=rho
        )

        # Wyświetl wyniki
        print(f"Dolne ograniczenie: {lb:.4f}")
        print(f"Górne ograniczenie: {ub:.4f}")
        print(f"Szacowana cena:     {pe:.4f}")
        print(f"Szerokość przedziału: {ub-lb:.4f}")

        # Zapisz wyniki do pliku
        with open(filename, 'a') as f:
            f.write(f"\nLiczba kroków czasowych: {n_steps}\n")
            f.write(f"Liczba symulacji N1: {N1_base}\n")
            f.write(f"Liczba symulacji N2: {N2_base}\n")
            f.write(f"Dolne ograniczenie: {lb:.4f}\n")
            f.write(f"Górne ograniczenie: {ub:.4f}\n")
            f.write(f"Szacowana cena:     {pe:.4f}\n")
            f.write(f"Szerokość przedziału: {ub-lb:.4f}\n")
            f.write("-" * 30 + "\n")

    # Dodaj podsumowanie na końcu pliku
    with open(filename, 'a') as f:
        f.write("\nKoniec symulacji\n")
        f.write(f"Całkowity czas wykonania: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}\n")