In [None]:
import os, sys
import warnings
warnings.filterwarnings('ignore')
from tqdm import tqdm
import seaborn as sns
module_path = './'
if module_path not in sys.path:
    sys.path.append(module_path)
from modules.preprocessing import *
from modules.io import *
from glob import glob
from matplotlib import pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.metrics import make_scorer
from tqdm.notebook import tqdm
import seaborn
import pickle
from copy import deepcopy
from timeit import default_timer as timer
from sklearn.ensemble import RandomForestRegressor as RFRegressor
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn import metrics
from sklearn.pipeline import Pipeline
import lightgbm as lgb
plt.style.use('ggplot')
from sklearn.linear_model import QuantileRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score
import time

# Test the performance of forecasting

## Import Dask

In [None]:
from dask.distributed import Client
from dask.distributed import wait
import dask
from dask import delayed
# Select dask's single-threaded "synchronous" scheduler as the configured default.
# NOTE(review): the benchmark cells below submit work explicitly through
# `client.submit`, so this setting presumably does not affect the measured
# timings -- confirm whether it is still needed.
dask.config.set(scheduler='synchronous')

## Start Dask

In [None]:
# Size of the local Dask cluster; the parallel benchmark below also uses this
# value to decide how many batches the streams are split into.
num_workers = 30

# One single-threaded worker per slot.
client = Client(
    n_workers=num_workers,
    threads_per_worker=1,
)

# Last expression of the cell: show the client summary.
client

## Read filenames of datasets (streams)

In [None]:
# Collect the stream CSV paths in sorted order and keep only the first 100
# of them for a test run.
stream_dir = '/data/data2/forecasting_data/synthetic/'
filenames = sorted(glob(os.path.join(stream_dir, '*.csv')))[:100]

## Read datasets (streams) with associated variables from disk

In [None]:
# Read every stream listed in `filenames` and keep its input columns as a
# 2-D array (one row per timestamp).
# NOTE: the "Create streams in memory" cell further down rebinds both
# `filenames` and `test_data`, so this result is only used when that cell
# is skipped.

test_data = []
for f in filenames:
    df = load_df(f).set_index('timestamp')

    # Drop the columns whose name contains 'Grd_Prod_Pwr_min_(t+'
    # (presumably the forecast targets -- confirm against the training code).
    # dict.fromkeys() de-duplicates names like set() did, but keeps the
    # DataFrame's column order: iterating a set of strings yields an order
    # that changes between interpreter sessions, which would feed the model
    # its features in a different order on every run.
    feats = list(dict.fromkeys(
        x for x in df.columns if 'Grd_Prod_Pwr_min_(t+' not in x
    ))

    test_data.append(df[feats].values)

## Create streams in memory for the experimental evaluation

In [None]:
# Read one stream from disk, then build `n_streams` noisy copies of it in memory.
filenames = ['/data/data2/forecasting_data/synthetic/forecasting_data0.csv'] # load 1 test file

df = load_df(filenames[0])
df = df.set_index('timestamp')

# Keep the columns whose name does not contain 'Grd_Prod_Pwr_min_(t+'.
# dict.fromkeys() de-duplicates like set() did but preserves the DataFrame's
# column order; a set of strings iterates in an order that differs between
# interpreter sessions, so the feature order would change from run to run.
feats = list(dict.fromkeys(
    x for x in df.columns if 'Grd_Prod_Pwr_min_(t+' not in x
))
temp_stream = df[feats].values

n_streams = 10
n_rows = 2  # NOTE(review): not referenced anywhere else in this notebook

# Seeded generator so every run benchmarks exactly the same synthetic streams.
noise_seed = 0
rng = np.random.default_rng(noise_seed)

# Resulting shape: (n_streams, rows, features).
test_data = np.tile([temp_stream], (n_streams, 1, 1))
for stream in test_data:
    # `stream` is a view into test_data, so this writes the noisy values back
    # in place. Noise is N(0, 0.1), drawn independently for every cell, one
    # stream at a time to keep the temporary array small.
    stream[...] = stream + rng.normal(0, 0.1, stream.shape)
