$$\large \text{Packages & Specs} $$

In [1]:
import os
import pandas as pd
import numpy as np
import re
import threading
import queue
import sys

# Kalman Smoothing using R objects
import rpy2.robjects as robjects
# import R packages
from rpy2.robjects.packages import importr

# Impute TS
imputeTS = importr('imputeTS')
kalman_StructTs = robjects.r['na_kalman']

module_path = re.sub(r'Notebooks|Python Scripts','Python Scripts',os.getcwd())
sys.path.append(module_path)

from pv_modules import *
from tensorflow import keras

meteo_datapath = re.sub(r'Notebooks|Python Scripts','Support Files/',os.getcwd())
# == Load Hourly Meteo Data == #
meteo_file = 'hour_all_meteo_data_clean'
meteo_df = pd.read_csv(meteo_datapath + f'{meteo_file}.csv',index_col=0)
meteo_df.index = pd.to_datetime(meteo_df.index)
# == Load Feature Data == #
feature_file = 'all_meteo_data_clean_4D_4Y'
features_df = pd.read_csv(meteo_datapath + f'{feature_file}.csv',index_col=0)
features_df.index = pd.to_datetime(features_df.index)

model_datapath = re.sub(r'Notebooks|Python Scripts','Python Scripts/',os.getcwd())
nn_model = keras.models.load_model(model_datapath + f"multivariate_mlp_model")

2023-06-13 18:26:09.142008: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-06-13 18:26:37.723587: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
def interpolation_method(df, nan_gaps):
    
    """
    Performs interpolation on a DataFrame to fill missing values using the 'time' method.

    Args:
        df (pandas.DataFrame): Input DataFrame.
        nan_gaps (dict): Dictionary containing column names as keys and lists of NaN gap indices as values.

    Returns:
        pandas.DataFrame: DataFrame with filled values using time based linear interpolation.
        
    """
    
    output_df = df.copy()
        
    df = df.interpolate(method='time', limit_direction='both')
            
    for col in nan_gaps.keys():
        output_df[col].iloc[nan_gaps[col]] = df[col].iloc[nan_gaps[col]]
        
    return output_df

In [3]:
def ARIMA(df, nan_gaps):
    
    """
    Applies Kalman filtering to a DataFrame to fill missing values.

    Args:
        df (pandas.DataFrame): Input DataFrame.
        nan_gaps (dict): Dictionary containing column names as keys and lists of NaN gap indices as values.

    Returns:
        pandas.DataFrame: DataFrame with filled values using Kalman filtering.

    """
    
    output_df = df.copy()
    
    for col in df.columns:
        
        arr = np.ndarray.tolist(df[col].values)
        arr = robjects.FloatVector(arr)

        df[col] = kalman_StructTs(arr, model = "auto.arima")
        
    for col in nan_gaps.keys():
        output_df[col].iloc[nan_gaps[col]] = df[col].iloc[nan_gaps[col]]
        
    return output_df

In [4]:
def NN_Regression(df):

    nn_features = ml_df.loc[df.index[0]:df.index[-1],:]
    nn_features = nn_features[nn_features.index.isin(df.index)]

    X = nn_features.iloc[test_gaps[df.columns[0]]].to_numpy()
        
    scaler = MinMaxScaler()

    X_scaled = scaler.fit_transform(X)

    imputation_data = model.predict(X_scaled)

    df.iloc[test_gaps[df.columns[0]]] = imputation_data

    error_df = calculate_imputation_errors(df, df_copy, test_gaps)
    
    return error_df

