In [1]:
import random

from distributed_processing.serializers import JsonSerializer
from distributed_processing.client import Client
from distributed_processing.redis_connector import RedisConnector


In [2]:
# Settings passed to RedisConnector when the client is created.
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDIS_DB: int = 0
# Namespace passed to RedisConnector.
NAMESPACE: str = "montecarlos"

In [3]:
# Build the Redis connector from the connection settings defined above.
redis_connector = RedisConnector(
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT,
    redis_db=REDIS_DB,
    namespace=NAMESPACE,
)

# RPC client using a JsonSerializer on top of that connector.
# NOTE(review): the meaning of check_registry="cache" is defined by
# distributed_processing.client.Client — confirm there.
client = Client(JsonSerializer(), redis_connector, check_registry="cache")

Client with id: montecarlos:redis_client:9
Results queue: montecarlos:redis_client:9:responses


In [4]:
import datetime

# Total number of simulations; split into chunks further down.
NSIMULATIONS = 1_000_000

# Parameters forwarded by keyword to the remote simulation routines.
V = 0.20  # Volatility
R = 0.04  # Instantaneous interest rate (ln(1+r)?)
COUPON_BARRIER = 0.8
KICKOUT_BARRIER = 1.1
PROTECTION_BARRIER = 0.6
COUPON_RATE = 0.088

# Observation dates (midnight); the first one is the reference for `dates`.
obs_dates = [
    datetime.datetime(2012, 7, 4),
    datetime.datetime(2013, 7, 5),
    datetime.datetime(2014, 7, 7),
    datetime.datetime(2015, 7, 6),
    datetime.datetime(2016, 7, 5),
    datetime.datetime(2017, 7, 5),
    datetime.datetime(2018, 7, 5),
]

# Whole days elapsed from the first observation date to each observation date.
dates = [(obs - obs_dates[0]).days for obs in obs_dates]

In [5]:
# Version based on RandomState.
# RandomState is now legacy and uses the MT19937 generator, which is
# slower than PCG64, the current default generator in numpy.


# Number of tasks the simulation is split into.
nchunks = 32
# Simulations per task as a float (may be fractional; callers truncate it).
nsims = NSIMULATIONS / float(nchunks)

def chunks(nsims, nchunks, total=None):
    """Split a total number of simulations into ``nchunks`` integer chunk sizes.

    Every chunk starts at ``int(nsims)``. Whatever is still missing to reach
    ``total`` is then spread over the chunks as evenly as possible, with the
    first chunks receiving the extra units.

    Parameters
    ----------
    nsims : int or float
        Nominal number of simulations per chunk; truncated with ``int()``.
    nchunks : int
        Number of chunks to produce.
    total : int, optional
        Overall number of simulations the chunks should add up to. Defaults
        to the notebook-level ``NSIMULATIONS`` so that existing two-argument
        calls behave as before.

    Returns
    -------
    list of int
        ``nchunks`` sizes. They sum to ``total`` whenever
        ``int(nsims) * nchunks <= total``; if the base sizes already reach or
        exceed ``total`` they are returned unchanged.

    Raises
    ------
    ValueError
        If simulations remain to be distributed but ``nchunks`` is not
        positive.
    """
    if total is None:
        # Resolved at call time so the notebook constant is only needed when
        # the caller does not supply an explicit total.
        total = NSIMULATIONS

    base = int(nsims)
    sizes = [base] * nchunks
    remainder = total - base * nchunks
    if remainder <= 0:
        return sizes

    if nchunks <= 0:
        raise ValueError("nchunks must be positive to distribute simulations")

    # The remainder may exceed nchunks (e.g. when nsims was rounded down a
    # lot), so give every chunk `extra` units and one more to the first
    # `leftover` chunks instead of indexing past the end of the list.
    extra, leftover = divmod(remainder, nchunks)
    return [
        base + extra + (1 if index < leftover else 0)
        for index in range(nchunks)
    ]


def args_kwargs(*args, **kwargs):
    """Capture a call signature as a ``(positional_list, keyword_dict)`` pair.

    The positional arguments come back as a new list and the keyword
    arguments as a dict, in that order.
    """
    positional = [*args]
    return positional, kwargs



