## Question 3

In [79]:
from scipy.special import ndtr
from scipy.stats import norm as nd 
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt 
import newtonRaphson
from tqdm import tqdm

class OptionPriceFuncs: 
    def __init__(self, risk_free, volatility) -> None:
        self.rf = risk_free
        self.local_vol = volatility
        pass
    
    def _get_rf(self):
        return self.rf
    
    def _get_sigma(self): 
        return np.sqrt(self.local_vol)

    def eval_d1(self, spot, strike, maturity):
        
        rf = self._get_rf()
        sigma = self._get_sigma()

        d1 = (np.log(spot/strike)+maturity*(rf+0.5*sigma**2))/(sigma*np.sqrt(maturity))
        
        return d1

    def eval_d2(self, spot, strike, maturity):

        d1 = self.eval_d1(spot,strike,maturity)
        sigma = self._get_sigma()
        d2 = d1 - sigma*np.sqrt(maturity)
        return d2

class AmericanPut(OptionPriceFuncs):
    def price_european(self,spot,strike,time_to_maturity):
        # price the similar european

        d1 = self.eval_d1(spot,strike,time_to_maturity)
        d2 = self.eval_d2(spot,strike,time_to_maturity)

        rf = self._get_rf()

        price = strike*np.exp(-rf*time_to_maturity)*ndtr(-d2)-spot*ndtr(-d1)
        return price
    
    def find_current_preimum(self,spot,curr_time):
        # calculate for all known B value exp(-rdt_m)N(-d2(x,B_m,dt_m))
        # where B_m is the boundary at time m, & dt_m is (t_m-t) 
        B = self.exercise_prices.dropna()
        times = B.index.to_series() - curr_time

        rf = self._get_rf()

        d2 = self.eval_d2(spot,B,times)
        S = np.exp(-rf*times)*ndtr(-d2)

        # return the sum of such terms
        return S.sum()
          
    def price(self,spot,strike,times:np.ndarray):
        # initalise
        self.exercise_prices = pd.Series(index=times)
        
        # stores the length of each time period 
        self.dtimes = self.exercise_prices.copy()
        self.dtimes[:] = np.roll(times,-1) - times
        self.dtimes.round(2)

        # stores the option prices information
        self.option_prices = pd.DataFrame(index =times ,columns =["P","p","premium"])
        
        # set maturity
        maturity = times[-1]

        for i in tqdm(range(len(times)-2,-1,-1)):
            # set up time values, backwards iterate from maturity to t = 0
            curr_time = times[i]
            
            # find current american option price at time t            
            curr_price = self.price_at_time(spot,strike, curr_time, maturity, True)
            
            # calls newtonraphson to find exercise price
            B, flag, maxdev = newtonRaphson.newtonRaphson(
                lambda x : self.tangency_err(x,strike = strike, curr_time = curr_time, maturity = maturity),
                x = [spot], 
                prec = 1e-6
            )

            # update exercise price and premiums
            self.exercise_prices.at[curr_time] = max(B[0],0)
            self.option_prices.at[curr_time,"P"] = curr_price

        return curr_price
    
    def price_at_time(self,spot,strike,curr_time,maturity, record = False):
        # find all relavent time information
        rf = self._get_rf()
        dtime = self.dtimes.loc[curr_time.round(2)]
        time_to_maturity = (maturity - curr_time)
        
        # find current european option price & exercise preimum
        p = self.price_european(spot,strike,time_to_maturity)
        S = self.find_current_preimum(spot,curr_time)
        
        if record: 
            self.option_prices.at[curr_time,'p'] = p
            self.option_prices.at[curr_time,'premium'] = rf*strike*dtime*S

        curr_price = p + rf*strike*dtime*S
        return curr_price
        
    def tangency_err(self,spot_B:list,strike:float,curr_time:float,maturity:float)->list:
        # tangency condtion error function for newton raphson
        spot = spot_B[0]
        P = self.price_at_time(spot,strike,curr_time,maturity)
        return [P - strike + spot]



In [80]:
# pricing at riskfree rate = 0.05, vol = 0.25 
apo = AmericanPut(0.05,0.25**2)

# 5 time segments at {0, 0.25, 0.5, 0.75, 1}
dtime = 1/5
times = np.arange(0,(1+dtime)*1,dtime).round(2)
apo_price = apo.price(100,100,times)
print("exercise prices (B_T) at time T (corrisponding to the index)")
print(apo.exercise_prices)

  self.exercise_prices = pd.Series(index=times)
  0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 5/5 [00:00<00:00,  9.90it/s]

exercise prices (B_T) at time T (corrisponding to the index)
0.0    77.138869
0.2    78.866648
0.4    81.135327
0.6    84.303774
0.8    89.214197
1.0          NaN
dtype: float64





In [81]:
print("American (P), European (p) and exercise premium at time T (corrisponding to the index)")
print(apo.option_prices)


