In [4]:
import numpy as np
import scipy.stats as ss

from scipy import sparse
from scipy.sparse.linalg import splu
from time import time


In [5]:
class Option_param:
    """
    Option class wants the option parameters:
    S0 = current stock price
    K = Strike price
    T = time to maturity
    v0 = (optional) spot variance
    exercise = European or American
    """

    def __init__(self, S0=15, K=15, T=1, v0=0.04, payoff="call", exercise="European"):
        self.S0 = S0
        self.v0 = v0
        self.K = K
        self.T = T

        if exercise == "European" or exercise == "American":
            self.exercise = exercise
        else:
            raise ValueError("invalid type. Set 'European' or 'American'")

        if payoff == "call" or payoff == "put":
            self.payoff = payoff
        else:
            raise ValueError("invalid type. Set 'call' or 'put'")

In [6]:
class Diffusion_process:
    """
    Class for the diffusion process:
    r = risk free constant rate
    sig = constant diffusion coefficient
    mu = constant drift
    """

    def __init__(self, r=0.1, sig=0.2, mu=0.1):
        self.r = r
        self.mu = mu
        if sig <= 0:
            raise ValueError("sig must be positive")
        else:
            self.sig = sig

    def exp_RV(self, S0, T, N):
        W = ss.norm.rvs((self.r - 0.5 * self.sig**2) * T, np.sqrt(T) * self.sig, N)
        S_T = S0 * np.exp(W)
        return S_T.reshape((N, 1))

