# In [None]:
'''
The DataProcessor class processes training data from a CSV file (e.g., data_4_scaler_label_agg.csv).
The key steps are loading the data, computing fill values for missing data, fitting preprocessing tools
(a MinMaxScaler and per-column LabelEncoders), aggregating per-user features,
and saving the missing-value dictionary, preprocessing tools, and aggregated features as pickle files.
These artifacts can then be used to transform data during inference,
in either stream-based or batch processing mode.

Example rows from the input CSV (further columns are elided with "..."):
user_id,login_attempts,failed_login_attempts,session_duration,data_transferred,access_sensitive_files,log_text, ... 
1,5,1,30,100,0,User logged in and accessed files., ... 
2,3,0,60,200,1,User attempted to login multiple times., ...  
3,1,0,90,150,0,User session ended abruptly., ...  
1,4,1,45,120,1,User downloaded sensitive files., ...  
2,2,1,60,180,1,User session timed out., ...  
3,3,0,30,160,0,User transferred data to external device., ...  
1,2,1,75,110,0,User session extended., ...
'''

import json
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import joblib
import pickle
import os
import io
import sys 
sys.path.append('/Users/henrychang/sys_security_ai')
from utility import get_logger, config_file_loc, load_config, set_working_directory

# Set up logging configuration
logger = get_logger()

