In [None]:
import re
import glob
import os
import csv
from helpers import config
from helpers.loading import load_daily_data ,file_exist,get_all_dates
from helpers.algorithm import find_best_delay
import pandas as pd
import time
from multiprocessing.pool import ThreadPool
import dask
import dask.dataframe as dd
from helpers.dask import *

%load_ext autoreload
%autoreload 2
print(f"working on signal : {config['signal']}")

# Loading dates to process

In [None]:
# every date available for the configured signal / stock
all_dates = get_all_dates(config["signal"], config["stock"])
N = len(all_dates)  # total number of dates to process
print("{} dates to process".format(N))

# Setting up dask

In [None]:
k = 5  # number of partitions, i.e. dask tasks
# Number of dates handled by each task.
# - Ceiling division splits the dates into at most k slices. Floor division could create
#   many more, e.g. N=14, k=5 -> t=2 -> 7 slices.
# - Clamping to 1 keeps range(0, N, t) from getting a zero step when N < k.
# NOTE: t appears in the per-slice result file names, so partial results from a previous
# run that used a different t will not be resumed.
t = max(1, -(-N // k))
dask.config.set(scheduler="processes")

# Processing function: best lag for every pair of markets, over a slice of dates

In [None]:
fieldnames = ['date', 'market1', 'market2', "lag"]
signal = config["signal"]


@dask.delayed
def compute_lags(preprocessing_steps, start_date_idx, end_date_idx, max_iterations=500, verbose=0):
    """Compute the best lag between every pair of markets for a slice of dates.

    Results are appended to a per-slice CSV file with columns ``fieldnames``. An
    interrupted run can therefore be resumed: dates already present in the file are skipped.

    Parameters
    ----------
    preprocessing_steps : list of str
        Steps forwarded to ``load_daily_data``; also used to build the result file name.
    start_date_idx, end_date_idx : int
        Half-open slice ``[start_date_idx, end_date_idx)`` of ``all_dates`` to process.
    max_iterations : int, default 500
        Stop after this many dates have been newly processed.
        NOTE: if the slice holds more dates than this, the remaining ones are left
        unprocessed.
    verbose : int, default 0
        When > 0, print progress and timing information.

    Returns
    -------
    str
        Path of the CSV file the lags were written to.
    """
    # NOTE(review): there is no separator between the joined steps and start_date_idx
    # (e.g. "numeric_log_returns0_200"). Kept as-is so existing partial files still resume.
    results_path = config["files"]["results"][signal]["dask_calculation"]["all_best_lags"].format(
        "_".join(preprocessing_steps) + f"{start_date_idx}_{end_date_idx}"
    )

    # Treat an existing but empty file as new. This happens if a previous run died before
    # the header was flushed, and pd.read_csv would raise EmptyDataError on it.
    result_file_exists = file_exist(results_path) and os.path.getsize(results_path) > 0
    if result_file_exists:
        # csv.DictWriter writes str(date). Read the column back as str so pandas does not
        # infer e.g. "20200101" as an int, which would make the membership test never match.
        processed_dates = set(pd.read_csv(results_path, dtype={"date": str}).date.unique())
    else:
        processed_dates = set()

    start_time = time.time()
    date_count = 0  # number of dates newly processed by this call
    dates_to_process = all_dates[start_date_idx:end_date_idx]
    n_dates = len(dates_to_process)
    if verbose > 0:
        print(n_dates)

    with open(results_path, 'a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if not result_file_exists:
            # the file is new (or empty): write the header first
            writer.writeheader()

        for date_id, date in enumerate(dates_to_process):
            if verbose > 0:
                print(f"date:{date}, {date_id}:{n_dates}, {100*date_id/n_dates:0.3f}%", end="\r")

            # skip already-processed dates before paying for the data load
            if str(date) in processed_dates:
                continue

            try:
                daily_data = load_daily_data(date, preprocessing_steps=preprocessing_steps)
            except Exception as exc:
                # best effort: a date whose data cannot be loaded is skipped, but not silently
                print(f"\nskipping {date}: {exc!r}")
                continue

            if not daily_data:
                # no market provides data for this date: skip it
                continue

            # Visit each unordered pair of markets once (n2 precedes n1 in iteration order).
            # Both directions are written, because lag(n2, n1) = -lag(n1, n2).
            markets = list(daily_data)
            rows = []
            for i, n1 in enumerate(markets):
                for n2 in markets[:i]:
                    best_delay, _delays, _correlations, _los, _his = find_best_delay(
                        daily_data, n1, n2, step_size=1000
                    )
                    rows.append({'date': date, 'market1': n1, 'market2': n2, 'lag': best_delay})
                    rows.append({'date': date, 'market1': n2, 'market2': n1, 'lag': -best_delay})

            # Write all rows of a date at once. A failure while computing the date then
            # leaves no partial rows that would mark it as "processed" on resume.
            writer.writerows(rows)
            csvfile.flush()  # persist after every date so an interrupted run can resume
            date_count += 1
            if date_count >= max_iterations:
                break

    if verbose > 0:
        print()  # terminate the "\r" progress line
        print(f"{date_count} dates processed in {time.time()-start_time:0.2f}s")

    return results_path

# Processing all dates in parallel with dask

In [None]:
def compute_lags_dask(preprocessing_steps):
    """Compute all pairwise market lags in parallel and merge them into one result file.

    ``all_dates[0:N]`` is split into consecutive slices of ``t`` dates. Each slice becomes
    one delayed ``compute_lags`` task that writes its own CSV. The tasks and the final
    output path are handed to ``dask_computation`` (from ``helpers.dask``), which presumably
    executes them and merges the per-slice files — TODO confirm.

    Parameters
    ----------
    preprocessing_steps : list of str
        Steps forwarded to ``compute_lags``; also used in the final output file name.
    """
    # guard against a zero step (t == 0 when N < k), which would make range() raise ValueError
    step = max(1, t)
    promises = [
        compute_lags(preprocessing_steps, start_date_idx, start_date_idx + step)
        for start_date_idx in range(0, N, step)
    ]

    final_result_path = config["files"]["results"][signal]["all_best_lags"].format(
        "_".join(preprocessing_steps)
    )
    dask_computation(promises, final_result_path)

In [None]:
# preprocessing pipeline applied to the daily data before computing lags
preprocessing_steps = ["numeric", "log_returns"]

In [None]:
compute_lags_dask(preprocessing_steps=preprocessing_steps)