In [7]:
class BS_pricer:
    """
    Closed Formula.
    Monte Carlo.
    Finite-difference Black-Scholes PDE:
     df/dt + r df/dx + 1/2 sigma^2 d^f/dx^2 -rf = 0
    """

    def __init__(self, Option_info, Process_info):
        """
        Option_info: of type Option_param. It contains (S0,K,T)
                i.e. current price, strike, maturity in years
        Process_info: of type Diffusion_process. It contains (r, mu, sig) i.e.
                interest rate, drift coefficient, diffusion coefficient
        """
        self.r = Process_info.r  # interest rate
        self.sig = Process_info.sig  # diffusion coefficient
        self.S0 = Option_info.S0  # current price
        self.K = Option_info.K  # strike
        self.T = Option_info.T  # maturity in years
        self.exp_RV = Process_info.exp_RV  # function to generate solution of GBM

        self.price = 0
        self.S_vec = None
        self.price_vec = None
        self.mesh = None
        self.exercise = Option_info.exercise
        self.payoff = Option_info.payoff

    def payoff_f(self, S):
        if self.payoff == "call":
            Payoff = np.maximum(S - self.K, 0)
        elif self.payoff == "put":
            Payoff = np.maximum(self.K - S, 0)
        return Payoff

    @staticmethod
    def BlackScholes(payoff="call", S0=100.0, K=100.0, T=1.0, r=0.1, sigma=0.2):
        """Black Scholes closed formula:
        payoff: call or put.
        S0: float.    initial stock/index level.
        K: float strike price.
        T: float maturity (in year fractions).
        r: float constant risk-free short rate.
        sigma: volatility factor in diffusion term."""

        d1 = (np.log(S0 / K) + (r + sigma**2 / 2) * T) / (sigma * np.sqrt(T))
        d2 = (np.log(S0 / K) + (r - sigma**2 / 2) * T) / (sigma * np.sqrt(T))

        if payoff == "call":
            return S0 * ss.norm.cdf(d1) - K * np.exp(-r * T) * ss.norm.cdf(d2)
        elif payoff == "put":
            return K * np.exp(-r * T) * ss.norm.cdf(-d2) - S0 * ss.norm.cdf(-d1)
        else:
            raise ValueError("invalid type. Set 'call' or 'put'")

    @staticmethod
    def vega(sigma, S0, K, T, r):
        """BS vega: derivative of the price with respect to the volatility"""
        d1 = (np.log(S0 / K) + (r + sigma**2 / 2) * T) / (sigma * np.sqrt(T))
        return S0 * np.sqrt(T) * ss.norm.pdf(d1)


    def PDE_price(self, steps, Time=False, solver="splu"):
        """
        steps = tuple with number of space steps and time steps
        payoff = "call" or "put"
        exercise = "European" or "American"
        Time = Boolean. Execution time.
        Solver = spsolve or splu or Thomas or SOR
        """
        t_init = time()

        Nspace = steps[0]
        Ntime = steps[1]

        S_max = 6 * float(self.K)
        S_min = float(self.K) / 6
        x_max = np.log(S_max)
        x_min = np.log(S_min)
        x0 = np.log(self.S0)  # current log-price

        x, dx = np.linspace(x_min, x_max, Nspace, retstep=True)
        t, dt = np.linspace(0, self.T, Ntime, retstep=True)

        self.S_vec = np.exp(x)  # vector of S
        Payoff = self.payoff_f(self.S_vec)

        V = np.zeros((Nspace, Ntime))
        if self.payoff == "call":
            V[:, -1] = Payoff
            V[-1, :] = np.exp(x_max) - self.K * np.exp(-self.r * t[::-1])
            V[0, :] = 0
        else:
            V[:, -1] = Payoff
            V[-1, :] = 0
            V[0, :] = Payoff[0] * np.exp(-self.r * t[::-1])  # Instead of Payoff[0] I could use K
            # For s to 0, the limiting value is e^(-rT)(K-s)

        sig2 = self.sig**2
        dxx = dx**2
        a = (dt / 2) * ((self.r - 0.5 * sig2) / dx - sig2 / dxx)
        b = 1 + dt * (sig2 / dxx + self.r)
        c = -(dt / 2) * ((self.r - 0.5 * sig2) / dx + sig2 / dxx)

        D = sparse.diags([a, b, c], [-1, 0, 1], shape=(Nspace - 2, Nspace - 2)).tocsc()

        offset = np.zeros(Nspace - 2)

        if solver == "splu":
            DD = splu(D)
            if self.exercise == "European":
                for i in range(Ntime - 2, -1, -1):
                    offset[0] = a * V[0, i]
                    offset[-1] = c * V[-1, i]
                    V[1:-1, i] = DD.solve(V[1:-1, i + 1] - offset)
            elif self.exercise == "American":
                for i in range(Ntime - 2, -1, -1):
                    offset[0] = a * V[0, i]
                    offset[-1] = c * V[-1, i]
                    V[1:-1, i] = np.maximum(DD.solve(V[1:-1, i + 1] - offset), Payoff[1:-1])
        else:
            raise ValueError("Solver is splu, spsolve, SOR or Thomas")

        self.price = np.interp(x0, x, V[:, 0])
        self.price_vec = V[:, 0]
        self.mesh = V

        if Time is True:
            elapsed = time() - t_init
            return self.price, elapsed
        else:
            return self.price

    def LSM(self, N=10000, paths=10000, order=2):
        """
        Longstaff-Schwartz Method for pricing American options

        N = number of time steps
        paths = number of generated paths
        order = order of the polynomial for the regression
        """

        if self.payoff != "put":
            raise ValueError("invalid type. Set 'call' or 'put'")

        dt = self.T / (N - 1)  # time interval
        df = np.exp(-self.r * dt)  # discount factor per time time interval

        X0 = np.zeros((paths, 1))
        increments = ss.norm.rvs(
            loc=(self.r - self.sig**2 / 2) * dt,
            scale=np.sqrt(dt) * self.sig,
            size=(paths, N - 1),
        )
        X = np.concatenate((X0, increments), axis=1).cumsum(1)
        S = self.S0 * np.exp(X)

        H = np.maximum(self.K - S, 0)  # intrinsic values for put option
        V = np.zeros_like(H)  # value matrix
        V[:, -1] = H[:, -1]

        # Valuation by LS Method
        for t in range(N - 2, 0, -1):
            good_paths = H[:, t] > 0
            rg = np.polyfit(S[good_paths, t], V[good_paths, t + 1] * df, 2)  # polynomial regression
            C = np.polyval(rg, S[good_paths, t])  # evaluation of regression

            exercise = np.zeros(len(good_paths), dtype=bool)
            exercise[good_paths] = H[good_paths, t] > C

            V[exercise, t] = H[exercise, t]
            V[exercise, t + 1 :] = 0
            discount_path = V[:, t] == 0
            V[discount_path, t] = V[discount_path, t + 1] * df

        V0 = np.mean(V[:, 1]) * df  #
        return V0

In [8]:
# Creates the object with the parameters of the option
opt_param = Option_param(S0=100, K=100, T=1, exercise="American", payoff="put")
# Creates the object with the parameters of the process
diff_param = Diffusion_process(r=0.1, sig=0.2)
# Creates the pricer object
BS = BS_pricer(opt_param, diff_param)

In [9]:
BS.LSM(N=10000, paths=10000, order=2)  # Longstaff Schwartz Method

4.674893974879958

In [10]:
N_space = 8000
N_time = 5000
BS.PDE_price((N_space, N_time))

4.815638949372102