In [1]:
""" Shared attributes and functions of FD """
import numpy as np

class FiniteDifferences(object):
    def __init__(self, S0, K, r, T, sigma, Smax, M, N, is_call=True):
        """
        Initialize finite differences parameters
        
        Parameters:
        S0: Initial stock price
        K: Strike price
        r: Risk-free rate
        T: Time to expiration
        sigma: Volatility
        Smax: Maximum stock price for grid
        M: Number of stock price steps
        N: Number of time steps
        is_call: True for call option, False for put option
        """
        self.S0 = S0
        self.K = K
        self.r = r
        self.T = T
        self.sigma = sigma
        self.Smax = Smax
        self.M, self.N = int(M), int(N)  # Ensure M&N are integers
        self.is_call = is_call
        
        # Grid spacing parameters
        self.dS = Smax / float(self.M)  # Stock price step size
        self.dt = T / float(self.N)     # Time step size
        
        # Index arrays for grid construction
        self.i_values = np.arange(self.M)  # Stock price indices
        self.j_values = np.arange(self.N)  # Time indices
        
        # Initialize the finite difference grid (M+1 x N+1)
        self.grid = np.zeros(shape=(self.M+1, self.N+1))
        
        # Stock price boundary conditions (from 0 to Smax)
        self.boundary_conds = np.linspace(0, Smax, self.M+1)
    
    def setup_boundary_conditions(self):
        """ Setup boundary conditions for the PDE - to be implemented by subclasses """
        pass
    
    def setup_coefficients(self):
        """ Setup finite difference coefficients - to be implemented by subclasses """
        pass
    
    def traverse_grid(self):
        """ Iterate the grid backwards in time - to be implemented by subclasses """
        pass
    
    def interpolate(self):
        """ 
        Use piecewise linear interpolation on the initial grid column 
        to get the closest price at S0.
        """
        return np.interp(self.S0, self.boundary_conds, self.grid[:, 0])
    
    def price(self):
        """ 
        Main pricing method - orchestrates the finite difference solution process
        """
        # Step 1: Setup boundary conditions at expiration and boundaries
        self.setup_boundary_conditions()
        
        # Step 2: Setup finite difference coefficients for the PDE
        self.setup_coefficients()
        
        # Step 3: Solve the PDE by traversing backwards through time
        self.traverse_grid()
        
        # Step 4: Interpolate to get option value at initial stock price S0
        return self.interpolate()

In [2]:
""" Explicit method of Finite Differences """
class FDExplicitEu(FiniteDifferences):
    def setup_boundary_conditions(self):
        """ Setup boundary conditions for explicit FD method """
        if self.is_call:
            # Call option payoff at expiration: max(S - K, 0)
            self.grid[:, -1] = np.maximum(
                self.boundary_conds - self.K, 0)
            
            # Upper boundary condition for call: S - K * e^(-r*(T-t))
            # As S approaches Smax, option behaves like stock minus discounted strike
            self.grid[-1, :-1] = (self.Smax - self.K) * \
                np.exp(-self.r * self.dt * (self.N - self.j_values))
        else:
            # Put option payoff at expiration: max(K - S, 0)
            self.grid[:, -1] = \
                np.maximum(self.K - self.boundary_conds, 0)
            
            # Lower boundary condition for put: K * e^(-r*(T-t)) - S
            # As S approaches 0, put option approaches discounted strike
            self.grid[0, :-1] = (self.K - self.Smax) * \
                np.exp(-self.r * self.dt * (self.N - self.j_values))
    
    def setup_coefficients(self):
        """ Setup finite difference coefficients for explicit scheme """
        # Coefficient for V[i-1, j+1] term (lower neighbor)
        self.a = 0.5 * self.dt * ((self.sigma**2) * (self.i_values**2) - 
                                  self.r * self.i_values)
        
        # Coefficient for V[i, j+1] term (current node)
        self.b = 1 - self.dt * ((self.sigma**2) * (self.i_values**2) + self.r)
        
        # Coefficient for V[i+1, j+1] term (upper neighbor)
        self.c = 0.5 * self.dt * ((self.sigma**2) * (self.i_values**2) + 
                                  self.r * self.i_values)
    
    def traverse_grid(self):
        """ Traverse the grid backwards in time using explicit finite differences """
        # Work backwards from expiration (j = N-1) to present (j = 0)
        for j in reversed(self.j_values):
            # Skip boundary nodes (i=0 and i=M), iterate through interior nodes
            for i in range(self.M)[2:]:
                # Explicit finite difference formula:
                # V[i,j] = a[i]*V[i-1,j+1] + b[i]*V[i,j+1] + c[i]*V[i+1,j+1]
                self.grid[i, j] = self.a[i] * self.grid[i-1, j+1] + \
                                  self.b[i] * self.grid[i, j+1] + \
                                  self.c[i] * self.grid[i+1, j+1]

In [3]:
""" 
Price a European option by the implicit method of finite differences.
"""

import scipy.linalg as linalg