def args_kwargs1(nsimulations):
    """Build the ``(args, kwargs)`` pair for one chunk of ``nsimulations``.

    No positional arguments are produced; everything is passed by keyword,
    combining the chunk size with the notebook-level parameters and ``dates``.
    """
    call_kwargs = dict(
        nsimulations=nsimulations,
        v=V,
        r=R,
        coupon_barrier=COUPON_BARRIER,
        kickout_barrier=KICKOUT_BARRIER,
        protection_barrier=PROTECTION_BARRIER,
        coupon_rate=COUPON_RATE,
        dates=dates,
    )
    return args_kwargs(**call_kwargs)

# One (args, kwargs) pair per chunk, in chunk order.
all_args = list(map(args_kwargs1, chunks(int(nsims), nchunks)))


def distributed_montecarlo():
    """Send every entry of ``all_args`` to ``mc_autocall_mp`` and pool the replies.

    All requests are submitted with ``client.rpc_async`` before any reply is
    awaited. The replies are transposed into two sequences (first and second
    element of each reply), and the return value is the sum of the first
    divided by the sum of the second.
    """
    pending = [
        client.rpc_async("mc_autocall_mp", task[0], task[1])
        for task in all_args
    ]
    replies = [handle.get() for handle in pending]
    partial_sums, chunk_sizes = tuple(zip(*replies))

    return sum(partial_sums) / sum(chunk_sizes)
    

In [6]:
%%timeit
# Time a full submit-and-gather round trip of distributed_montecarlo().
distributed_montecarlo()

383 ms ± 7.66 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
# Run once and display the pooled mean returned by the function.
distributed_montecarlo()

1045.164092327497

In [8]:
# Submit every chunk and keep the pending handles so they can be inspected.
sim = [
    client.rpc_async("mc_autocall_mp", task[0], task[1])
    for task in all_args
]
sim

