In [11]:
%pip install numpy scipy joblib


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [12]:
from scipy.stats import norm
import numpy as np
import time
from scipy import stats
from joblib import Parallel, delayed
import pandas as pd

In [13]:
def get_parameter_values(S0, K, T, r, q, sigma): 
    # d1 and d2 
    d1 = (np.log(S0/K) + (r - q + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
       
    # Lambda value 
    lambda_ = (r - q + 0.5 * sigma**2) / sigma**2
    
    # Vanilla call value
    c = S0 * np.exp(-q * T) * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2) 
    
    return lambda_, c

 
def down_and_call_book(S0, K, T, r, q, sigma,H, H_down, H_up):
    
    # Values for the different functions 
    lambda_, c = get_parameter_values(S0, K, T, r, q, sigma)
    
    # Value for the down and out
    y = np.log(H_down**2 / (S0*K)) / (sigma * np.sqrt(T)) + lambda_ * sigma * np.sqrt(T)
    
    # Values for the down and in 
    x1 = np.log(S0/H_up) / (sigma * np.sqrt(T)) + lambda_ * sigma * np.sqrt(T)
    y1 = np.log(H_up/S0) / (sigma * np.sqrt(T)) + lambda_ * sigma * np.sqrt(T)

    # Calculate option values for call
    if H <= K:
        #Down and out
        cdi = S0 * np.exp(-q * T) * (H_down/S0)**(2*lambda_) * norm.cdf(y) - K * np.exp(-r * T) * (H_down/S0)**(2*lambda_ - 2) * norm.cdf(y - sigma * np.sqrt(T))
        cdo = c  - cdi
        return cdo
    else:
        #Down and in
        cdo = S0 * np.exp(-q * T) * norm.cdf(x1) - K * np.exp(-r * T) * norm.cdf(x1 - sigma * np.sqrt(T)) 
        - S0 * np.exp(-q * T) * (H_up/S0)**(2*lambda_) * norm.cdf(y1) + K * np.exp(-r * T) * (H_up/S0)**(2*lambda_ - 2) * norm.cdf(y1 - sigma * np.sqrt(T))
        cdi = c - cdo
        
        return cdi

In [14]:
def simulate_path_segment(m, r, T, sigma, S0, K, H, q, segment_n_paths, dt):
    dW = np.sqrt(dt) * np.random.randn(m, segment_n_paths)
    S = np.zeros((m + 1, segment_n_paths))
    S[0, :] = S0
    for i in range(1, m + 1):
        S[i, :] = S[i - 1, :] * np.exp((r - q - 0.5 * sigma ** 2) * dt + sigma * dW[i - 1, :])
        S[i, :] = np.where(S[i, :] < H, 0, S[i, :])  # Apply barrier condition

    payoffs = np.maximum(S[-1, :] - K, 0) * np.exp(-r * T)
    payoffs = np.where(S.any(axis=0), payoffs, 0)
    return payoffs


def price_down_and_out_call_brown(m, r, T, sigma, S0, K, H, q, n=10**7, n_jobs=8):
    n_paths = n
    n_steps = m
    dt = T / n_steps

    # Adjust the number of jobs and paths per job
    confidence_level=0.95
    paths_per_job = max(1, round(n_paths // n_jobs))  # Ensure at least one path per job

    # Adjust n_jobs if n_paths is smaller
    n_jobs = min(n_jobs, n_paths)

    # Running simulations in parallel
    results = Parallel(n_jobs=n_jobs)(delayed(simulate_path_segment)(
        m, r, T, sigma, S0, K, H, q, paths_per_job, dt) for _ in range(n_jobs))

    # Check if results are valid before concatenation
    if not results or any(len(res) == 0 for res in results):
        print("Error: No valid data returned from parallel tasks.")
        return 0, 0, 0

    # Concatenate results and calculate final values
    all_payoffs = np.concatenate(results)
    option_price = np.mean(all_payoffs)
    std_error = np.std(all_payoffs)
    sem = std_error / np.sqrt(n_paths)

    z_score = stats.norm.ppf(1 - (1 - confidence_level) / 2)
    confidence_interval = z_score * sem

    return option_price, sem, confidence_interval

In [15]:
# Function to adjust the barrier for discrete monitoring
def adjusted_barrier(T, H, sigma,m, beta):
    # dT should be here, it "is the time between monitoring instants", p.325, also stated in book from michael at p.628
    delta_T = T / m

    # H_adj = H * np.exp( -1* beta * sigma * np.sqrt(T))
    H_adj_down = H * np.exp( -1* beta * sigma * np.sqrt(delta_T))
    H_adj_up = H * np.exp( beta * sigma * np.sqrt(delta_T))
    
    return H_adj_down, H_adj_up

In [16]:

# Function to calculate percentage error
def percentage_error(price_adj, exact_price):
    return abs((price_adj - exact_price) / exact_price) * 100

# Function to find the best beta for a given set of parameters and exact price
def find_optimal_beta(S0, K, r, q, sigma, m, H, T, exact_price ):
    min_error = float('inf')
    best_beta = 0
    precision_levels=[0.1, 0.01, 0.001, 0.0001]

    for precision in precision_levels:
        start = best_beta - precision if best_beta != 0 else 0
        end = best_beta + precision if best_beta != 0 else 1 + precision
        beta_values = np.arange(start, end, precision)

        for beta_candidate in beta_values:
            H_adj_down, H_adj_up = adjusted_barrier(T, H, sigma, m, beta_candidate)
            price_adj = down_and_call_book(S0, K, T, r, q, sigma, H, H_adj_down, H_adj_up)
            error = percentage_error(price_adj, exact_price)

            if error < min_error:
                min_error = error
                best_beta = beta_candidate

    return round(best_beta, 5), min_error


In [18]:
# Function to compute prices for a given set of parameters
def compute_prices(S0, K, r, m, T, H, sigma, trading_days, n):
    H_adj_down, H_adj_up = adjusted_barrier(T, H, sigma, m, beta)
    price_iter = price_down_and_out_call_brown(m, r, T, sigma, S0, K, H, q, n)
    price = down_and_call_book(S0, K, T, r, q, sigma,H, H_adj_down, H_adj_up)
    price_adj = down_and_call_book(S0, K, T, r, q, sigma,H, H_adj_down, H_adj_up)
    H_percent = ((S0 - H)/S0)*100
    H_log = np.log(H/S0)
    
    #Best beta value
    best_beta = find_optimal_beta(S0, K, r, q, sigma, m, H, T, price_iter[0])
    return [S0, K, r, m, T, H, sigma, trading_days, price_iter[0], price, price_adj, H_percent, H_log, best_beta[0]]

# Values
S0 = 300
K = 300 
q = 0
m = 50
trading_days = 250
h_min = 0.85*S0
h_max = S0-1
beta = 0.5826
t_values = np.arange(0.2,5.3, 0.4)
sigma_values = np.arange(0.2, 0.551, 0.05)
h_values = np.arange(h_min, h_max)
r_values = [0.1]
n = 1.4*10**2

# Calculate total iterations
total_iterations = len(r_values) * len(h_values) * len(t_values) * len(sigma_values)
current_iteration = 0

# Start the timer
start_time = time.time()

# Non-parallel computation with progress tracking
results = []
for r in r_values:
    for H in h_values:
        for T in t_values:
            T_rounded = round(T, 1)
            for sigma in sigma_values:
                result = compute_prices(S0, K, r, m, T_rounded, H, sigma, trading_days, n)
                results.append(result)
                current_iteration += 1
                print(f"Progress: {current_iteration}/{total_iterations} iterations completed.", end='\r', flush=True)

# Move to the next line after the loop is complete
print()

# Create DataFrame from results
df = pd.DataFrame(results, columns=['S0', 'K', 'r', 'm', 'T', 'H', 'sigma', 'trading_days', 'price_iter', 'price', 'price_adj', 'H_percent', 'H_log',  'best_beta'])

# Rounding the values to three decimal places
df = df.round(4)

# Save the DataFrame to a CSV file
df.to_csv('test.csv', index=False)

# Print elapsed time
elapsed_time = time.time() - start_time
print(f"Simulation completed in {elapsed_time:.2f} seconds")
print("Simulation data saved to 'paper_values_300.csv'")

Progress: 64/8800 iterations completed.

KeyboardInterrupt: 