class FDImplicitEu(FDExplicitEu):
    def setup_coefficients(self):
        """ Setup finite difference coefficients for implicit scheme """
        # Coefficient for V[i-1, j] term (lower diagonal)
        # Note: Signs are opposite of explicit method due to implicit formulation
        self.a = 0.5 * (self.r * self.dt * self.i_values - 
                        (self.sigma**2) * self.dt * (self.i_values**2))
        
        # Coefficient for V[i, j] term (main diagonal)
        # All terms moved to left side of equation
        self.b = 1 + \
                 (self.sigma**2) * self.dt * (self.i_values**2) + \
                 self.r * self.dt
        
        # Coefficient for V[i+1, j] term (upper diagonal)
        self.c = -0.5 * (self.r * self.dt * self.i_values + 
                         (self.sigma**2) * self.dt * (self.i_values**2))
        
        # Build tridiagonal coefficient matrix for linear system
        # Lower diagonal: a[2] to a[M-1] (indices 2 to M-1)
        # Main diagonal: b[1] to b[M-1] (indices 1 to M-1) 
        # Upper diagonal: c[1] to c[M-2] (indices 1 to M-2)
        self.coeffs = np.diag(self.a[2:self.M], -1) + \
                      np.diag(self.b[1:self.M]) + \
                      np.diag(self.c[1:self.M-1], 1)
    
    def traverse_grid(self):
        """ Solve using linear systems of equations """
        # LU decomposition of coefficient matrix (done once for efficiency)
        P, L, U = linalg.lu(self.coeffs)
        
        # Auxiliary array for boundary condition adjustments
        aux = np.zeros(self.M-1)
        
        # Work backwards from expiration (j = N-1) to present (j = 0)
        for j in reversed(range(self.N)):
            # Adjust for boundary condition at lower boundary (i=0)
            # This handles the known boundary value at grid[0, j]
            aux[0] = np.dot(-self.a[1], self.grid[0, j])
            
            # Solve linear system: coeffs * V[:, j] = V[:, j+1] + aux
            # Forward substitution: L * y = (V[:, j+1] + aux)
            x1 = linalg.solve(L, self.grid[1:self.M, j+1] + aux)
            
            # Back substitution: U * V[:, j] = y
            x2 = linalg.solve(U, x1)
            
            # Update interior grid points (boundary points already set)
            self.grid[1:self.M, j] = x2

In [5]:
""" Crank-Nicolson method of Finite Differences """


class FDCnEu(FDExplicitEu):
    def setup_coefficients(self):
        """ Setup finite difference coefficients for Crank-Nicolson scheme """
        # Crank-Nicolson uses weighted average of explicit and implicit schemes
        # Coefficients are scaled by 0.25 (half of the 0.5 time weighting)
        
        # Alpha: coefficient for lower diagonal terms (i-1 neighbors)
        self.alpha = 0.25 * self.dt * (
            (self.sigma**2) * (self.i_values**2) - self.r * self.i_values)
        
        # Beta: coefficient for main diagonal terms (current node)
        # Note: negative sign due to Crank-Nicolson formulation
        self.beta = -self.dt * 0.5 * (
            (self.sigma**2) * (self.i_values**2) + self.r)
        
        # Gamma: coefficient for upper diagonal terms (i+1 neighbors)
        self.gamma = 0.25 * self.dt * (
            (self.sigma**2) * (self.i_values**2) + self.r * self.i_values)
        
        # M1: Left-hand side matrix (implicit part)
        # Structure: -alpha on lower diagonal, (1-beta) on main diagonal, -gamma on upper diagonal
        self.M1 = -np.diag(self.alpha[2:self.M], -1) + \
                   np.diag(1 - self.beta[1:self.M]) - \
                   np.diag(self.gamma[1:self.M-1], 1)
        
        # M2: Right-hand side matrix (explicit part)  
        # Structure: +alpha on lower diagonal, (1+beta) on main diagonal, +gamma on upper diagonal
        self.M2 = np.diag(self.alpha[2:self.M], -1) + \
                  np.diag(1 + self.beta[1:self.M]) + \
                  np.diag(self.gamma[1:self.M-1], 1)
    
    def traverse_grid(self):
        """ Solve using linear systems of equations for Crank-Nicolson """
        # LU decomposition of left-hand side matrix (done once for efficiency)
        P, L, U = linalg.lu(self.M1)
        
        # Work backwards from expiration (j = N-1) to present (j = 0)
        for j in reversed(range(self.N)):
            # Crank-Nicolson equation: M1 * V[:, j] = M2 * V[:, j+1]
            # Right-hand side: apply explicit part to known values at j+1
            rhs = np.dot(self.M2, self.grid[1:self.M, j+1])
            
            # Solve linear system using pre-computed LU decomposition
            # Forward substitution: L * y = rhs
            x1 = linalg.solve(L, rhs)
            
            # Back substitution: U * V[:, j] = y
            x2 = linalg.solve(U, x1)
            
            # Update interior grid points (boundary points handled separately)
            self.grid[1:self.M, j] = x2

In [6]:
"""
Price an American option by the Crank-Nicolson method

This module implements the finite difference Crank-Nicolson method for pricing
American options. It extends the European option pricing class to handle the
early exercise feature of American options using iterative methods.
"""

import sys



