In [1]:
from joblib import Parallel, delayed
import numpy as np
import random
from line_profiler import LineProfiler
from numba import jit
import time

In [2]:
def timerDetail(f, *x):
    """test the time consumption"""
    lp = LineProfiler()
    lp_wrapper = lp(f)
    lp_wrapper(*x)
    lp.print_stats()


def timer(f, *x):
    """test the time consumption"""
    start = time.time()
    test1 = f(*x)
    print(time.time() - start)

In [3]:
@jit(nopython=True)
def St_cal(num_steps, K, dLambda, sigma, S0):
    St = S0
    for k in range(num_steps):
        dNormal = float(random.normalvariate(0, 1 / num_steps))
        power_term = float(pow(St, dLambda))
        St = St + sigma * power_term * dNormal
    return max(St - K, 0)

In [4]:
class PriceMc:

    def __init__(self, num_data, num_simulations, num_steps, S0):
        self.num_data = num_data
        self.num_simulations = num_simulations
        self.num_steps = num_steps
        self.steps_vec = np.full((self.num_simulations,), self.num_steps)
        self.S0 = S0
        self.Ks = np.random.uniform(0.8, 1.2, (self.num_data,))
        self.dLambdas = np.random.uniform(1 / 2, 3 / 2, (self.num_data,))
        self.sigmas = np.random.uniform(1 / 10, 1 / 2, (self.num_data,))
        self.price_vec = self.price_gen()

    def mc_simulation(self, K, dLambda, sigma):
        st_cal_vec = np.vectorize(St_cal)
        return np.sum(st_cal_vec(self.steps_vec, K, dLambda, sigma, self.S0)) / self.num_simulations

    def price_gen(self):
        return Parallel(n_jobs=5)(delayed(self.mc_simulation)
                                  (self.Ks[i], self.dLambdas[i], self.sigmas[i])
                                  for i in range(self.num_data))

In [5]:
def test_for(num_data, num_simulations, num_steps, S0):
    data_MC = []
    for i in range(num_data):
        K = float(random.uniform(0.8, 1.2))
        dLambda = float(random.uniform(1 / 2, 3 / 2))
        sigma = float(random.uniform(1 / 10, 1 / 2))
        P = 0
        for h in range(num_simulations):
            St = S0
            for k in range(num_steps):
                dNormal = float(random.normalvariate(0, 1 / num_steps))
                power_term = float(pow(St, dLambda))
                # if type(power_term) == complex:
                #     continue
                St = St + sigma * power_term * dNormal  # St+1 = St + r * St * dt + sigma * St**lambda * n
            P += max(St - K, 0) / num_simulations
        data_MC.append([P, K, dLambda, sigma])
    return data_MC

In [6]:
timer(test_for, *[10000, 1000, 100, 1])

1164.1034698486328


In [8]:
timer(PriceMc, *[10000, 1000, 100, 1])

13.027849435806274
