In [1]:
import numpy as np

In [49]:
def CalculateDiffusionIncrements(CorrMatrix, dt, N):
    '''
    Given the sqare root of the correlation matrix and the size of time step, 
    calculate the values of dW_t or dB_t for all diffusion brownian motions in our model. 
    
    PARAMS:
    
    CorrMatrix: value of stochastic correlation matrix observation at time t.
    Size of the matrix is 2N+1 x 2N + 1. 
    
    dt: number, the size of time step of the simulation. 
    
    N:  the number of particles in our system. 
    '''
    iidStandartNormalVector = np.random.randn(2*N + 1)
    Increment = np.linalg.cholesky(CorrMatrix) @ iidStandartNormalVector * np.sqrt(dt)
    
    return Increment

In [59]:
class coef_function1d(object):
    '''
    1-dimensional coeffitient function. It depends on only 1 argument.
    Contains its value, first and second derivatives in point x. 
    '''
    
    def __init__(self, value, FirstDerivative, SecondDerivative):
        self.value = value
        self.dx    = FirstDerivative
        self.dxdx  = SecondDerivative
    
    def ReturnValue(self, x):
        return self.value(x)
    
    def dx(self, x):
        return self.FirstDerivative(x)
    
    def dxdx(self, x):
        return self.SecondDerivative(x)
    
    
class coef_function2d(object):
    '''
    2-dimensional coefficient function. Usually takes
    time-moment and process value as arguments. Both are real numbers.
    '''
    
    def __init__(self, value, Dt, Dx, DxDx):
        self.value = value
        self.Dt    = Dt
        self.Dx    = Dx
        self.DxDx  = Dxdx
    
    def value(self, t, x):
        return value(t, x)

    def dt(self, t, x):
        return self.Dt(t, x)
    
    def dx(self, t, x):
        return self.Dx(t, x)
    
    def dxdx(self, t, x):
        return self.DxDx(t, x)
    
class coef_function3d(object):
    '''
    3-dimensional coefficient function. Logic remains the same. 
    '''
    
    def __init__(self, value, Dt, Dx, Dy, DxDy, DxDx, DyDy):
        self.value = value
        self.Dt    = Dt
        self.Dx    = Dx
        self.Dy    = Dy
        self.DxDy  = Dxdy
        self.DxDx  = Dxdx
        self.DyDy  = Dydy
        
    def value(self, t, x, y):
        return self.value(t, x, y)
    
    def dt(self, t, x, y):
        return self.Dt(t, x, y)
    
    def dx(self, t, x, y):
        return self.Dx(t, x, y)
    
    def dy(self, t, x, y):
        return self.Dy(t, x, y)
    
    def dxdy(self, t, x, y):
        return self.DxDy(t, x, y)
    
    def dxdx(self, t, x, y):
        return self.DxDx(t, x, y)
    
    def dydy(self, t, x, y):
        return self.DyDy(t, x, y)   
    
    
class mean_reversion(object):
    '''
    Class for correlation mean-reverted drift function.
    It depends on N+1 arguments: nu1, ... , nuN, rho. 
    '''
    def __init__(self, value, gradient, hessian):
        self.value    = value
        self.gradient = gradient
        self.hessian  = hessian
        
    def dx(self, NuVector, rho, coordinate):
        return self.gradient[coordinate](NuVector, rho)
    
    def dxdx(self, NuVector, rho, i, j):
        return self.gessian[i][j](NuVector, rho)

In [None]:
class ParticleProcess(object):
    '''
    This process describes the behaviour of the asset price. 
    
    The process can be described by an SDE:
    dS = drift_coef(t, S)dt + diffusion_coef(t, S, Nu)dWt, where Nu is the volatility process.
    
    NOT IMPLEMENTED YET
    
    PARAMS:
    S0:             the initial value of our diffusion. A number or a vector of numbers.
    drift_coef:     2d function, depends on time and process value. 
    diffusion_coef: 3d function, depends on time, process value and volatility value. 
    '''
    
    def __init__(self, S0, drift_coef, diffusion_coef):
        self.S0             = S0
        self.drift_coef     = drift_coef
        self.diffusion_coef = diffusion_coef
        

    