American (P), European (p) and exercise premium at time T (corrisponding to the index)
            P         p   premium
0.0  7.997286  7.458941  0.538344
0.2  7.312153  6.905682  0.406471
0.4  6.487645  6.212442  0.275202
0.6  5.442962  5.298553   0.14441
0.8  3.956014  3.956014       0.0
1.0       NaN       NaN       NaN


## Question 4


In [82]:
import pandas as pd
import numpy as np
import scipy.stats as sps
import matplotlib.pyplot as plt


def price(sample,strike,spot,rf,maturity,payoff_type = "call"):
    if payoff_type == "call":
        return np.exp(-rf*maturity)*np.where(sample>=strike,spot,0)
    elif payoff_type == "put":
        return np.exp(-rf*maturity)*np.where(sample<strike,spot,0)
    elif payoff_type == "between":
        return np.exp(-rf*maturity)*np.where((sample<strike)&(-strike<sample),spot,0)

class MonteCarlo: 
    def __init__(self,params) -> None:
        self.params = params
        self.SampleData = dict()
        pass
    
    def sample_all(self):
        for name in self.SampleData.keys(): 
            sample_method = self.SampleData[name]["method"]
            sample_method(**self.params)

    def price_all(self,strike,spot,rf):
        for name in self.SampleData.keys():
            price_method = self.SampleData[name]["price"]
            samples = self.SampleData[name]["samples"]
            results = price_method(samples,strike,spot,rf,self.params["T"])
            self.SampleData[name]["results"]=results
    
    def get_results(self):
        names = self.SampleData.keys()
        out = pd.DataFrame(columns=names)
        for name in names:
            out.loc["sample mean",name] = self.SampleData[name]["samples"].mean()
            out.loc["sample std", name] = self.SampleData[name]["samples"].std()
            out.loc["result mean",name] = self.SampleData[name]["results"].mean()
            out.loc["result std", name] = self.SampleData[name]["results"].std()
        return out
 
# class interface
class SampleMethod: 
    def __init__(self,name,MCData:MonteCarlo):
        self.name = name
        self.SampleData = MCData.SampleData
        self.SampleData[name] = {
            "samples": None,
            "method": self.sample_method,
            "price": self.price_method
        }
        self.samples = None

    def sample_method():
        pass

    def price_method(self,samples,strike,spot,rf,maturity):
        return price(samples,strike,spot,rf,maturity)

# Benchmark Method     
class Normal(SampleMethod):
    def sample_method(self,n, U , T, sigma, mu):
        u = np.random.normal(0,sigma*np.sqrt(U),n)
        t = np.random.normal(0,sigma*np.sqrt(abs(T-U)),n)
        ru = np.exp(mu*U + u)
        rt = np.exp(mu*abs(T-U) + t)
        samples = 0.5*(ru+ru*rt)

        self.SampleData[self.name]["samples"] = samples
        self.samples = samples
        return samples

