In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

class RNG:
    def __init__(self, seed=None):
        self.rng = np.random.default_rng()

    def normal(self, size, seed = 1):
        rng = np.random.default_rng(seed)
        return rng.standard_normal(size)
    

class Payoff:
    def __init__(self, strike, option_type='call'):
        self.strike = strike
        self.option_type = option_type

    def __call__(self, prices):
        if self.option_type == 'call':
            return np.maximum(prices - self.strike, 0)
        if self.option_type == 'put':
            return np.maximum(self.strike - prices, 0)
        if self.option_type == 'digital':
            return (prices > self.strike).astype(float)
        raise Exception("XXX")
        



class Market:
    def __init__(self, spot, r_t, sigma_t, times):
        self.spot = spot
        self.r_t = r_t
        self.sigma_t = sigma_t
        self.times = times

    




class PathGenerator:
    def __init__(self, market):
        self.spot = market.spot
        self.r_t = market.r_t
        self.sigma_t = market.sigma_t
        self.times = np.array(market.times)


# Monte carlo simulation of a path
    def generate_paths(self, N, random_matrix):
        M = len(self.times)
        paths = np.zeros((N, M))
        paths[:, 0] = self.spot
        for i in range(1, M):
            dt = self.times[i] - self.times[i - 1]
            r = self.r_t(self.times[i - 1])
            sigma = self.sigma_t(self.times[i - 1])
            paths[:, i] = paths[:, i - 1] * np.exp((r - 0.5 * sigma ** 2) * dt + sigma * np.sqrt(dt) * random_matrix[:, i - 1])
        return paths
    

def second_basis_function(x):
    return np.column_stack([np.ones_like(x), x, x**2, x**3])

def longstaff_regression(X, Y):
    Phi_x = second_basis_function(X)
    model = LinearRegression(fit_intercept = False)
    model.fit(Phi_x, Y)
    beta = model.coef_

    def phi(x_new):
        x_new = np.atleast_1d(x_new)
        return second_basis_function(x_new) @ beta
    
    return phi, beta






N = 10000
times = [0.0, 0.8, 1.0]
K = 100
delta = 1e-3
spot = 100
market = Market(spot, lambda t: 0.05, lambda t: 0.2, times)
market_u = Market(spot * (1 + delta), lambda t: 0.05, lambda t: 0.2, times)
market_l = Market(spot * (1 - delta), lambda t: 0.05, lambda t: 0.2, times)

seed_first = 1
# Generate Phi using first seed
## Setup
rng = RNG()
random_matrix = rng.normal(size=(N, len(times) - 1), seed = seed_first)
payoff = Payoff(K, option_type='digital')
generator = PathGenerator(market)

## Simulate
paths = generator.generate_paths(N, random_matrix)
S_T_tau = paths[:, 1]  
# S_{T - τ}

F_S_T = payoff(paths[:, 2])  
# F(S_T)

phi, beta = longstaff_regression(S_T_tau, F_S_T)

# Generate a list of Y_pred using rest seed
Y_pred_list = []
Y_pred_list_2 = []
MC_price = []
MC_price_2 = []
seed_list = np.random.default_rng(seed=33).choice(np.arange(100, 10**9), size=1000, replace=False)



for i in range(0, len(seed_list)):
    seed_forloop = seed_list[i]
    random_matrix_forloop = rng.normal(size = (N, len(times) - 1), seed = seed_forloop)
    payoff = Payoff(K, option_type='digital')
    generator = PathGenerator(market)
    #Simulate
    paths_forloop = generator.generate_paths(N, random_matrix_forloop)
    # S_(T - τ)
    S_T_tau_forloop = paths_forloop[:, 1]

    Phi_S_T_tau_forloop = phi(S_T_tau_forloop)
    #E(X^2) term
    Y_pred_list_2.append((np.mean(phi(S_T_tau_forloop)))**2)

    Y_pred_list.append(np.mean(Phi_S_T_tau_forloop))
    # F(S_T)
    F_S_T_forloop = payoff(paths_forloop[:, 2])
    MC_price_2.append(np.mean(F_S_T_forloop)**2)
    MC_price.append(np.mean(F_S_T_forloop))

