# Imports & Functions

In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm

from pyspark.sql import SparkSession
from azure.storage.blob import ContainerClient

from functions.metrics import nrmse_adjusted
from functions.utils import to_NAN, put_historical_nans
from functions.import_data import import_datasets

from blob_credentials import facts_sas_token, facts_container, workspace_sas_token, workspace_container

from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn import linear_model

import random

In [None]:
def shuffle_df(df, seed=0):
    """
    Return `df` with its columns in a reproducible pseudo-random order.
    Input:
        - df: pd.DataFrame
        - seed: int, seed of the column permutation (default 0, which gives the
          same order as the historical `random.seed(0); random.shuffle(...)`)
    Output:
        - pd.DataFrame with the same data and a shuffled column order
    """
    cols = list(df.columns)
    # A private generator keeps the shuffle reproducible without reseeding
    # (and thereby clobbering) the module-level `random` state used elsewhere.
    random.Random(seed).shuffle(cols)
    return df[cols]

In [None]:
def order_df(df):
    """
    Order the columns of `df` by the integer value of their labels.
    Input:
        - df: pd.DataFrame whose column labels are convertible with int()
          (e.g. strings representing integers, or integers)
    Output:
        - pd.DataFrame with the same columns in ascending integer order
    """
    # Sort the existing labels by their integer value instead of rebuilding them
    # with str(int(label)). This keeps the original labels (and their type), so
    # integer-typed columns or labels like "01" no longer raise KeyError.
    return df[sorted(df.columns, key=int)]

In [None]:
def compute_beg_end(df, subset_size, i):
    """
    Compute the [beg, end) column positions of subset `i` of `df`.

    The number of subsets is df.shape[1] // subset_size, or 1 when the frame is
    narrower than one subset (same rule as compute_total_subsets). Every subset
    spans `subset_size` columns except the last, which also absorbs the
    remaining columns.
    Input:
        - df: pd.DataFrame
        - subset_size: integer, number of columns per subset
        - i: integer, 0-based index of the subset
    Output:
        - int, int: position of the first column and one past the last column,
          suitable for df.iloc[:, beg:end]; `end` never exceeds df.shape[1]
    """
    n_cols = df.shape[1]
    # Must agree with compute_total_subsets: a frame narrower than one subset
    # still forms a single (last) subset, so its `end` is the column count.
    total_subsets = n_cols // subset_size or 1
    beg = i * subset_size
    if i == total_subsets - 1:  # Last subset takes the remainder columns
        end = n_cols
    else:
        end = min((i + 1) * subset_size, n_cols)
    return beg, end

In [None]:
def compute_total_subsets(df, subset_size):
    """
    Count the column subsets `df` is split into for imputation.
    Input:
        - df: pd.DataFrame
        - subset_size: int, number of columns per subset
    Output:
        - int: df.shape[1] // subset_size, or 1 when that quotient is 0
          (a frame narrower than one subset still forms a single subset)
    """
    n_full_subsets = df.shape[1] // subset_size
    return n_full_subsets or 1

In [None]:
def _impute_subset(df_sub):
    """Run a BayesianRidge IterativeImputer on one column subset, keeping its row and column labels."""
    # IterativeImputer discards features that are entirely missing, which made the
    # output narrower than `df_sub` and broke the column relabelling. Impute only
    # the columns with at least one observed value and put the all-missing ones
    # back as NaN, so the labels still line up.
    observed_cols = df_sub.columns[df_sub.notna().any()]
    if len(observed_cols) == 0:
        return pd.DataFrame(index=df_sub.index, columns=df_sub.columns, dtype=float)

    clf = linear_model.BayesianRidge()
    imputer = IterativeImputer(estimator=clf, n_nearest_features=None,
                               imputation_order='ascending', random_state=0, verbose=2)
    values = imputer.fit_transform(df_sub[observed_cols])

    # Keep the caller's row index (previously replaced by a fresh RangeIndex).
    imputed = pd.DataFrame(values, index=df_sub.index, columns=observed_cols)
    return imputed.reindex(columns=df_sub.columns)