class DataProcessor:
    """
    Fits and persists the preprocessing artifacts used at inference time.

    From one input CSV it builds:
      * a dictionary of per-column fill values for missing data,
      * a MinMaxScaler fitted on the int64/float64 columns (excluding ``user_id``),
      * one LabelEncoder per object (string) column,
      * per-user aggregated features keyed by ``user_id``,
    and writes each of them to its own file with ``joblib.dump``.
    """

    def __init__(self, input_file_path, # scaler_label_agg_file_path 
                missing_dict_file_path,
                scaler_file_path,
                label_encoders_file_path,
                aggregated_features_file_path):
        """
        Initializes the DataProcessor with the input file and the output artifact paths.

        No files are read or written here; that happens in ``run`` and the step methods.

        Parameters:
            input_file_path (str): Path to the input CSV file.
            missing_dict_file_path (str): Output path for the missing-value fill dictionary.
            scaler_file_path (str): Output path for the fitted MinMaxScaler.
            label_encoders_file_path (str): Output path for the dict of fitted LabelEncoders.
            aggregated_features_file_path (str): Output path for the per-user aggregated features.
        """
        self.input_file = input_file_path
        self.missing_dict_file_path = missing_dict_file_path
        self.scaler_file_path = scaler_file_path
        self.label_encoders_file_path = label_encoders_file_path
        self.aggregated_features_file_path = aggregated_features_file_path
        self.df = pd.DataFrame()          # Replaced by load_data(); empty means "no data".
        self.missing_dict = {}            # column name -> fill value
        self.scaler = MinMaxScaler()      # Fitted in place by fit_scaler().
        self.label_encoders = {}          # column name -> fitted LabelEncoder
        self.aggregated_features = {}     # user_id -> {feature name -> value}

    def load_data(self):
        """
        Loads data from the specified CSV file into a DataFrame (self.df).

        Errors are logged, not raised. On failure ``self.df`` keeps its previous
        value (the empty DataFrame from ``__init__`` on a first call), which
        ``run`` treats as "no data to process".
        """
        try:
            self.df = pd.read_csv(self.input_file)
            logger.info(f"Data loaded from {self.input_file}")
        except FileNotFoundError:
            logger.error(f"Error: File {self.input_file} not found.")
        except pd.errors.EmptyDataError:
            logger.error("Error: No data in the file.")
        except pd.errors.ParserError:
            logger.error("Error: Parsing error.")
        except Exception as e:
            logger.error(f"Error: {e}")

    def prepare_missing_data(self):
        """
        Computes a fill value for each column and stores it in ``self.missing_dict``.

        Rules by dtype:
            int64 / float64 -> column mean
            object          -> most frequent value (smallest one on ties, since
                               pandas returns modes in sorted order)
            datetime        -> earliest timestamp
            bool            -> False
        Columns of any other dtype get no entry. An object column with no
        non-missing values has no mode; it is skipped with a warning instead of
        aborting the remaining columns.
        """
        try:
            for col in self.df.columns:
                series = self.df[col]
                if series.dtype == 'float64' or series.dtype == 'int64':  # Numeric columns
                    self.missing_dict[col] = series.mean()  # Replace with mean (or use median, 0, etc.)
                elif series.dtype == 'object':  # Categorical columns
                    modes = series.mode()  # NaNs are dropped, so this is empty for an all-missing column.
                    if modes.empty:
                        logger.warning(f"Column '{col}' has no non-missing values; no fill value recorded.")
                        continue
                    self.missing_dict[col] = modes.iloc[0]  # Positional, not label-based, lookup.
                elif pd.api.types.is_datetime64_any_dtype(series):  # Datetime columns
                    self.missing_dict[col] = series.min()  # Replace with earliest date (or a default date)
                elif series.dtype == 'bool':  # Boolean columns
                    self.missing_dict[col] = False  # Replace with False (or True, or majority value)
            logger.info("Missing data prepared.")
        except Exception as e:
            logger.error(f"Error during preparing missing data: {e}")

    def fit_scaler(self):
        """
        Fits ``self.scaler`` on the int64/float64 columns, excluding ``user_id``.

        The columns are passed in their DataFrame order. Missing values are not
        filled before fitting.

        NOTE(review): on failure the error is only logged, so ``self.scaler``
        stays unfitted and ``save_preprocessors`` will still persist it.
        """
        try:
            numerical_cols = self.df.select_dtypes(include=['float64', 'int64']).columns.tolist()
            to_remove = ['user_id']  # An identifier, not a feature to scale.
            numerical_cols = [element for element in numerical_cols if element not in to_remove]
            self.scaler.fit(self.df[numerical_cols])
            logger.info("Scaler fitted on numerical columns.")
        except Exception as e:
            logger.error(f"Error fitting scaler: {e}")

    def fit_label_encoders(self):
        """
        Fits one LabelEncoder per object-dtype column and stores it in
        ``self.label_encoders`` under the column name.

        Missing values are not filled before fitting.
        """
        try:
            categorical_cols = self.df.select_dtypes(include=['object']).columns
            for col in categorical_cols:
                le = LabelEncoder()
                le.fit(self.df[col])
                self.label_encoders[col] = le
            logger.info("Label encoders fitted on categorical columns.")
        except Exception as e:
            logger.error(f"Error fitting label encoders: {e}")

    def aggregate_features(self):
        """
        Aggregates per-user features and stores them in ``self.aggregated_features``.

        Result shape::

            {user_id: {'login_attempts': mean, 'failed_login_attempts': mean,
                       'session_duration': mean, 'data_transferred': mean,
                       'access_sensitive_files': mean, 'count': number_of_rows}}

        Requires ``user_id`` and the five feature columns in ``self.df``. If any
        are missing, the error is logged and ``self.aggregated_features`` is left
        unchanged.
        """
        try:
            # Group by 'user_id' and perform the aggregations; the result is indexed by user_id.
            aggregated_df = self.df.groupby('user_id').agg(
                login_attempts=('login_attempts', 'mean'),
                failed_login_attempts=('failed_login_attempts', 'mean'),
                session_duration=('session_duration', 'mean'),  # Use 'session_duration': ['mean', 'sum'], if we also need sum.
                data_transferred=('data_transferred', 'mean'),
                access_sensitive_files=('access_sensitive_files', 'mean'),
                count=('user_id', 'size')  # Count the number of occurrences
            )

            logger.info("Features aggregated")

            # Build one dict per user row-wise. Transposing first (.T) would put the float
            # means and the integer count into one column and upcast 'count' to float.
            self.aggregated_features = aggregated_df.to_dict(orient='index')
        except Exception as e:
            logger.error(f"Error during aggregation: {e}")

    def save_preprocessors(self):
        """
        Writes the missing-data dictionary, scaler, label encoders, and aggregated
        features with ``joblib.dump``, one file per artifact.

        The parent directory of each output path is created if it does not exist.
        Errors are logged rather than raised.
        """
        try:
            artifacts = (
                (self.missing_dict, self.missing_dict_file_path),
                (self.scaler, self.scaler_file_path),
                (self.label_encoders, self.label_encoders_file_path),
                (self.aggregated_features, self.aggregated_features_file_path),
            )
            for artifact, path in artifacts:
                out_dir = os.path.dirname(path)
                if out_dir:  # A bare filename targets the current directory, which already exists.
                    os.makedirs(out_dir, exist_ok=True)
                joblib.dump(artifact, path)
            # os.path.dirname handles both '/' and the platform separator, unlike rfind('/').
            save_dir = os.path.dirname(self.missing_dict_file_path) or os.curdir
            logger.info(
                "Missing-data dictionary, scaler, label encoders, and aggregated features "
                f"saved to {save_dir} directory."
            )
        except Exception as e:
            logger.error(f"Error saving processors: {e}")

    def run(self):
        """
        Runs the data processing pipeline: loads data, prepares missing data,
        fits the scaler and label encoders, aggregates features, and saves the artifacts.

        If no data was loaded, it only logs "No data to process." Each step logs
        its own errors instead of raising, so later steps, including saving,
        still run after an earlier step fails.

        NOTE(review): that can persist an unfitted scaler or empty aggregates;
        consider aborting before ``save_preprocessors`` when a step fails.
        """
        self.load_data()
        if not self.df.empty:
            self.prepare_missing_data()
            self.fit_scaler()
            self.fit_label_encoders()
            self.aggregate_features()
            self.save_preprocessors()
        else:
            logger.info("No data to process.")