[<distributed_processing.async_result.AsyncResult at 0x22b35582508>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582a88>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582248>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582b08>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582108>,
 <distributed_processing.async_result.AsyncResult at 0x22b353fc0c8>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582548>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582648>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582ac8>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582208>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582b48>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582148>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582f48>,
 <distributed_processing.async_result.AsyncResult at 0x22b35582e08>,
 <distributed_processing.async_res

In [9]:
# Fetch and display the reply of every pending request.
[handle.get() for handle in sim]

[[32663261.19957483, 31250.0],
 [32663284.79660122, 31250.0],
 [32658327.76219603, 31250.0],
 [32663340.01168442, 31250.0],
 [32663384.391243167, 31250.0],
 [32662760.513942026, 31250.0],
 [32663304.535863586, 31250.0],
 [32654766.955722306, 31250.0],
 [32663439.42789009, 31250.0],
 [32649178.096680276, 31250.0],
 [32663389.817204557, 31250.0],
 [32661669.83242957, 31250.0],
 [32663084.561150983, 31250.0],
 [32657697.024199724, 31250.0],
 [32663678.14559345, 31250.0],
 [32663432.560359452, 31250.0],
 [32663339.436650176, 31250.0],
 [32662171.395375535, 31250.0],
 [32663148.55562962, 31250.0],
 [32663347.872152347, 31250.0],
 [32663126.942310024, 31250.0],
 [32663304.535863586, 31250.0],
 [32663476.48978208, 31250.0],
 [32663261.19957483, 31250.0],
 [32663688.196366616, 31250.0],
 [32663347.872152347, 31250.0],
 [32662460.436137293, 31250.0],
 [32663347.872152347, 31250.0],
 [32663511.983489677, 31250.0],
 [32662715.28435214, 31250.0],
 [32663261.19957483, 31250.0],
 [32661990.906662367

In [10]:
# Transpose the replies into per-chunk sums and sizes, then show the pooled mean.
sumas, sizes = tuple(zip(*[handle.get() for handle in sim]))
sum(sumas) / sum(sizes)

1045.1874998105616

In [11]:
# Version based on PCG64. The function must be given the seed and the jump number.
# Based on https://numpy.org/doc/1.18/reference/random/parallel.html

import secrets


def args_kwargs2(x):
    """Build the ``(args, kwargs)`` pair for one seeded chunk.

    ``x`` is indexed as ``(seed, jump, nsimulations)`` and each entry is
    coerced with ``int()``. Seed and jump are forwarded together as the
    ``rndg`` keyword; the remaining keywords are the notebook-level
    parameters and ``dates``.
    """
    seed, jump, nsim = int(x[0]), int(x[1]), int(x[2])
    call_kwargs = dict(
        rndg=(seed, jump),
        nsimulations=nsim,
        v=V,
        r=R,
        coupon_barrier=COUPON_BARRIER,
        kickout_barrier=KICKOUT_BARRIER,
        protection_barrier=PROTECTION_BARRIER,
        coupon_rate=COUPON_RATE,
        dates=dates,
    )
    return args_kwargs(**call_kwargs)



# Fresh 128-bit seed shared by all chunks; each chunk gets its own jump index.
seed = secrets.randbits(128)

# (seed, jump, nsimulations) triples, one per chunk.
rnd_args = list(zip([seed] * nchunks, range(nchunks), chunks(int(nsims), nchunks)))

# Rebinds all_args, replacing the list built for the RandomState version.
all_args = [args_kwargs2(triple) for triple in rnd_args]

def distributed_montecarlo2():
    """Send every entry of ``all_args`` to ``mc_autocall_mp2`` and pool the replies.

    All requests are submitted with ``client.rpc_async`` before any reply is
    awaited. The replies are transposed into two sequences (first and second
    element of each reply), and the return value is the sum of the first
    divided by the sum of the second.
    """
    pending = [
        client.rpc_async("mc_autocall_mp2", task[0], task[1])
        for task in all_args
    ]
    replies = [handle.get() for handle in pending]
    partial_sums, chunk_sizes = tuple(zip(*replies))

    return sum(partial_sums) / sum(chunk_sizes)
    

In [12]:
%%timeit
# Time a full submit-and-gather round trip of distributed_montecarlo2().
distributed_montecarlo2()

374 ms ± 7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [13]:
# Run once and display the pooled mean returned by the function.
distributed_montecarlo2()

1045.1756680159149

In [24]:
# Version based on PCG64. The function must be given the seed and the jump number.
# Based on https://numpy.org/doc/1.18/reference/random/parallel.html

import secrets

# Number of tasks the simulation is split into (same value as set earlier).
nchunks = 32
# Simulations per task as a float (may be fractional; callers truncate it).
nsims = NSIMULATIONS / float(nchunks)

def args_kwargs2(x):
    """Build the ``(args, kwargs)`` pair for one seeded chunk.

    NOTE(review): this repeats the ``args_kwargs2`` definition from the
    earlier PCG64 cell with the same behaviour; the later definition simply
    replaces the earlier one.

    ``x`` is indexed as ``(seed, jump, nsimulations)`` and each entry is
    coerced with ``int()``. Seed and jump are forwarded together as the
    ``rndg`` keyword; the remaining keywords are the notebook-level
    parameters and ``dates``.
    """
    seed, jump, nsim = int(x[0]), int(x[1]), int(x[2])
    call_kwargs = dict(
        rndg=(seed, jump),
        nsimulations=nsim,
        v=V,
        r=R,
        coupon_barrier=COUPON_BARRIER,
        kickout_barrier=KICKOUT_BARRIER,
        protection_barrier=PROTECTION_BARRIER,
        coupon_rate=COUPON_RATE,
        dates=dates,
    )
    return args_kwargs(**call_kwargs)



# Fresh 128-bit seed shared by all chunks; each chunk gets its own jump index.
seed = secrets.randbits(128)

# (seed, jump, nsimulations) triples, one per chunk.
rnd_args = list(zip([seed] * nchunks, range(nchunks), chunks(int(nsims), nchunks)))

# Rebinds all_args with pairs built from the newly drawn seed.
all_args = [args_kwargs2(triple) for triple in rnd_args]

def distributed_empty_montecarlo():
    """Send every entry of ``all_args`` to ``mc_autocall_empty`` and pool the replies.

    Same submit-then-gather shape as the other ``distributed_*`` helpers in
    this notebook: all requests go out through ``client.rpc_async`` first,
    the replies are transposed into two sequences, and the sum of the first
    is divided by the sum of the second.

    NOTE(review): presumably ``mc_autocall_empty`` does no simulation work and
    this measures dispatch overhead only — confirm against the worker code.
    """
    pending = [
        client.rpc_async("mc_autocall_empty", task[0], task[1])
        for task in all_args
    ]
    replies = [handle.get() for handle in pending]
    partial_sums, chunk_sizes = tuple(zip(*replies))

    return sum(partial_sums) / sum(chunk_sizes)
    

In [25]:
%%timeit
# Time a full submit-and-gather round trip of distributed_empty_montecarlo().
distributed_empty_montecarlo()

17.8 ms ± 1 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