class VolatilityProcess(object):
    '''
    This process describes the behaviour of the asset price. 
    
    The process can be described by an SDE:
    dNu = drift_coef(t, Nu)dt + diffusion_coef(t, Nu)dBt, where Nu is the volatility process.
    
    NOT IMPLEMENTED YET
    
    PARAMS:
    Nu0:            the initial value of our diffusion. A number or a vector of numbers.
    drift_coef:     2d function, depends on time and volatility value. 
    diffusion_coef: 2d function, depends on time, and volatility value. 
    '''
    
    def __init__(self, Nu0, drift_coef, diffusion_coef):
        self.Nu0            = Nu0
        self.drift_coef     = drift_coef
        self.diffusion_coef = diffusion_coef

        
    

class CorrelationProcess(object):
    '''
    This process describes the behaviour of the asset price. 
    
    The process can be described by an SDE:
    dNu = drift_coef(t, Nu)dt + diffusion_coef(t, Nu)dBt, where Nu is the volatility process.
    
    NOT IMPLEMENTED YET
    
    PARAMS:
    rho0:           the initial value of our process. A number or a vector of numbers.
    mean_reversion: Psy(Nu1, ... , NuN) - rho_t. 
    diffusion_coef: 1d function, depends only on correlation value. 
    '''
    def __init__(self, rho0, alpha, mean_reversion, diffustion_coef):
        self.rho0           = rho0
        self.alpha          = alpha
        self.drift_coef     = self.alpha * mean_reversion
        self.diffusion_coef = diffusion_coef

In [None]:
def DoubleIntegral(BrownianIncrements, V, dt, i, j):
    '''
    Calculates the values of double integrals that appear in second-order refinement.
    
    PARAMS:
    BrownianIncrements: vector of all Brownian increments. 
    V: matrix of helping variables. V[i][j] = -V[j][i] for all i < j
       V[i][i] = h for all i. 
       V[i][j] = h with prob 0.5 and -h otherwise for all i < j.
    dt: size of time step
    i, j: number of corresponding Brownian motions with which the integral appeared. 
    '''
    if   (i == 0 and j == 0):
        return 0.5 * dt ** 2
    elif (i == 0):
        0.5 * dt * BrownianIncrements[j]
    elif (j == 0):
        0.5 * dt * BrownianIncrements[i]
    elif (i > 0 and j > 0):
        return 0.5 * (BrownianIncrements[i] * BrownianIncrements[j] - V[i][j])
    else: 
        print("Error in i, j.\n")

In [None]:
def ParticleDriftItoOperator(ParticleProcess, t, S):
    
    DriftValue = ParticleProcess.drift_coef.dt(t, S) + 
                 ParticleProcess.drift_coef.value(t, S) * ParticleProcess.drift_coef.dx(t, S) +
                 0.5 * (ParticleProcess.diffusion_coef(t, S) ** 2) * ParticleProcess.drift_coef.dxdx(t,S)
            
    DiffusionValue = ParticleProcess.diffusion_coef(t, S) * ParticleProcess.drift_coef.dx(t, S)
    
    return DriftValue, DiffusionValue


def ParticleDiffusionItoOperator(ParticleProcess, VolatilityProcess, Corr, t, S, Nu):
    
    DriftValue = ParticleProcess.diffusion_coef.dt(t, S, Nu) + 
                 ParticleProcess.drift_coef.value(t, S) * ParticleProcess.diffusion_coef.dx(t, S, Nu) + 
                 VolatilityProcess.drift_coef.value(t, Nu) * ParticleProcess.diffusion_coef.dy(t, S, Nu) +
                 0.5 * (ParticleProcess.diffusion_coef.value(t, S, Nu)**2) 
                 * ParticleProcess.diffusion_coef.dxdx(t, S, Nu) + 
                0.5 * (VolatilityProcess.diffusion_coef.value(t, Nu)**2) 
                     * ParticleProcess.diffusion_coef.dydy(t, S, Nu) + 
                Corr * ParticleProcess.diffusion_coef.value(t, S, Nu) * 
                VolatilityProcess.diffusion_coef.value(t, Nu) * ParticleProcess.diffusion_coef.dxdy(t, S, Nu)
    
    DiffusionSigmaValue = ParticleProcess.value(t, S, Nu) * ParticleProcess.diffusion_coef.dx(t, S, Nu) 
    DiffusionDValue     = VolatilityProcess.value(t, Nu)  * ParticleProcess.diffusion_coef.dy(t, S, Nu)
    
    return DriftValue, DiffusionSigmaValue, DiffusionDValue
    