# Benchmark Method     
class SimpleSplit(Normal):
    def price_method(self, samples, strike, spot, rf, maturity):
        n = len(samples)
        s1 = samples [:n//2]
        s2 = samples[n//2:]
        c1 = price(s1,strike, spot, rf, maturity)
        c2 = price(s2,strike,spot,rf,maturity)
        results  = 1/2*(c1+c2)
        return results


**Benchmark:**

simple sampling of $w_U$ \& $w_{(T-U)}$ 

$R_U = \exp{(\mu U+\sigma w_U)}, R_{T}=\exp{(\mu (T-U) + \sigma w_{T-U})}$

$h_T = \frac{1}{2}(R_U + R_U R_T)$

$c(h_T) = e^{-rT} S_0 I(h_T>=K) $

In [83]:

class PutCallSplitSampling(Normal):
    def price_method(self,samples,strike,spot,rf,maturity):
        n = len(samples)
        if n%2 == 0:
            s1 = samples[:n//2]
            s2 = samples[n//2:]
        else:
            s1 = samples[1:(n+1)//2]
            s2 = samples[(n+1)//2:]
        call = price(s1,strike,spot,rf,maturity,'call')
        put = price(s2,strike,spot,rf,maturity,'put')
        results = 1/2*(call-put)+1/2*np.exp(-rf*maturity)*spot

        return results
    
class PutCallSampling(Normal):
    def price_method(self, samples, strike, spot, rf, maturity):
        n = len(samples)
        call = price(samples,strike,spot,rf,maturity)
        put = price(-samples,strike,spot,rf,maturity)
        between = price(-samples,strike,spot,rf,maturity, 'between')
        
        results = 1/2*(call-put-between)+1/2*np.exp(-rf*maturity)*spot
        return results


**Put Call Split Sampling**

$R_U = \exp{(\mu U+\sigma w_U)}, R_{T}=\exp{(\mu (T-U) + \sigma w_{T-U})}$

$h_{1,2} = \frac{1}{2}(R_U + R_U R_T)$

$c(h_T) = \frac{1}{2}(c(h_1)-p(h_2))+\frac{1}{2}S_0e^{-rT}$

$p(h) = e^{-rT}S_0I(h<K), c(h) = e^{-rT}S_0I(h>=K) $

**Simple Split**

$h_{1,2} = \frac{1}{2}(R_U + R_U R_T)$

$c(h_T)= \frac{1}{2}(c(h_1)+c(h_2))$



In [84]:
# Factorisation method
class FactoredH(SampleMethod): 
    def sample_method(self,n, U, T, sigma, mu):
        g = np.random.normal(mu*0.5*abs(T+U),sigma*np.sqrt(0.5*abs(T+U)),n)
        x = np.random.normal(mu*0.5*abs(T-U),sigma*np.sqrt(0.5*abs(T-U)),n)
        v1 = np.exp(g + x)
        v2 = np.exp(g - x)
        samples = 0.5*(v1+v2)

        self.SampleData[self.name]["samples"] = samples
        self.samples = samples
        return samples

class Individual(SampleMethod):
    def sample_method(self,n, U , T, sigma, mu):
        u = np.random.normal(0,sigma*np.sqrt(U),n)
        t = np.random.normal(0,sigma*np.sqrt(abs(T-U)),n)
        ru = 0.5*(np.exp(mu*U + u)+np.exp(mu*U - u))
        rt = 0.5*(np.exp(mu*abs(T-U) + t)+np.exp(mu*abs(T-U) - t))
        samples = 0.5*(ru+ru*rt)

        self.SampleData[self.name]["samples"] = samples
        self.samples = samples
        return samples


**Factorisation Method**

alternate method attempted

$R_G = \exp{(\mu \frac{1}{2}(T+U) + \sigma w_{\frac{1}{2}(T+U)})}, R_{X}=\exp{(\mu \frac{1}{2}(T-U) + \sigma w_{\frac{1}{2}(T-U)})}$

$h_{1,2} = \frac{1}{2}(R_G R_X + R_G R_X^{-1})$

$c(h_T) = e^{-rT}S_0I(h_T>=K)$

**Indivdual Method(?)**

also tried out of curiosity

$R'_U = 1/2(\exp{(\mu U + \sigma w_U)} + \exp{(\mu U - \sigma w_U)})$

$R'_T = 1/2(\exp{(\mu (T-U) + \sigma w_{T-U})} + \exp{(\mu (T-U) - \sigma w_{T-U})})$

$h_{1,2} = \frac{1}{2}(R'_U + R'_U R'_T)$

$c(h_T) = e^{-rT}S_0I(h_T>=K)$



Jenson's Inequalty doesn't hold, no bounds for this method? 

$E(h) == E(R'_U+R'_U R'_T)$, but there's no bounds for $E(c(h)) <-> c(E(h))$

In [85]:
U = 0.5
T = 1
rf =0.03
sigma = 0.2
mu = (rf - 0.5*sigma**2)
strike = 1.1

params = {
    "n": 50000,
    "U":U,
    "T":T,
    "sigma":sigma,
    "mu":mu
}
n = 10000

MC = MonteCarlo(params)
n = Normal("Benchmark",MC)
q = PutCallSplitSampling("PutCallSplit",MC)
s = SimpleSplit("SimpleSplit",MC)
a = FactoredH("FactoredH",MC)
e = Individual("Individual",MC)


MC.sample_all()
# set samples for normal & PutCallSplit for comparison
MC.SampleData["PutCallSplit"]["samples"]=n.samples
MC.SampleData["SimpleSplit"]["samples"]=n.samples
MC.price_all(1.1,1,rf)


out = MC.get_results()
print(out)

            Benchmark PutCallSplit SimpleSplit FactoredH Individual
sample mean  1.024739     1.024739    1.024739   1.02642   1.022904
sample std   0.163549     0.163549    0.163549  0.180141   0.016293
result mean  0.290959     0.290959    0.290959  0.306079   0.003901
result std   0.444638     0.313798    0.313798  0.450942   0.061406


**Comment:**

PutCallSplit, sucessfully matches the mean of the benchmark and also demonstrait varience reduction. Note that varience plateaus. comparing to the Simple Split, the reduction seems to simply comes from the spliting canceling out some of the varience.

perhaps futher work can be done by assuming $h_T$ is log normal, as we know it's mean and varience we can perhaps work out in reverse an antihetical pair

Factored method have an unexpected bias and actually increased variance, this is because whislt covarience is reduce, indivdual varience increased negating the benefits

The Individual method tried out matchs the mean of $h_T$ but provides no guarentees on $c(h_T)$ and should be discarded unless payoff is convex



In [48]:
res = out.at["result mean","PutCallSplit"]
std = out.at["result std","PutCallSplit"]
print("option price : {0:2f} +- {1:2f}".format(res,std))

option price : 0.286359 +- 0.311493