class FDCnAm(FDCnEu):
    """
    American option pricing using Crank-Nicolson finite difference method.
    
    This class extends the European option pricing implementation to handle
    American options by incorporating early exercise conditions through
    iterative solution of the linear system at each time step.
    """
    
    def __init__(self, S0, K, r, T, sigma, Smax, M, N, omega, tol, is_call=True):
        """
        Initialize American option pricing parameters.
        
        Parameters:
        -----------
        S0 : float
            Initial stock price
        K : float
            Strike price
        r : float
            Risk-free interest rate
        T : float
            Time to expiration
        sigma : float
            Volatility
        Smax : float
            Maximum stock price for grid
        M : int
            Number of stock price steps
        N : int
            Number of time steps
        omega : float
            Relaxation parameter for SOR iteration (typically 1.0-2.0)
        tol : float
            Tolerance for convergence of iterative method
        is_call : bool, optional
            True for call option, False for put option (default: True)
        """
        # Initialize parent class (European option)
        super(FDCnAm, self).__init__(
            S0, K, r, T, sigma, Smax, M, N, is_call)
        
        # American option specific parameters
        self.omega = omega  # SOR relaxation parameter
        self.tol = tol      # Convergence tolerance
        
        # Pre-compute index arrays for efficiency
        self.i_values = np.arange(self.M+1)  # Stock price grid indices
        self.j_values = np.arange(self.N+1)  # Time grid indices

    def _setup_boundary_conditions_(self):
        """
        Set up boundary conditions and payoff function for American option.
        
        For American options, we need to store the intrinsic value (payoff)
        at each grid point to enforce the early exercise constraint.
        """
        if self.is_call:
            # Call option payoff: max(S - K, 0)
            self.payoffs = np.maximum(
                self.boundary_conds[1:self.M] - self.K, 0)
        else:
            # Put option payoff: max(K - S, 0)
            self.payoffs = np.maximum(
                self.K - self.boundary_conds[1:self.M], 0)
        
        # Initialize past values with payoff at expiration
        self.past_values = self.payoffs
        
        # Boundary values for the option (typically 0 for far OTM boundary)
        self.boundary_values = self.K * \
            np.exp(-self.r * self.dt * (self.N - self.j_values))

    def _traverse_grid_(self):
        """
        Solve the finite difference system using iterative methods.
        
        This method implements the Successive Over-Relaxation (SOR) algorithm
        to solve the linear system at each time step while enforcing the
        American option constraint that the option value must be at least
        as large as the intrinsic value (early exercise condition).
        """
        # Auxiliary arrays for computation
        aux = np.zeros(self.M-1)      # Boundary condition contributions
        new_values = np.zeros(self.M-1)  # Updated option values
        
        # Backward time stepping (from expiration to present)
        for j in reversed(range(self.N)):
            # Set up boundary condition contribution
            aux[0] = self.alpha[1] * (self.boundary_values[j] + 
                                    self.boundary_values[j+1])
            
            # Right-hand side of the linear system
            rhs = np.dot(self.M2, self.past_values) + aux
            
            # Copy current values for iteration
            old_values = np.copy(self.past_values)
            
            # Initialize error for convergence check
            error = sys.float_info.max
            
            # SOR iteration until convergence
            while self.tol < error:
                # First interior point (k=0)
                new_values[0] = max(
                    self.payoffs[0],  # Early exercise constraint
                    old_values[0] + self.omega / (1 - self.beta[1]) * (
                        rhs[0] - (1 - self.beta[1]) * old_values[0] + 
                        (self.gamma[1] * old_values[1])
                    )
                )
                
                # Interior points (k=1 to M-3)
                for k in range(self.M-2)[1:]:
                    new_values[k] = max(
                        self.payoffs[k],  # Early exercise constraint
                        old_values[k] + self.omega / (1 - self.beta[k+1]) * (
                            rhs[k] + 
                            self.alpha[k+1] * new_values[k-1] - 
                            (1 - self.beta[k+1]) * old_values[k] + 
                            self.gamma[k+1] * old_values[k+1]
                        )
                    )
                
                # Last interior point (k=M-2)
                new_values[-1] = max(
                    self.payoffs[-1],  # Early exercise constraint
                    old_values[-1] + self.omega / (1 - self.beta[-2]) * (
                        rhs[-1] + 
                        self.alpha[-2] * new_values[-2] - 
                        (1 - self.beta[-2]) * old_values[-1]
                    )
                )
                
                # Check convergence
                error = np.linalg.norm(new_values - old_values)
                old_values = np.copy(new_values)
            
            # Update for next time step
            self.past_values = np.copy(new_values)
        
        # Combine boundary and interior values for final result
        self.values = np.concatenate((
            [self.boundary_values[0]], 
            new_values, 
            [0]
        ))

    def _interpolate_(self):
        """
        Interpolate the option value at the initial stock price S0.
        
        Uses linear interpolation on the final computed values to get
        the option price at the exact initial stock price.
        
        Returns:
        --------
        float
            The interpolated option value at S0
        """
        # Use linear interpolation on final values as 1D array
        return np.interp(self.S0, self.boundary_conds, self.values)