In [5]:
def fill_hour(df):
    """
    Fill missing values in the df DataFrame using hourly data from the Grigy Site.
    
    Args:
        test_df (pandas.DataFrame): The DataFrame containing the test data.
        file (str): The type of file for which missing values need to be filled. 
                    Valid options are "Irradiance" or any other file type.
    
    Returns:
        tuple: A modified df DataFrame with the missing hour values.
    """

    fill_df = meteo_df.loc[df.index[0]:df.index[-1],:]
    fill_df = fill_df[fill_df.index.isin(df.index)]

    if file == "Irradiance":
        df['DirectIR'] = df['DirectIR'].fillna(fill_df['Direct Shortwave Radiation (W/m²) (sfc)'])
        df['DiffuseIR'] = df['DiffuseIR'].fillna(fill_df['Diffuse Shortwave Radiation (W/m²) (sfc)'])
        df['Temperature'] = df['Temperature'].fillna(fill_df['Temperature (°C) (2 m elevation corrected)'])
        df['WindSpeed'] = df['WindSpeed'].fillna(fill_df['Wind Speed (m/s) (10 m)'])
    else:
        print(f"No available data to fill hourly gaps for file type: {file}")
        
    return df

In [6]:
def df_imputer(df):
    
    """
    Finds the index positions of gaps in a DataFrame based on their size and applies appropriate imputation method.

    Args:
        df (pandas.DataFrame): Input DataFrame.

    Returns:
        pandas.DataFrame: DataFrame with filled missing values using interpolation and Kalman filtering.

    """
    
    interpolation = {}
    arima = {}
    
    for col in range(len(df.columns)):
        index_list = []
        interpolation[df.columns[col]] = []
        arima[df.columns[col]] = []
        for index in range(len(df.index)):
            if index in index_list: continue
            day = df.index[index].day
            c = 0
            while np.isnan(df.iloc[index+c,col]) and df.index[index+c].day == day:
                if df.index[index+c] == df.index[-1]: break
                index_list += [index+c]           
                c += 1 
            dt = (df.index[index+c] - df.index[index]).total_seconds()
            if not c and not np.isnan(df.iloc[index+c,col]): continue
                
            # == uncomment block if removing night time values prior to imputing missing values == #
#             elif df.index[index+c] == df.index[-1] and np.isnan(df.iloc[index+c,col]):
#                 if dt <= 200:
#                     interpolation[df.columns[col]] += list(range(index,index+c+1))
#                 else:
#                     arima[df.columns[col]] += list(range(index,index+c+1))
#             elif df.index[index+c+1].day != df.index[index+c].day and np.isnan(df.iloc[index+c+1,col]):
#                 arima[df.columns[col]] += list(range(index,index+c+1))
#             elif df.index[index-1].day != df.index[index].day and np.isnan(df.iloc[index,col]):
#                 arima[df.columns[col]] += list(range(index,index+c+1))
            # == uncomment block if removing night time values prior to imputing missing values == #
    
            elif dt <= 200:
                interpolation[df.columns[col]] += list(range(index,index+c+1))
            else:
                arima[df.columns[col]] += list(range(index,index+c+1))

        if not interpolation[df.columns[col]]:
            del interpolation[df.columns[col]]
        if not arima[df.columns[col]]:
            del arima[df.columns[col]]
            
    if interpolation:
        df = interpolation_method(df,interpolation)
    if arima:
        df = ARIMA(df,arima)
        
    return df

$$\large \text{Imputer; instance of df for cleaning and preprocessing} $$