len(test_data[0])

## Read pretrained model

In [None]:
# Deserialize the pretrained forecasting model from its pickle file.
# NOTE(review): pickle.load executes arbitrary code from the file -- only
# point this at model files you trust.
file_path = '/home/ipsarros/pretrained_models/forecasting_regressor_chain2.pickle'
with open(file_path, 'rb') as model_file:
    loaded_model = pickle.load(model_file)

## Sequential code

In [None]:
%%time
#sequential

window = 40
i = 0
running_time = []
total_running_time = 0.0

while i< len(test_data[0])-window:
    start = time.time()
    for arr in test_data:
        result = loaded_model.predict(arr[i:i+window])
    end = time.time()
    running_time_temp = end - start
    running_time.append(running_time_temp)
    total_running_time = total_running_time + running_time_temp  
    
    i = i + window
    
if i < len(test_data[0]):
    start = time.time()
    for arr in test_data:
        result = loaded_model.predict(arr[i:len(test_data[0])])
    end = time.time()
    running_time_temp = end - start
    running_time.append(running_time_temp)
    total_running_time = total_running_time + running_time_temp  
    
print ("total_running_time = ", total_running_time )

## Parallel Batch code 

In [None]:
def forecasting_predict (loaded_model,batch_data):
    """Run ``loaded_model.predict`` once on every element of ``batch_data``.

    Parameters
    ----------
    loaded_model : object exposing ``predict(batch)``.
    batch_data : iterable of inputs, each passed to ``predict`` in order.

    Returns
    -------
    None. The predictions are computed and discarded.
    """
    for stream_window in batch_data:
        loaded_model.predict(stream_window)

In [None]:
def parallel_batch_processing_futures(loaded_model, batch_data):
    """Run one ``forecasting_predict`` task per batch on the Dask cluster and time it.

    Each element of ``batch_data`` is submitted as a separate task through the
    module-level ``client``; the call blocks until every task has finished.

    Parameters
    ----------
    loaded_model : model object forwarded to ``forecasting_predict``.
    batch_data : iterable of batches; each batch becomes one task.

    Returns
    -------
    float
        Seconds elapsed between the first submission and the completion of
        all tasks.

    Raises
    ------
    Exception
        Re-raises the exception of the first failed task, if any.
    """
    # NOTE(review): `loaded_model` is passed as a plain argument to every
    # submit call; if the model is large, scattering it to the workers once
    # may cut per-task overhead -- confirm the impact on the recorded timings
    # before changing it.
    start = time.time()
    futures = [
        client.submit(forecasting_predict, loaded_model, batch)
        for batch in batch_data
    ]
    wait(futures, return_when="ALL_COMPLETED")
    end = time.time()

    # wait() also returns for futures that finished with an error, so without
    # this check a crashed batch would be reported as a (fast) successful run.
    # The check sits after `end` so it does not influence the measurement.
    for future in futures:
        if future.status == "error":
            future.result()  # re-raises the worker-side exception

    return end - start
    

In [None]:
%%time

window = 40
i = 0

batch_data_size = len(test_data)//num_workers
batch_data = []
batch_data_all = []
counter = 0;
num_worker = 0

running_time = []
total_running_time = 0.0

while i< len(test_data[0])-window:
    for arr in test_data:
        if (counter < batch_data_size):
            batch_data.append(arr[i:i+window])
            counter = counter + 1
        elif num_worker == num_workers - 1:
            batch_data.append(arr[i:i+window])
        else:
            counter = 0
            batch_data_all.append(batch_data)
            
            batch_data = []
            batch_data.append(arr[i:i+window])
            counter = counter + 1
            num_worker = num_worker + 1
    
    batch_data_all.append(batch_data)
    batch_data = []
      
    num_worker = 0
    counter = 0
    
    running_time_temp = parallel_batch_processing_futures(loaded_model, batch_data_all)
    running_time.append(running_time_temp)
    total_running_time = total_running_time + running_time_temp
    batch_data_all = []

    i = i + window

