In [1]:
import os
os.environ["OMP_NUM_THREADS"] = "1" # export OMP_NUM_THREADS=1
os.environ["OPENBLAS_NUM_THREADS"] = "1" # export OPENBLAS_NUM_THREADS=1
os.environ["MKL_NUM_THREADS"] = "1" # export MKL_NUM_THREADS=1
os.environ["VECLIB_MAXIMUM_THREADS"] = "1" # export VECLIB_MAXIMUM_THREADS=1
os.environ["NUMEXPR_NUM_THREADS"] = "1" # export NUMEXPR_NUM_THREADS=1
import os, sys
from tqdm import tqdm
#import seaborn as sns
module_path = os.path.abspath(os.path.join('../'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from modules.preprocessing import *
from modules.io import *
from modules.learning import *
from modules.statistics import *
from matplotlib import pyplot as plt
from sklearn.metrics import mean_squared_error
from IPython.display import display
from sklearn.preprocessing import normalize
#import seaborn
import pickle
from glob import glob
from tqdm.notebook import tqdm
plt.style.use('ggplot')

import time

# Test the performance of the yaw misalignment regression method

## Import Dask

In [2]:
from dask.distributed import Client
from dask.distributed import wait

## Start Dask

In [3]:
client = Client(n_workers=30,threads_per_worker=1)

# Load Data and Preprocessing


In [4]:
filenames = sorted(glob(os.path.join('/data/data1/synthetic_yaw_data','testing_*.csv')))
filenames = filenames[:20000] # load 20000

In [5]:
test_data = []
test_data_bins = []
counter = 0
for f in filenames:
    counter = counter +1 
    dataset_file =  f
    df = load_df(dataset_file)
    df = df.set_index('timestamp')
    df = df.dropna(axis=1, how='all')
    df.columns = df.columns.str.replace('cor. ', '', regex=False)
    cols = ['wind speed', 'pitch angle', 'rotor speed', 'active power',
            'nacelle direction', 'wind direction', 'theta_d']
    df = df[cols]
    df["y"] = np.random.randint(low=0, high=10, size=len(df))
    test_data.append(df.copy())
    

    bin_size = 2
    min_speed = 0
    max_speed = 24
    bins = np.arange(min_speed, max_speed, bin_size)
    bins = np.append(bins,max_speed)
    bin_masks = []
    bin_feature = 'wind speed'
    for i in range(len(bins) - 1):
        mask = (df[bin_feature]>= bins[i]) & (df[bin_feature] < bins[i + 1])
        bin_masks.append(mask)
    test_data_bins.append(bin_masks.copy())

In [6]:
models_file = '/home/dtsitsigkos/more/yaw_windows/deliv_parallel_yaw/Models_Select_Bins1.pickle' 
scaler_file = '/home/dtsitsigkos/more/yaw_windows/deliv_parallel_yaw/select_bins_scaler.pickle'

with open(models_file, 'rb') as file:
    all_models_dict = pickle.load(file)
    
with open(scaler_file, 'rb') as file:
    scaler = pickle.load(file)
    

In [7]:
test_data_scaled = []
for df in test_data:
    df_scaled = df.copy().drop(columns=['y']) 
    df_scaled[df_scaled.columns] = scaler.transform(df_scaled)
    df_scaled['y'] = df['y']
    df['y_pred'] = np.nan
    test_data_scaled.append(df_scaled)

# Test models in new time series

# Sequential Code

In [8]:
def predict_yaw(df, scaler, all_models_dict, bin_masks,df_scaled):
    binned_data_dfs = []

    for b_no, b in enumerate(bin_masks):
        df_temp = df_scaled[bin_masks.iloc[:,b]]
        binned_data_dfs.append(df_temp.copy())
        
    target_feature = 'active power'
    
    all_evaluation_scores = {}
    for dataset_key, dataset_dict in all_models_dict.items():
        all_evaluation_scores[dataset_key] = {}
        fit_features = dataset_dict['selected_features']
        models_dict = dataset_dict['models']
        evaluation_scores = {}
        for key, models in models_dict.items():
            mape_list = []
            for bin_n, d in enumerate(binned_data_dfs):
                if d.shape[0] >= 10 and models[bin_n] is not None:
                    test_preds = predict(d, models[bin_n], fit_features, target_feature)
                    r_sq, mae, me, mape, mpe, Me = score(d[target_feature].values, test_preds)
                    mape_list.append(mape)
                else:
                    pass
            avg_mape = np.mean(mape_list)
            evaluation_scores[key] = avg_mape
        all_evaluation_scores[dataset_key].update(evaluation_scores.copy())
    min_score = float("inf")
    th_s_label = ''
    for dataset, dict1 in all_evaluation_scores.items(): 
        for th_s, evaluation_score in dict1.items(): 
            sc = evaluation_score
            if sc <= min_score:
                min_score = sc
                th_s_label = np.abs(float(th_s))
    indexer = df.index
    df.loc[indexer, 'y_pred'] = th_s_label
    prediction = np.abs(df['y_pred'])
    return prediction

In [9]:
%%time
window = 1440
i = 0

running_time = []
total_running_time = 0.0


while i< len(test_data_scaled[0])-window:
    
    start = time.time()
    for df, b_masks, df_scaled in zip(test_data, test_data_bins, test_data_scaled):
        test_data_bins_temp = pd.concat(b_masks,axis=1)
        test_data_bins_temp.columns = range(12)
        res = predict_yaw(df.iloc[i:i+window].copy(), scaler, all_models_dict, test_data_bins_temp.iloc[i:i+window], df_scaled.iloc[i:i+window].copy())
    end = time.time()
    
    running_time_temp = end - start
    running_time.append(running_time_temp)
    total_running_time = total_running_time + running_time_temp  
    i = i + window
    
print ("total_running_time = ", total_running_time )

total_running_time =  19454.50767159462
CPU times: user 5h 26min 51s, sys: 38min 12s, total: 6h 5min 3s
Wall time: 5h 24min 14s


## Parallel Batch Code

In [10]:
def predict_yaw_batch(df_new, all_models_dict, bin_masks_new,df_scaled_new):
    predictions = []
    
    for df, bin_masks,df_scaled in zip(df_new, bin_masks_new, df_scaled_new):
        binned_data_dfs = []
        for b_no, b in enumerate(bin_masks):
            df_temp = df_scaled[bin_masks.iloc[:,b]]
            binned_data_dfs.append(df_temp.copy())

        target_feature = 'active power'

        all_evaluation_scores = {}
        for dataset_key, dataset_dict in all_models_dict.items():
            all_evaluation_scores[dataset_key] = {}
            fit_features = dataset_dict['selected_features']
            models_dict = dataset_dict['models']
            evaluation_scores = {}
            for key, models in models_dict.items():
                mape_list = []
                for bin_n, d in enumerate(binned_data_dfs):
                    if d.shape[0] >= 10 and models[bin_n] is not None:
                        test_preds = predict(d, models[bin_n], fit_features, target_feature)
                        r_sq, mae, me, mape, mpe, Me = score(d[target_feature].values, test_preds)
                        mape_list.append(mape)
                    else:
                        pass
                avg_mape = np.mean(mape_list)
                evaluation_scores[key] = avg_mape
            all_evaluation_scores[dataset_key].update(evaluation_scores.copy())
        min_score = float("inf")
        th_s_label = ''
        for dataset, dict1 in all_evaluation_scores.items(): 
            for th_s, evaluation_score in dict1.items(): 
                sc = evaluation_score
                if sc <= min_score:
                    min_score = sc
                    th_s_label = np.abs(float(th_s))
        indexer = df.index
        df.loc[indexer, 'y_pred'] = th_s_label
        prediction = np.abs(df['y_pred'])
        predictions.append(prediction)
    return predictions

In [11]:
def parallel_batch_yaw(df, all_models_dict, bin_masks,df_scaled):
    running_time = 0.0
    futures = []
    
    start = time.time()
    for d, b_masks,df_s in zip(df, bin_masks, df_scaled):
        future = client.submit(predict_yaw_batch, d, all_models_dict, b_masks, df_s )
        futures.append(future)
          
    wait(futures, return_when="ALL_COMPLETED")
    
    
    end = time.time()
    futures = []
    running_time = end - start
    
    return running_time

In [12]:
%%time
window = 1440
running_time = []
i = 0

batch_data_size = 334

df_new = []
df_scaled_new = []
test_data_bins_new = []
df_temp = []
df_scaled_temp = []
test_data_bins_temp = []
total_running_time = 0.0
counter = 0


while i< len(test_data_scaled[0])-window:
    for df, b_masks, df_scaled in zip(test_data, test_data_bins, test_data_scaled):
        testDataBins = pd.concat(b_masks,axis=1)
        testDataBins.columns = range(12)
        
        if (counter < batch_data_size):
            df_temp.append(df.iloc[i:i+window])
            df_scaled_temp.append(df_scaled.iloc[i:i+window])
            test_data_bins_temp.append(testDataBins.iloc[i:i+window])
            counter = counter +1 
        else:
            counter = 0
            df_new.append(df_temp)
            df_scaled_new.append(df_scaled_temp)
            test_data_bins_new.append(test_data_bins_temp)
            df_temp = []
            df_scaled_temp=[]
            test_data_bins_temp=[]
            
            df_temp.append(df.iloc[i:i+window])
            df_scaled_temp.append(df_scaled.iloc[i:i+window])
            test_data_bins_temp.append(testDataBins.iloc[i:i+window])
            counter = counter+1
    
    df_new.append(df_temp)
    df_scaled_new.append(df_scaled_temp)
    test_data_bins_new.append(test_data_bins_temp)
    
    running_time_temp = parallel_batch_yaw(df_new.copy(), all_models_dict,test_data_bins_new.copy(), df_scaled_new.copy() )  
    running_time.append(running_time_temp)
    total_running_time = total_running_time + running_time_temp
    
    df_new = []
    df_temp=[]
    test_data_bins_temp=[]
    test_data_bins_new = []
    df_scaled_new = []
    df_scaled_temp = []
    i = i + window
    
print("total_running_time = ", total_running_time)

  ([                     wind speed  pitch angle  ro ...  x 8 columns]])
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


total_running_time =  1314.8415927886963
CPU times: user 1h 29min 59s, sys: 5min 50s, total: 1h 35min 49s
Wall time: 1h 35min 9s


## Close Dask

In [13]:
client.close()

In [14]:
running_time

[73.26486492156982,
 73.24556159973145,
 65.97510194778442,
 63.51662635803223,
 64.97564697265625,
 62.79744243621826,
 62.473812103271484,
 63.05023241043091,
 58.60575294494629,
 62.41235661506653,
 65.11336970329285,
 66.92150616645813,
 65.38022422790527,
 61.67776679992676,
 66.09904170036316,
 77.47049593925476,
 64.92107725143433,
 63.75164246559143,
 68.83843755722046,
 64.3506326675415]