In [7]:
class Imputer():
    
    """
    Class for data imputation and cleaning.
    
    Attributes:
        df (pandas.DataFrame): Input DataFrame.
        month (str): Month.
        year (str): Year.
        file (str): File type.

    Methods:
        run(): Runs the data imputation and cleaning process.
    """
    
    def __init__(self,df,month,year,file):
        self.df = df
        self.month = month
        self.year = year
        self.file = file
        super().__init__()
        
    def run(self):
                
        # === reshaping df for timestap & adjusted headers === #
        self.df = reshape_df(self.df,self.file)
        
        if self.file == 'Irradiance':
            
            # === Set Column Names === #
            self.df.columns = ['GlobalIR','DirectIR','DiffuseIR','WindSpeed','Temperature']
            
            # === Removing Misread Vemps === #
            self.df = clean_irradiance_values(self.df)
            
        else:
            
            # === Set Column Names === #
            self.df.columns = ['MonoSi_Vin','MonoSi_Iin','MonoSi_Vout','MonoSi_Iout','PolySi_Vin','PolySi_Iin','PolySi_Vout','PolySi_Iout','TFSi_a_Vin','TFSi_a_Iin','TFSi_a_Vout','TFSi_a_Iout','TFcigs_Vin','TFcigs_Iin','TFcigs_Vout','TFcigs_Iout','TempF_Mono','TempF_Poly','TempF_Amor','TempF_Cigs']
        
            # === Removing Misread Values === #
            self.df = clean_deger_fixed_values(self.df)
            
        # === resample df to 20s frequency === #
        self.df = resample_df(self.df)
        
        # === Fill hourly gaps with Meteo Data === #
        self.df = fill_hour(self.df)
        
        # === Using PvLib to remove nightime values === #
#         self.df = remove_night(self.df)
            
        print(f"Imputing {round(self.df.isna().sum().sum()/self.df.size*100,3)}% of the data for {self.month}, {self.year}.")

        self.df = df_imputer(self.df)
                
        if self.df.isna().any().any():   
            raise Exception(f"The File {self.file}, {self.month} {self.year} still has NaN values")
            
        cwd = re.sub("Notebooks|Python Scripts","Data/",os.getcwd())
        datapath = cwd + self.year + '/' + self.file + '/'
        file = self.month.lower() + '.csv'
        self.df.to_csv(datapath + "/clean_" + file)

In [8]:
class Worker(threading.Thread):
    
    """
    Thread worker class for parallel processing.
    
    Attributes:
        queue (Queue): Queue containing file paths.
        file (str): File type.
        lock (threading.Lock): Lock for thread synchronization.
    """
    
    def __init__(self, queue, file, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.file = file
        self.lock = lock

    def run(self):
        while True:
            try:
                file_path = self.queue.get(timeout=3) # retrieve file path from the queue
            except queue.Empty:
                return # If the queue is empty, exit the thread
            
            data = re.search(r"/(\d{4})/[a-zA-Z]*/([a-zA-Z]*)\.csv",file_path).group(1,2)
            df = pd.read_csv(file_path, sep="\t|,", engine='python')
            self.lock.acquire()
            print('Starting',data[1], data[0])
            self.lock.release()
            Imputer(df, data[1], data[0], self.file).run()
            self.lock.acquire()
            print('Completed',data[1], data[0])
            self.lock.release()
            self.queue.task_done() # Notify the queue that the task is done

In [None]:
starttime = pd.Timestamp.now()
        
q = queue.Queue()

file = input("File (opt: Irradiance/Deger/Fixed): ")
             
file_paths, _ = get_file_paths(file) 

for file_path in file_paths:
#     q.put_nowait(file_path)
    # = run for specific month and year, or for test file = #
    if re.search(r'nov',file_path.lower()) and re.search(r'2022',file_path): 
        q.put_nowait(file_path)
    
lock = threading.Lock()

num_jobs = 8

for _ in range(num_jobs): 
    t = Worker(q, file, lock)
    t.daemon = True
    t.start()

q.join() 

endtime = pd.Timestamp.now()

runtime = endtime - starttime

print("Start:",starttime,"\nEnd:",endtime,"\nRun Time:",runtime.total_seconds())

File (opt: Irradiance/Deger/Fixed): Irradiance
Starting november 2022


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/pv-solar/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3802, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 138, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 165, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5745, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5753, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'Wind Speed (m/s) (10 m)'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/anaconda3/envs/pv-solar/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/var/folders/pl/3z04c3v55nz5t091_s4wsj1m0000gn/T/ipykernel_50363/297414507.py", line 30, in run
  File "/var/fol