# In [2]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.tree import export_graphviz
import pydot

# In [4]:
class RandomForestModel:
    def __init__(self, data, targets):
        """
        Initialize the RandomForestModel.

        Parameters:
        - data: The dataset for modeling.
        - targets: The target variables to predict.
        """
        self.data = data
        self.targets = targets
        self.model = None
        self.wmape_output = None
        self.best_params = None
        self.best_score = None
        self.fit = None
        self.wmape = None
        self.pred_wmape = None
        self.pred = None

    def parameters(self):
        """
        Get all the instance variables in a dictionary format.

        Returns:
        - params_dict: A dictionary with instance variable names as keys and their values as values.
        """
        params_dict = {
            "targets": self.targets,
            "features": self.features,
            "model": self.model,
            "wmape_output": self.wmape_output,
            "best_params": self.best_params,
            "best_score": self.best_score,
            "fit": self.fit,
            "pred": self.pred,
            "graph": self.graph
        }
        return params_dict

    def update_targets(self, targets):
        """
        Update the target variables for modeling.

        Parameters:
        - targets: The new target variables to set.
        """
        self.targets = targets

    def update_data(self, data):
        """
        Update the dataset for modeling.

        Parameters:
        - data: The new dataset to set.
        """
        self.data = data

    def clean_data(self):
        for df in [self.train_df, self.test_df]:
            df['date'] = pd.to_datetime(df['date'])
            df['hour'] = df['date'].dt.hour
            df['day_of_week'] = df['date'].dt.dayofweek
            df['day_of_year'] = df['date'].dt.dayofyear
            df['month'] = df['date'].dt.month
            df.drop('date', axis=1, inplace=True)
        return self.train_df, self.test_df

    def prepare_data(self,exempt=[]):
        """
        Prepare the datasets for modeling.

        Parameters:
        - targets: The target variables to prepare the data for.

        Returns:
        - train_df, test_df: The prepared training and testing datasets.
        """
        for target in self.targets:

            for df in [self.train_df, self.test_df]:

                # Shift by Date Cycles
                df[f'{target[:2]}_next_hour'] = df[target].shift(-1)
                df[f'{target[:2]}_next_day'] = df[target].rolling(window=24).sum()
                df[f'{target[:2]}_next_weekday'] = df[target].rolling(window=7 * 24).sum()
                df[f'{target[:2]}_next_month'] = df[target].rolling(window=30 * 24).sum()

                # Lag by Date Cycles
                df[f'{target[:2]}_inverse_hour'] = df[target].shift(1)
                df[f'{target[:2]}_inverse_next_day'] = df[target].diff(24)
                df[f'{target[:2]}_inverse_next_weekday'] = df[target].diff(7 * 24)
                df[f'{target[:2]}_inverse_next_month'] = df[target].diff(30 * 24)

                df.dropna(inplace=True)

                # Rolling Mean by Date Cycles
                df[f"{target[:2]}_6hour_mean"] = df[target].rolling(6).mean()
                df[f"{target[:2]}_12hour_mean"] = df[target].rolling(12).mean()
                df[f"{target[:2]}_24hour_mean"] = df[target].rolling(24).mean()
                df[f"{target[:2]}_week_mean"] = df[target].rolling(24*7).mean()
                df[f"{target[:2]}_30day_mean"] = df[target].rolling(24*30).mean()

                # Rolling Min by Date Cycles
                df[f"{target[:2]}_6hour_min"] = df[target].rolling(6).min()
                df[f"{target[:2]}_12hour_min"] = df[target].rolling(12).min()
                df[f"{target[:2]}_24hour_min"] = df[target].rolling(24).min()
                df[f"{target[:2]}_week_min"] = df[target].rolling(24*7).min()
                df[f"{target[:2]}_30day_min"] = df[target].rolling(24*30).min()

                # Rolling Max by Date Cycles
                df[f"{target[:2]}_6hour_max"] = df[target].rolling(6).max()
                df[f"{target[:2]}_12hour_max"] = df[target].rolling(12).max()
                df[f"{target[:2]}_24hour_max"] = df[target].rolling(24).max()
                df[f"{target[:2]}_week_max"] = df[target].rolling(24*7).max()
                df[f"{target[:2]}_30day_max"] = df[target].rolling(24*30).max()

                # Rolling Standard Deviation by Date Cycles
                df[f"{target[:2]}_6hour_std"] = df[target].rolling(6).std()
                df[f"{target[:2]}_12hour_std"] = df[target].rolling(12).std()
                df[f"{target[:2]}_24hour_std"] = df[target].rolling(24).std()
                df[f"{target[:2]}_week_std"] = df[target].rolling(24*7).std()
                df[f"{target[:2]}_30day_std"] = df[target].rolling(24*30).std()

        self.features = [feature for feature in self.train_df.columns if feature not in self.targets or exempt]

        return self.train_df, self.test_df, self.features
    
    def calculate_wae_rmse(self, reference, predictions):
        """
        Calculate the Weighted Absolute Error (WAE), Root Mean Square Error (RMSE),
        and their respective accuracies for a single target.

        Parameters:
        - reference: The reference dataset containing true values.
        - predictions: The predicted values to compare to.

        Returns:
        - error_metrics: Dictionary containing WAE, RMSE, and their accuracies for the specified target.
        """
        error_metrics = {}
    
        # Convert DataFrame or Series to NumPy arrays
        y_true = reference.values
        y_pred = predictions.values if isinstance(predictions, pd.Series) else predictions

        # Calculate Weighted Absolute Error (WAE)
        wae = np.sum(np.abs(y_true - y_pred))
        error_metrics['wae'] = wae

        # Calculate Root Mean Square Error (RMSE)
        rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))
        error_metrics['rmse'] = rmse

        # Calculate the accuracy for WAE (WAE Accuracy)
        wae_accuracy = 100 - (wae / np.sum(np.abs(y_true)) * 100)
        error_metrics['wae_accuracy'] = wae_accuracy

        # Calculate the accuracy for RMSE (RMSE Accuracy)
        rmse_accuracy = 100 - (rmse / np.std(y_true) * 100)
        error_metrics['rmse_accuracy'] = rmse_accuracy

        self.error_metrics = error_metrics

        return error_metrics

    def tune_random_forest_hyperparameters(self, X, y, param_grid=None, cv=5):
        """
        Tune the hyperparameters of a Random Forest model using GridSearchCV.

        Parameters:
        - X: Input features.
        - y: Target variable.
        - param_grid: Optional grid passed to GridSearchCV. When omitted, the
          built-in grid below is searched.
        - cv: Cross-validation setting passed to GridSearchCV (default 5).

        Returns:
        - best_params: The best hyperparameters found by GridSearchCV.
        - best_score: The best score achieved with the best hyperparameters.
          Both are also stored on the instance as self.best_params and
          self.best_score.
        """
        if param_grid is None:
            param_grid = {
                'n_estimators': [50, 100, 150],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4],
                # 1.0 (all features) is what 'auto' meant for a regressor.
                # The string itself was deprecated in scikit-learn 1.1 and
                # removed in 1.3, where it makes a third of the fits fail.
                'max_features': [1.0, 'sqrt', 'log2']
            }

        # NOTE(review): the forest (n_jobs=6) and the search (n_jobs=-1) both
        # request parallel workers; this may oversubscribe the machine.
        rf = RandomForestRegressor(random_state=0, n_jobs=6)
        grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=cv, n_jobs=-1, verbose=2)
        grid_search.fit(X, y)

        self.best_params = grid_search.best_params_
        self.best_score = grid_search.best_score_

        return self.best_params, self.best_score

    def fit_model(self):
        """
        Fit a RandomForestRegressor model to the training dataset.

        The feature columns of self.train_df are imputed with a default
        SimpleImputer, the hyperparameters are tuned with
        tune_random_forest_hyperparameters(), and a fresh forest is fitted
        with the winning settings. The imputed matrix, targets, model, best
        parameters and best score are stored as self.Xtr, self.ytr,
        self.model, self.params and self.score.

        Returns:
        - output: The dictionary produced by parameters().
        """
        train_df = self.train_df.copy()

        # NOTE(review): per the scikit-learn docs a default SimpleImputer
        # drops columns that contain only missing values, in which case
        # self.Xtr has fewer columns than self.features; verify the data.
        imputer = SimpleImputer()
        self.Xtr = imputer.fit_transform(train_df[self.features])
        self.ytr = train_df[self.targets]

        best_params, best_score = self.tune_random_forest_hyperparameters(self.Xtr, self.ytr)

        mdl = RandomForestRegressor(
            n_estimators=best_params.get('n_estimators', 100),
            max_depth=best_params.get('max_depth', None),
            min_samples_split=best_params.get('min_samples_split', 2),
            min_samples_leaf=best_params.get('min_samples_leaf', 1),
            # The former 'auto' fallback is rejected by scikit-learn >= 1.3;
            # 1.0 selects all features, which is what 'auto' meant here.
            max_features=best_params.get('max_features', 1.0),
            random_state=best_params.get('random_state', 0),
            n_jobs=best_params.get('n_jobs', 6),
        )

        mdl.fit(self.Xtr, self.ytr)

        self.model = mdl
        self.params = best_params
        self.score = best_score

        # parameters() reports self.graph, which only visualize_tree() sets.
        # Without a placeholder the return below raises AttributeError after
        # all the fitting work is already done.
        if not hasattr(self, 'graph'):
            self.graph = None

        return self.parameters()

    def predict(self):
        """
        Generate predictions for the testing dataset with the fitted model.

        A default SimpleImputer is fitted on the training feature columns and
        then applied to a copy of the testing feature columns before they are
        handed to self.model. The result is also stored in self.pred.

        Returns:
        - pred: The model's predictions for the rows of the testing dataset.
        """
        feature_cols = self.features
        holdout = self.test_df.copy()

        # Fill statistics come from the training features only.
        filler = SimpleImputer().fit(self.train_df[feature_cols])
        test_matrix = filler.transform(holdout[feature_cols])

        self.pred = self.model.predict(test_matrix)
        return self.pred

    def print_performance(self, target, sig_level=0.05):
        """
        Refit a forest on the most important features and print its errors.

        Features whose stored importance (from print_importances()) exceeds
        `sig_level` are selected, a 1000-tree RandomForestRegressor is fitted
        on them, and its predictions plus the metrics from
        calculate_wae_rmse() are printed.

        Parameters:
        - target: The target column (or columns) of self.train_df to predict.
        - sig_level: Importance threshold a feature must exceed to be kept.

        Raises:
        - AttributeError: If print_importances() has not been run yet.
        - ValueError: If no feature exceeds `sig_level`.
        """
        importance = getattr(self, 'importance', None)
        if importance is None:
            raise AttributeError(
                "feature importances are not available; call print_importances() first"
            )

        important_features = [feature for feature in self.features if importance.get(feature, 0) > sig_level]
        if not important_features:
            raise ValueError(f"no feature has an importance above sig_level={sig_level}")

        # Rows with a missing feature are dropped; remaining gaps become 0.
        df = self.train_df.copy().dropna(subset=self.features).fillna(0)
        important_df = df[important_features]
        target_df = df[target]

        # Train the random forest
        rf_most_important = RandomForestRegressor(n_estimators=1000, random_state=42)
        rf_most_important.fit(important_df, target_df)

        # NOTE(review): predictions are made on the same training rows the
        # forest was fitted on, so these are in-sample figures. The original
        # comment spoke of "test data"; confirm whether self.test_df was meant.
        predictions = rf_most_important.predict(important_df)

        print(predictions)

        # The original passed `target` (the column name) here instead of the
        # predictions, which fails inside the metric arithmetic. The reshape
        # aligns (n,) model output with a one-column (n, 1) target frame.
        error_metrics = self.calculate_wae_rmse(target_df, np.reshape(predictions, target_df.shape))

        # Display the performance metrics ('wae' is a sum of absolute errors).
        print('Mean Absolute Error (WAE):', round(error_metrics['wae'], 4))
        print('WAE Accuracy:', round(error_metrics['wae_accuracy'], 4), '%.')

        print('Root Mean Square Error (RMSE):', round(error_metrics['rmse'], 4))
        print('RMSE Accuracy:', round(error_metrics['rmse_accuracy'], 4), '%.')

    def visualize_tree(self, tree_index, dot_loc, png_loc=None, new_params=None):
        """
        Visualize a decision tree from the random forest model.

        Args:
            tree_index (int): Index of the tree to visualize.
            dot_loc (str): File path to save the DOT file.
            png_loc (str, optional): File path to save the visualization as a PNG file.
            new_params (dict, optional): Hyperparameter overrides. When given,
                a new forest is fitted on self.Xtr / self.ytr using these
                values, falling back to self.params and then to the defaults
                below; self.model itself is left unchanged. Keys other than
                the seven listed below are ignored.

        Returns:
        - graph: A Pydot graph representing the decision tree (also stored in
          self.graph).

        Raises:
            AttributeError: If no fitted model is available.
            ValueError: If tree_index is outside the forest's range of trees.
        """
        if new_params is not None:
            # Final fallbacks. 'max_features' uses 1.0 (all features) because
            # the former 'auto' string is rejected by scikit-learn >= 1.3.
            defaults = {
                'n_estimators': 100,
                'max_depth': None,
                'min_samples_split': 2,
                'min_samples_leaf': 1,
                'max_features': 1.0,
                'random_state': 0,
                'n_jobs': 6,
            }
            settings = {
                name: new_params.get(name, self.params.get(name, fallback))
                for name, fallback in defaults.items()
            }

            tree_model = RandomForestRegressor(**settings)
            tree_model.fit(self.Xtr, self.ytr)

        else:
            tree_model = self.model

        if tree_model is None:
            raise AttributeError("no fitted model available; call fit_model() first")

        if tree_index < 0 or tree_index >= len(tree_model.estimators_):
            raise ValueError(f"Invalid tree_index. It should be in the range [0, {len(tree_model.estimators_)-1}].")

        tree = tree_model.estimators_[tree_index]
        export_graphviz(
            tree,
            out_file=dot_loc,
            feature_names=self.features,
            rounded=True,
            precision=1
        )
        # graph_from_dot_file returns a list; exactly one graph is expected.
        (self.graph, ) = pydot.graph_from_dot_file(dot_loc)

        if png_loc is not None:
            self.graph.write_png(png_loc)

        return self.graph

    def print_importances(self, decimal_places=4):

        importances = {}
        
        feature_importances = [(feature, round(importance, decimal_places)) for feature, importance in zip(self.features, self.model.feature_importances_)]
        feature_importances = sorted(feature_importances, key=lambda x: x[1], reverse=True)

        for feature, importance in feature_importances:
            importances[feature] = importance
            print(f'Variable: {feature:20} Importance: {importance:.{decimal_places}f}')

        self.importance = importances