def VolatilityDriftItoOperator(VolatilityProcess, t, Nu):
    
    DriftValue = VolatilityProcess.drift_coef.dt(t, Nu) +  
                 VolatilityProcess.drift_coef.value(t, Nu) * VolatilityProcess.drift_coef.dx(t, Nu)  +
                 0.5 * (VolatilityProcess.diffusion_coef.value(t, Nu) ** 2) 
                 * VolatilityProcess.drift_coef.dxdx(t, Nu)
    
    DiffusionValue = VolatilityProcess.diffusion_coef.value(t, Nu) * VolatilityProcess.drift_coef.dx(t, Nu)
    
    return DriftValue, DiffusionValue
    
    
def VolatilityDiffusionItoOperator(VolatilityProcess, t, Nu):
    
    DriftValue = VolatilityProcess.diffusion_coef.dt(t, Nu) +  
                 VolatilityProcess.drift_coef.value(t, Nu) * VolatilityProcess.diffusion_coef.dx(t, Nu)  +
                 0.5 * (VolatilityProcess.diffusion_coef.value(t, Nu) ** 2) 
                 * VolatilityProcess.diffusion_coef.dxdx(t, Nu)
    
    DiffusionValue = VolatilityProcess.diffusion_coef.value(t, Nu) * VolatilityProcess.diffusion_coef.dx(t, Nu)
    
    return DriftValue, DiffusionValue
                
def CorrelationDriftItoOperator(CorrelationProcess, VolatilityVectorProcess, t, NuVector, rho, N):
    
    DriftValue = (-1) * CorrelationProcess.drift_coef.value(NuVector, rho)
    for k in range(N):
        DriftValue += VolatilityVectorProcess[k].drift_coef.value(t, NuVector[k]) *
                      CorrelationProcess.drift_coef.dx(NuVector, rho, k)       
    for k in range(N):
        for l in range(N):
            DriftValue += 0.5 * VolatilityVectorProcess[k].diffusion_coef.value(t, NuVector[k]) 
                       * VolatilityVectorProcess[l].diffusion_coef.value(t, NuVector[l]) 
                       * CorrelationProcess.drift_coef.dxdx(NuVector, rho, k, l)
                    
    DiffusionDVectorValue = []
    for k in range(N):
        DiffusionDVectorValue.append(VolatilityVectorProcess[k].diffusion_coef.value(t, NuVector[k]) 
                                     * CorrelationProcess.drift_coef.dx(NuVector, rho, k))
    
    DiffusionOmegaVectorValue = - CorrelationProcess.diffusion_coef.value(rho)*CorrelationProcess.alpha
    
    return DriftValue, DiffusionDVectorValue, DiffusionOmegaVectorValue
                        
    
def CorrelationDiffusionItoOperator(CorrelationProcess, rho, Nus):
    
    DriftValue = CorrelationProcess.drift_coef.value(rho) * CorrelationProcess.diffusion_coef.dx(rho) +
                 0.5 * (CorrelationProcess.diffusion_coef.value(rho) ** 2) 
                 * CorrelationProcess.diffusion_coef.dxdx(rho)
            
    DiffusionValue = CorrelationProcess.diffusion_coef.value(rho) * CorrelationProcess.diffusion_coef.dx(rho)
    
    return DriftValue, DiffusionValue

In [None]:
def CalculateCorrMatrix():
    raise NotImplementedError

In [57]:
def Jackson(x):
    return x**2 + x + 1

def dxJackson(x):
    return 2*x + 1

def dxdxJackson(x):
    return 2

f = function1d(Jackson, dxJackson, dxdxJackson)