# Example usage
def main():
    """
    Resolves input/output paths from the project configuration and runs a DataProcessor.

    Loads the configuration from ``config_file_loc`` via ``load_config`` and obtains
    the working directory from ``set_working_directory``. It then builds:
      * the input CSV path:  <working_directory>/<input_dir>/<scaler_label_agg_file>
      * each artifact path:  <working_directory>/<output_dir>/<missing_dict | scaler |
                             label_encoders | aggregated_features>
    If the configuration cannot be loaded, the working directory cannot be set,
    or a required key is absent, it logs an error and returns without processing.
    """
    config = load_config(config_file_loc)
    if not config:
        logger.error("Configuration could not be loaded; aborting.")
        return

    # Set desired_directory as working_directory
    working_directory = set_working_directory(config.get('desired_directory'))
    if not working_directory:
        logger.error("Working directory could not be set; aborting.")
        return

    # Validate up front so a missing key yields a clear message instead of a
    # TypeError from os.path.join(..., None).
    required_keys = (
        'input_dir', 'scaler_label_agg_file', 'output_dir',
        'missing_dict', 'scaler', 'label_encoders', 'aggregated_features',
    )
    settings = {key: config.get(key) for key in required_keys}
    missing_keys = [key for key, value in settings.items() if value is None]
    if missing_keys:
        logger.error(f"Missing configuration keys: {', '.join(missing_keys)}")
        return

    input_data_path = os.path.join(working_directory, settings['input_dir'])
    output_data_path = os.path.join(working_directory, settings['output_dir'])

    preprocessor = DataProcessor(
        os.path.join(input_data_path, settings['scaler_label_agg_file']),
        os.path.join(output_data_path, settings['missing_dict']),
        os.path.join(output_data_path, settings['scaler']),
        os.path.join(output_data_path, settings['label_encoders']),
        os.path.join(output_data_path, settings['aggregated_features']),
    )
    preprocessor.run()


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.error(f"Unexpected error in main execution: {e}")