if i < len(test_data[0]):
    for arr in test_data:
        if (counter < batch_data_size):
            batch_data.append(arr[i:len(test_data[0])])
            counter = counter + 1
        elif num_worker == num_workers - 1:
            batch_data.append(arr[i:len(test_data[0])])
        else:
            counter = 0
            batch_data_all.append(batch_data)
            
            batch_data = []
            batch_data.append(arr[i:len(test_data[0])])
            counter = counter + 1
            num_worker = num_worker + 1
    
    batch_data_all.append(batch_data)
    batch_data = []
      
    num_worker = 0
    counter = 0
    
    running_time_temp = parallel_batch_processing_futures(loaded_model, batch_data_all)
    running_time.append(running_time_temp)
    total_running_time = total_running_time + running_time_temp
    batch_data_all = []

print ("total_running_time = ", total_running_time )

## Experiments


In [None]:

n_streams = 40000, threads = 30:

    window_size = 20:    
        total_running_time =  1004.4421761035919
        CPU times: user 2h 47min 12s, sys: 3min 58s, total: 2h 51min 11s
        Wall time: 16min 44s

    window_size = 40:
        total_running_time =  752.8725855350494
        CPU times: user 1h 31min 15s, sys: 2min 57s, total: 1h 34min 12s
        Wall time: 12min 33s

    window_size = 60:
        total_running_time =  668.5420389175415
        CPU times: user 1h 4min 57s, sys: 2min 43s, total: 1h 7min 40s
        Wall time: 11min 8s   

    window_size = 80:
        total_running_time =  626.7318153381348
        CPU times: user 53min 17s, sys: 2min 30s, total: 55min 48s
        Wall time: 10min 26s

    window_size = 100:
        total_running_time =  584.7882263660431
        CPU times: user 40min 30s, sys: 2min 24s, total: 42min 55s
        Wall time: 9min 44s
    
      
threads = 30, window_size = 40 :

    n_streams = 20000 :
        total_running_time =  434.73500967025757
        CPU times: user 1h 28min 55s, sys: 1min 42s, total: 1h 30min 37s
        Wall time: 7min 14s

    n_streams = 40000 :
        total_running_time =  752.8725855350494
        CPU times: user 1h 31min 15s, sys: 2min 57s, total: 1h 34min 12s
        Wall time: 12min 33s   

    n_streams = 60000 :
        total_running_time =  1034.7841057777405
        CPU times: user 1h 32min 20s, sys: 4min 10s, total: 1h 36min 30s
        Wall time: 17min 15s   

    n_streams = 80000 :
        total_running_time =  1337.0246572494507
        CPU times: user 1h 34min 2s, sys: 4min 58s, total: 1h 39min 1s
        Wall time: 22min 17s 

    n_streams = 100000 :
        total_running_time =  1653.3374433517456
        CPU times: user 1h 35min 54s, sys: 6min 27s, total: 1h 42min 21s
        Wall time: 27min 34s

    n_streams = 120000 :
        total_running_time =  1994.925544500351
        CPU times: user 1h 36min 28s, sys: 7min 3s, total: 1h 43min 32s
        Wall time: 33min 16s
    
    
n_streams = 60000,  window_size = 40:

    sequential :
        total_running_time =  20934.46885061264
        CPU times: user 5h 48min 25s, sys: 10.5 s, total: 5h 48min 36s
        Wall time: 5h 48min 54s

    threads = 8 :
        total_running_time =  2735.172491312027
        CPU times: user 5min 9s, sys: 3min 30s, total: 8min 40s
        Wall time: 45min 35s

    threads = 16 :
        total_running_time =  1562.24662899971
        CPU times: user 5min 51s, sys: 3min 11s, total: 9min 3s
        Wall time: 26min 2s

    threads = 30 :
        total_running_time =  1034.7841057777405
        CPU times: user 1h 32min 20s, sys: 4min 10s, total: 1h 36min 30s
        Wall time: 17min 15s  

## Close Dask

In [None]:
client.close()