def impute_bayesianRidge(df, subset_size=100):
    """
    Impute missing values with an IterativeImputer (BayesianRidge estimator),
    run independently on column subsets of about `subset_size` columns.

    Columns are first shuffled with a fixed seed (shuffle_df), then split into
    subsets (compute_total_subsets / compute_beg_end). The result is put back in
    ascending integer order of the column labels (order_df).
    Input:
        - df: pd.DataFrame, dataframe that should be imputed; column labels must
          be convertible with int() (required by order_df)
        - subset_size: integer, number of columns per subset; the last subset
          also takes the remaining columns
    Output:
        - pd.DataFrame imputed, with the same row index as `df`; columns that
          are entirely missing are returned as all-NaN
    """
    df = shuffle_df(df)
    total_subsets = compute_total_subsets(df, subset_size)
    dfs_imputed = []
    for i in tqdm(range(total_subsets)):
        beg, end = compute_beg_end(df, subset_size, i)
        dfs_imputed.append(_impute_subset(df.iloc[:, beg:end]))

    full_imputed = pd.concat(dfs_imputed, axis=1)
    return order_df(full_imputed)

# Spark Session

In [None]:
myname = "marc-samvath-philippe.vigneron"

# Spark session with the hadoop-azure connector, so `wasbs://` paths on the
# `hecdf` storage account can be read using the SAS tokens of both containers.
spark = (
    SparkSession.builder
    .appName(f"Test-{myname}")
    # Spark's property is `spark.executor.instances` (plural); the singular
    # spelling is not a Spark setting and was silently ignored.
    .config("spark.executor.instances", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.1.1")
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config(f"fs.azure.sas.{facts_container}.hecdf.blob.core.windows.net", facts_sas_token)
    .config(f"fs.azure.sas.{workspace_container}.hecdf.blob.core.windows.net", workspace_sas_token)
    .getOrCreate()
)

# Load Data

In [None]:
# Load the GBM- and KDE-generated datasets from this user's folder in the
# workspace container, plus the first dataset returned by import_datasets().
_workspace_dir = f'wasbs://{workspace_container}@hecdf.blob.core.windows.net/{myname}'
generated_data_gbm = spark.read.parquet(f'{_workspace_dir}/generated_data_gbm.parquet').toPandas()
generated_data_kde = spark.read.parquet(f'{_workspace_dir}/generated_data_kde.parquet').toPandas()
df_challenge = import_datasets()[0]

In [None]:
# Converting the values to NaN: to_NAN is given each generated dataset and the
# challenge data without its "Date" column.
# NOTE(review): presumably this copies the challenge data's missing-value pattern
# onto the generated data — confirm in functions.utils.to_NAN.
dataset_challenge_gbm_nan = to_NAN(generated_data_gbm, df_challenge.drop(columns=["Date"]))
dataset_challenge_kde_nan = to_NAN(generated_data_kde, df_challenge.drop(columns=["Date"]))

# Iterative imputer

## GBM Data

In [None]:
# Imputation of the masked GBM dataset, in column subsets of 100
df_imputed_gbm = impute_bayesianRidge(dataset_challenge_gbm_nan, subset_size=100)

In [None]:
# Historical NaNs: put_historical_nans receives the imputed GBM frame and the
# masked GBM frame it was imputed from.
# NOTE(review): presumably it restores NaNs that are genuinely missing in the
# history so they are not scored — confirm in functions.utils.
df_imputed_gbm = put_historical_nans(df_imputed_gbm, dataset_challenge_gbm_nan)

In [None]:
# Score the imputation: average the first element of every nrmse_adjusted
# result entry, ignoring NaN scores.
results_gbm = nrmse_adjusted(
    generated_data_gbm.values,
    df_imputed_gbm.values,
    dataset_challenge_gbm_nan.values,
)

mean_nrmse_gbm = np.nanmean(np.array([entry[0] for entry in results_gbm.values()]))
print("GBM iterative imputer NRMSE: %f" % mean_nrmse_gbm)

## KDE Data

In [None]:
# Imputation of the masked KDE dataset, in column subsets of 100
df_imputed_kde = impute_bayesianRidge(dataset_challenge_kde_nan, subset_size=100)

In [None]:
# Historical NaNs: put_historical_nans receives the imputed KDE frame and the
# masked KDE frame it was imputed from (dataset_challenge_kde_nan; the previous
# name `dataset_challenge_gbm_kde` was never defined and raised NameError).
df_imputed_kde = put_historical_nans(df_imputed_kde, dataset_challenge_kde_nan)

In [None]:
# Score the imputation: average the first element of every nrmse_adjusted
# result entry, ignoring NaN scores.
results_kde = nrmse_adjusted(
    generated_data_kde.values,
    df_imputed_kde.values,
    dataset_challenge_kde_nan.values,
)
mean_nrmse_kde = np.nanmean(np.array([entry[0] for entry in results_kde.values()]))
print("kde iterative imputer NRMSE: %f" % mean_nrmse_kde)