# Calculate variance of phi
var_phi = 1/len(seed_list) * sum(Y_pred_list_2) - (1/len(seed_list) * sum(Y_pred_list))**2
print('Variance of phi:', var_phi)
# Calculate the variance of MC
var_MC = 1/len(seed_list) * sum(MC_price_2) - (1/len(seed_list) * sum(MC_price))**2
print('Variance of MC:', var_MC)

# Calculate the mean of Y_pred_list and MC_price
print("Price of regression:", np.mean(Y_pred_list), "Price of MC:", np.mean(MC_price))

print("Var of single Y", np.var(F_S_T))



## Compute \xi 
xi_mc = []
xi_phi = []

random_matrix_forloop = rng.normal(size = (N, len(times) - 1), seed = seed_list[0])
payoff = Payoff(K, option_type='digital')
generator_u = PathGenerator(market_u)
#Simulate
paths_forloop_u = generator_u.generate_paths(N, random_matrix_forloop)

generator_l = PathGenerator(market_l)
paths_forloop_l = generator_l.generate_paths(N, random_matrix_forloop)
# store xi (mc)
xi_mc.extend(((payoff(paths_forloop_u[:, 2]) - payoff(paths_forloop_l[:, 2])) / (2 * delta * spot)).tolist())
xi_mc_squared = list(map(lambda x: x**2, xi_mc))

# store xi (phi)
xi_phi.extend(((phi(paths_forloop_u[:, 1]) - phi(paths_forloop_l[:, 1])) / (2 * delta * spot)).tolist())
xi_phi_squared = list(map(lambda x: x**2, xi_phi))

# Calculate the mean
print("Mean of xi (MC):", np.mean(xi_mc), "Mean of xi (phi):", np.mean(xi_phi))
# Calculate the variance
print("Variance of xi (MC):", 1/N * sum(xi_mc_squared) - (1/N * sum(xi_mc))**2, "Variance of xi (phi):", 1/N * sum(xi_phi_squared) - (1/N * sum(xi_phi))**2)

    



Variance of phi: 1.373632312684503e-05
Variance of MC: 2.4939019190373113e-05
Price of regression: 0.5609448159432409 Price of MC: 0.5598259000000001
Var of single Y 0.24681903999999996
Mean of xi (MC): 0.02 Mean of xi (phi): 0.019881642788173823
Variance of xi (MC): 0.09960000000000001 Variance of xi (phi): 5.311718979506588e-05


In [None]:

## Setup
# rng = RNG()
# random_matrix = rng.normal(size=(N, len(times) - 1), seed = 34)
# random_matrix_new = rng.normal(size = (N, len(times) - 1), seed = 3234)
# payoff = Payoff(K, option_type='digital')
# generator = PathGenerator(market)

# ## Simulate
# paths = generator.generate_paths(N, random_matrix)
# paths_new = generator.generate_paths(N, random_matrix_new)
# X = paths[:, 1]  
# X_new = paths_new[:, 1]
# # S_{T - τ}

# Y = payoff(paths[:, 2])  
# # F(S_T)

# phi, beta = longstaff_regression(X, Y)

# #用回归得到的 phi(S_{T−τ}) 拟合 Y

# Y_predicted = phi(X_new)


# # 对比 variance
# var_MC = np.var(Y)
# var_LS = np.var(Y_predicted)

# print(var_MC, var_LS)


# plt.scatter(X, Y, alpha=0.2, label='True F(S_T)', s=10)
# x_vals = np.linspace(min(X), max(X), 200)
# plt.plot(x_vals, phi(x_vals), color='red', label='Regression φ(x)')
# plt.xlabel('$S_{T−\\tau}$')
# plt.ylabel('Payoff')
# plt.title('Digital Payoff Regression')

# plt.legend()
# plt.grid(True)
# plt.show()