# In [1]:
# This script performs the data cleaning and preparation tasks required for unsupervised learning.

#import necessary packages
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Import relevant project scripts:
import EDA_part2  # I'd suggest importing specific functions from EDA_part2.
                  # from EDA_part2 import foo
                  # I don't know what is in EDA_part2 and it could have extra functions that would just slow the program down

# Added type hints to all the functions, super easy way to hint
def transform_data(
    input_df: pd.DataFrame,
    remove_cols: list[str],
    cat_cols: list[str],
    y_col: "list[str] | str",
    stop_after_data_split: bool = False,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split the data into train and test sets and, unless asked to stop early, standardize the
    quantitative features and one-hot encode the categorical features.

    The scaler and the encoder are fit on the training split only and then applied to the test
    split. The input dataframe itself is not modified.

    INPUTS:
    input_df: a pandas dataframe object of the combined charging data
    remove_cols: a list of columns to remove from the dataset (not relevant to our study)
    cat_cols: a list of string entries detailing which columns are categorical and should be one-hot encoded;
              every remaining feature column is treated as quantitative and standardized
    y_col: the target column name, or a list of target column names
    stop_after_data_split: when True, return the raw (unscaled, unencoded) split and skip the
                           scaling/encoding step

    OUTPUTS:
    X_train, X_test: feature dataframes; when scaling/encoding is applied the one-hot columns come
                     first, followed by the standardized quantitative columns
    y_train, y_test: dataframes holding the target column(s)
    """

    # First remove columns unwanted:
    temp_df = input_df.drop(columns=remove_cols)

    # Accept a single column name or a list of names; selecting with a list keeps the
    # target as a dataframe rather than a series in both cases.
    target_cols = y_col if isinstance(y_col, list) else [y_col]
    features_df = temp_df.drop(columns=target_cols)
    target_df = temp_df[target_cols]

    # Separate test data from train set (fixed seed so the split is reproducible):
    X_train, X_test, y_train, y_test = train_test_split(features_df, target_df, test_size=0.2, random_state=42)

    if stop_after_data_split:
        return X_train, X_test, y_train, y_test

    # Find which columns are quantitative by deduction
    quant_cols = [col for col in features_df.columns if col not in cat_cols]

    # Set the encoder & scaler
    # NOTE: the encoder keeps its default handling of unknown categories, so a category that
    # appears only in the test split raises during encoder.transform.
    encoder = OneHotEncoder(sparse_output=False, drop=None)
    scaler = StandardScaler()

    # Fit on the training split only, then apply the fitted transforms to the test split
    X_train_scaled_columns = scaler.fit_transform(X_train[quant_cols])
    X_train_encoded_columns = encoder.fit_transform(X_train[cat_cols])
    X_test_scaled_columns = scaler.transform(X_test[quant_cols])
    X_test_encoded_columns = encoder.transform(X_test[cat_cols])

    encoded_names = encoder.get_feature_names_out(cat_cols)

    # Map back to dataframes. The transformers return bare arrays, so the original row index
    # is re-attached explicitly; otherwise the features would be relabelled 0..n-1 and would
    # no longer share row labels with y_train / y_test.
    X_train_scaled_df = pd.DataFrame(X_train_scaled_columns, columns=quant_cols, index=X_train.index)
    X_train_encoded_df = pd.DataFrame(X_train_encoded_columns, columns=encoded_names, index=X_train.index)
    X_test_scaled_df = pd.DataFrame(X_test_scaled_columns, columns=quant_cols, index=X_test.index)
    X_test_encoded_df = pd.DataFrame(X_test_encoded_columns, columns=encoded_names, index=X_test.index)

    # Combine the processed data
    X_train_scale_encoded_df = pd.concat([X_train_encoded_df, X_train_scaled_df], axis=1)
    X_test_scale_encoded_df = pd.concat([X_test_encoded_df, X_test_scaled_df], axis=1)

    return X_train_scale_encoded_df, X_test_scale_encoded_df, y_train, y_test


def process_datetime(input_df: pd.DataFrame) -> pd.DataFrame:
    """
    Parses the "start_datetime" column of the charging events and adds numeric columns for the
    characteristic information in it: year, month, day, hour, minute, second, fractional hour of
    the day, day of the week, and day of the year.

    The work is done on a copy, so the dataframe passed in is left unchanged.

    INPUTS:
    input_df: a pandas dataframe object with a "start_datetime" column

    OUTPUTS:
    out_df: a new pandas dataframe with "start_datetime" converted to datetime and the added
            columns "year", "month", "day", "hour", "minute", "second", "time_of_day",
            "weekday", and "day_of_year"
    """

    # Copy first so the caller's dataframe is not modified as a side effect
    out_df = input_df.copy()

    start = pd.to_datetime(out_df["start_datetime"])
    out_df["start_datetime"] = start

    # Calendar/clock components, each stored as a plain integer column.
    # NOTE: the integer cast fails if any timestamp is missing (NaT).
    for component in ("year", "month", "day", "hour", "minute", "second"):
        out_df[component] = getattr(start.dt, component).astype(int)

    # Time of day expressed as fractional hours (e.g. 13:30:00 -> 13.5)
    out_df["time_of_day"] = out_df["hour"] + out_df["minute"]/60 + out_df["second"]/3600

    out_df["weekday"] = start.dt.weekday.astype(int)
    out_df["day_of_year"] = start.dt.dayofyear.astype(int)

    return out_df


def anomaly_tags(input_df: pd.DataFrame, anomaly_list: "list[int] | None" = None) -> pd.DataFrame:
    """
    Converts entries from the "flag_id" column to 0 or 1 based on anomaly presence.

    The work is done on a copy, so the dataframe passed in keeps its original flag codes and the
    function can safely be called again on it with a different anomaly_list.

    INPUTS:
    input_df: a pandas dataframe with the flag_id column that needs to be updated
    anomaly_list: a list of integer flag_id codes that count as anomalies; when None, every
                  non-zero flag_id counts as an anomaly

    OUTPUTS:
    out_df: a new pandas dataframe whose flag_id column is 1 for anomalies and 0 otherwise
    """

    # Copy first: overwriting flag_id on the caller's frame would destroy the original codes
    out_df = input_df.copy()
    flags = out_df["flag_id"]

    if anomaly_list is None:
        # No explicit list given: any non-zero code is an anomaly
        is_anomaly = flags.ne(0)
    else:
        # Only the listed codes are anomalies; everything else (including other non-zero codes) is 0
        is_anomaly = flags.isin(anomaly_list)

    out_df["flag_id"] = is_anomaly.astype(int)

    return out_df


def main_execution(
    input_condition: int = 2,
    test_ratio: float = 0.2,
    anomaly_list: "list[int] | None" = None,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    This function runs the main execution to process the data into train and test datasets and
    writes each split to a CSV file in the current working directory if that file is not already there.

    INPUTS:
    input_condition: An integer value (0, 1, or 2) specifying which condition to apply to combine the data
                     0: do nothing, 1: drop nulls, 2: drops impractical values (used for supervised learning)
    test_ratio: a float value between 0 and 1 determining how much of the input data will be the test dataset
    anomaly_list: a list of integer values associated with the flag_id entry code; when None, every
                  non-zero flag_id is tagged as an anomaly

    OUTPUTS:
    X_train & X_test: feature dataframes (not scaled, not one-hot encoded)
    y_train & y_test: single-column dataframes holding the 0/1 "flag_id" target
    """

    # Get the dataframe processed
    merged_df = EDA_part2.combine_charging_data(input_condition=input_condition)
    merged_time_df = process_datetime(merged_df)
    mapped_df = anomaly_tags(merged_time_df, anomaly_list=anomaly_list)
    mapped_df = mapped_df.dropna()

    # Declare the target and unwanted columns.
    # NOTE: the categorical columns, for anyone who goes on to one-hot encode these features, are
    # power_kw, connector_type, pricing, region, land_use, metro_area, charge_level and venue.
    # They are not needed for the raw split produced here.
    y_col = "flag_id"
    remove_cols = ["session_id", "connector_id_x", "evse_id", "connector_id_y", "start_datetime", "end_datetime",
                   "hour", "minute", "second"]

    # Split here, with the same seed as transform_data's early-exit split, so that test_ratio
    # is actually honoured (that split uses a fixed test size). With the default test_ratio
    # the result is identical.
    model_df = mapped_df.drop(columns=remove_cols)
    features_df = model_df.drop(columns=[y_col])
    target_df = model_df[[y_col]]
    X_train, X_test, y_train, y_test = train_test_split(
        features_df, target_df, test_size=test_ratio, random_state=42
    )

    # Output to datafile for future calling if not already in cwd.
    # Existing files are never overwritten, so delete them first when the inputs
    # (e.g. test_ratio or anomaly_list) change and fresh CSVs are wanted.
    existing_files = set(os.listdir())
    outdict = {"UL_Xtrain.csv": X_train, "UL_Xtest.csv": X_test, "UL_ytrain.csv": y_train, "UL_ytest.csv": y_test}
    for filename, frame in outdict.items():
        if filename not in existing_files:
            frame.to_csv(filename, index=False)

    return X_train, X_test, y_train, y_test


if __name__ == "__main__":
    # When run as a script, execute the pipeline with its default arguments; the four
    # returned splits are bound as module-level names.
    X_train, X_test, y_train, y_test = main_execution()