# Wild Fire Machine Learning (WFML) Utilities

This class gathers helper functions and common data that are shared across all implementations.

In [1]:
import re
import time
import subprocess
import import_ipynb
import numpy as np
import pandas as pd
import sklearn as sk
import matplotlib.pyplot as plt
from os import listdir
from datetime import datetime
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, roc_curve, roc_auc_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

## Static Data

In [2]:
class wfml:
    """Shared configuration and helper methods for the WFML notebooks.

    The class-level attributes below hold the storage layout, the file-name
    templates and the dataset column names that the helper methods read.
    """

    # Directory layout. 'root' is the absolute base directory; the other
    # entries are sub-directories that get_path() appends to it by plain
    # string concatenation, which is why every value ends with a slash.
    path = {
      'root'  : 'C:/Users/gmano/Desktop/HOU/ΠΛΣΔΕ/ΔΕ/2. Υλοποίηση/model/',
      'image' : 'image/',
      'data'  : { 'root'  : 'data/',
                  'tune'  : 'fire_data_tune/',
                  'train' : 'fire_data_train/',
                  'test'  : 'fire_data_test/' }
    }

    # File-name templates per storage key. '@@' is a placeholder that
    # get_filepath() substitutes and get_filepaths() matches as two digits
    # (for 'test' this is presumably the day of the month — TODO confirm).
    filename = {
        'image' : '@@.png',
        'tune'  : 'random_search_@@.csv',
        'train' : 'fire_data_train.csv',
        'test'  : '202007@@_df_greece_norm.csv'
    }

    # Column names used as model inputs (selected as X by the ML helpers).
    features = ['dom_vel', 'dom_dir', 'max_temp', 'min_temp',
                'mean_temp', 'rain_7_days', 'ndvi', 'lst_day', 'slope', 'dem',
                'corine_gr1', 'corine_gr4', 'corine_gr5', 'corine_gr21', 'corine_gr22',
                'corine_gr23', 'corine_gr24', 'corine_gr31', 'corine_gr32',
                'corine_gr33']
    
    # Column name of the label to predict (selected as y by the ML helpers).
    target = 'fire'

    # Column whose values are passed as cross-validation groups during tuning.
    grouper = 'firedate' # Not the fish :)

## File Utilities

In [3]:
    @staticmethod
    def get_path(path):
        if  path == 'tune' or path == 'train' or path == 'test':
            fullpath = __class__.path['root'] + __class__.path['data']['root'] + __class__.path['data'][path]
        else:
            fullpath = __class__.path['root'] + __class__.path[path]
        return fullpath

    @staticmethod
    def get_filepath(storage, replace = '@@', stamp = False):
        path = __class__.get_path(storage) + __class__.filename[storage]

        if storage == 'tune' or storage == 'image':
            if stamp == True:
                replace = replace + '_' + datetime.now().strftime("%Y%m%d%H%M%S")
            final = path.replace('@@', replace)
        elif storage == 'test':
            final = path.replace('@@', str(replace).zfill(2))
        else:
            final = path
        return final
      
    @staticmethod
    def get_filepaths(storage):
        path = __class__.get_path(storage)
        file = __class__.filename[storage]
        
        filepaths = [path+f for f in listdir(path) if bool(re.search(file.replace('@@', '[0-9][0-9]'), f)) == True]

        return filepaths

## String Utilities

In [4]:
    @staticmethod
    def gini_to_alpha(gini):
        # Gini to Numerical Alpha
        if gini == '':
            alpha = 0
        else:
            alpha = 1 - 2 * float(gini) #less gini = less transparency

        # Scale the float to an integer (0-255)
        if alpha < 0.0:
            alpha = 0.0
        elif alpha > 1.0:
            alpha = 1.0

        # Convert to hexadecimal and ensure it's two digits
        hex_value = format(int(alpha * 255), '02x')

        return hex_value

## ML Utilities
### Cross Validation Method
This method performs the Cross Validation procedure with the RandomizedSearchCV optimizer

In [5]:
    @staticmethod
    def cross_validation(base_model, parameters, kfold, X, y, groups, n_jobs = 2):
        start_time = time.time()
        unique_groups = np.unique(groups)

        # Check if the number of unique groups is less than the number of folds required for cross-validation and
        # raise error if there are not enough unique groups to perform the desired number of splits
        if len(unique_groups) < kfold:
            raise ValueError(f"Number of splits {kfold} > number of unique groups {len(unique_groups)}.")

        # Initialize the StratifiedKFold object for cross-validation
        k = StratifiedKFold(n_splits = kfold, shuffle=False)

        # Define the metrics to be used for scoring the model during cross-validation
        metrics = {'prec_1': make_scorer(precision_score, pos_label=1),
                   'rec_1': make_scorer(recall_score, pos_label=1),
                   'f1_1': make_scorer(f1_score, pos_label=1),
                   'roc': make_scorer(roc_auc_score),
                   'prec_0': make_scorer(precision_score, pos_label=0),
                   'rec_0': make_scorer(recall_score, pos_label=0),
                   'f1_0': make_scorer(f1_score, pos_label=0)}

        # Initialize the RandomizedSearchCV object for hyperparameter tuning and model selection
        optimal_model = RandomizedSearchCV(base_model,
                                           parameters,
                                           scoring = metrics,
                                           n_iter = kfold, #Itterations same as kfold for square cross-validation space
                                           cv = k,
                                           n_jobs = n_jobs,
                                           verbose = 3,
                                           refit = 'rec_1',
                                           return_train_score = True)

        # Fit the model to the data using the specified groups for cross-validation
        optimal_model.fit(X, y, groups=groups)

        stop_time = time.time()
        print("Elapsed Time:", time.strftime("%H:%M:%S", time.gmtime(stop_time - start_time)))
        print("====================")
        print("Best Score: {:.3f}".format(optimal_model.best_score_))
        print("Best Parameters: {}".format(optimal_model.best_params_))
        return optimal_model.best_params_, optimal_model.best_score_, optimal_model.cv_results_

### Hyperparameter Tune Method
This method performs the actual hyperparameter tuning: it loads the training data, prepares the CV attributes, runs the CV utility, then gathers and saves the results.

In [6]:
    @staticmethod
    def hyperparameter_tune(n_jobs, classifier, cv_dimensions, parameters):
        #Read Training File
        df = pd.read_csv(wfml.get_filepath('train'))
        df.firedate = pd.to_datetime(df.firedate)

        # Ensure that groups have multiple unique values
        if df[wfml.grouper].nunique() < 10:
            df[wfml.grouper] = pd.cut(df[wfml.grouper].astype(int) // 10**9, bins=10, labels=False)

        #Define Features & Target Variable
        X = df[wfml.features]
        y = df[wfml.target]

        # Prepare the Split for Cross Validation
        groups = df[wfml.grouper]
        groupskfold = groups.values
        folds = cv_dimensions

        #Initialize arrays to gather results
        best_scores = []
        best_parameters = []
        full_scores = []

        #Define Model
        if classifier == "XGB":
            model = XGBClassifier( n_jobs = n_jobs )
        elif classifier == "RF":
            model = RandomForestClassifier( n_jobs = n_jobs )
        else:
            raise ValueError("Classifier not defined")
        
        #Run for each of the defined k-folds
        for i in folds:
            print("\ncv = ", i)
            start = time.time() #Time Measurement is important here as it can be a long procedure

            try:
                best_params, best_score, full_scores = __class__.cross_validation(model, parameters, i, X, y, groupskfold, n_jobs)
            except ValueError as ve:
                continue #Ignores ValueError for wrong number of kfolds
            
            #Gather and Save Results
            df_results = pd.DataFrame.from_dict(full_scores)
            df_results['folds'] = int(i)
            df_short = df_results.filter(regex="mean|std|params")
            df_results.to_csv(wfml.get_filepath('tune', classifier+i, True), mode='a', header=(i == folds[0]), index = False)

            end_time = time.time()
            print(f"Fold {i} completed in {end_time - start:.2f} seconds.")


## Train Method
This method is used for training the models and displaying the results on the training dataset (as validation).

In [7]:
    @staticmethod
    def train(classifier, parameters):
        start = time.time()

        #Read Training File
        df = pd.read_csv(__class__.get_filepath('train'))

        #Define Features & Target Variable
        X = df[wfml.features]
        y = df[wfml.target]

        # Split dataset into training (80%) and testing (20%) sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        #Classifier Configuration and Training
        if classifier == "XGB":
            model = XGBClassifier(**parameters)
        elif classifier == "RF":
            model = RandomForestClassifier(**parameters)
        else:
            raise ValueError("Classifier not defined")

        model.fit(X_train, y_train)

        #Initial test with Training dataset
        y_predict = model.predict(X_test)

        #Print Results of Initial Test
        end_time = time.time()
        print("=======================================================")
        print(f"Training Results for {model.__class__.__name__}.\nTime Elapsed: {end_time - start:.2f} seconds.")
        print("=======================================================")
        print(classification_report(y_test, y_predict, target_names=['no-fire', 'fire']))
        print("\n")
        
        #Return Trained Classifier
        return model

## Test Method
This method is used for testing the trained models on actual data (non-weighted and in bigger volume).

Here we have to handle a massive amount of data (31 files of approximately 500,000 records each), representing the real daily wildfire measurements of July 2020 in Greece. To handle this more effectively, the model runs separately for each file and the results are stored.

In [8]:
    @staticmethod
    def test(model, storage):
        """Evaluate a trained model on every data file of *storage*.

        Each matching file is read and predicted on its own; the labels,
        predictions and positive-class scores of all files are then merged
        and a classification report is printed.

        Parameters:
            model: fitted classifier offering ``predict`` and ``predict_proba``.
            storage: storage key whose files are evaluated.

        Returns:
            Dict with the flat lists 'labels', 'predictions' and 'scores'.
        """
        started = time.time()
        model_name = model.__class__.__name__

        # One entry per file (multiple due to multiple files)
        labels_per_file = []
        predictions_per_file = []
        scores_per_file = []

        filepaths = __class__.get_filepaths(storage)

        # Progress is reported through the shell's echo.
        # NOTE(review): the text is interpolated into a shell command line, so
        # shell metacharacters in a path would be interpreted by the shell.
        banner = (
            "________________________________________________"
            f"Test for {model_name} begun"
            "________________________________________________"
        )
        subprocess.run(f"echo {banner}", shell=True) 

        # Read files & predict with the trained model
        for filepath in filepaths:
            frame = pd.read_csv(filepath, index_col=None, header=0)

            inputs = frame[wfml.features]
            actual = frame[wfml.target]

            # Class predictions and probability of the positive class
            predicted = model.predict(inputs)
            positive_score = model.predict_proba(inputs)[:, 1]

            labels_per_file.append(actual)
            predictions_per_file.append(predicted)
            scores_per_file.append(positive_score)

            # Make sure code is running :)
            subprocess.run(f"echo {filepath}", shell=True) 

        # Merge the per-file results of Actuals, Predictions and Scores
        flat_labels, flat_predictions, flat_scores = [], [], []
        for actual, predicted, positive_score in zip(labels_per_file, predictions_per_file, scores_per_file):
            flat_labels.extend(actual)
            flat_predictions.extend(predicted)
            flat_scores.extend(positive_score)

        # Print Classification Report
        finished = time.time()
        print("=======================================================")
        print(f"Test Results for {model_name}.\nTime Elapsed: {finished - started:.2f} seconds.")
        print("=======================================================")
        print(classification_report(flat_labels, flat_predictions, target_names=['no-fire', 'fire']))
        print("\n")

        return { 'labels' : flat_labels,
                 'predictions' : flat_predictions,
                 'scores' : flat_scores }

## ROC Curve Method
Creates and displays diagram for ROC Curve

In [9]:
    @staticmethod
    def roc_curve(model, labels, scores):
        """Plot the ROC curve for *labels* / *scores*, save it and show it.

        The figure is written to the 'image' storage under a time-stamped
        name built from the model's class name.
        """
        # The bare names below resolve to the module-level sklearn imports,
        # not to this method: class attributes are not in a function's scope.
        false_positive_rate, true_positive_rate, _ = roc_curve(labels, scores)
        area = roc_auc_score(labels, scores)
        model_name = model.__class__.__name__

        # Plot ROC curve together with the diagonal of a random classifier
        plt.figure()
        plt.plot(false_positive_rate, true_positive_rate,
                 color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % area)
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc="lower right")

        # Save & Display Graph
        plt.savefig(wfml.get_filepath('image', f'roc_{model_name}', True))
        separator = "============================="
        print(separator)
        print(f"{model_name}")
        print(separator)
        plt.show()

## Confusion Matrix
Creates and displays diagram for Confusion Matrix

In [10]:
    @staticmethod
    def confusion_matrix(model, labels, predictions):
        """Plot the confusion matrix of *predictions*, save it and show it.

        The figure is written to the 'image' storage under a time-stamped
        name built from the model's class name.
        """
        # The bare name below resolves to the module-level sklearn import,
        # not to this method: class attributes are not in a function's scope.
        matrix = confusion_matrix(labels, predictions)
        model_name = model.__class__.__name__

        display = ConfusionMatrixDisplay(confusion_matrix = matrix, display_labels = ['No-Fire', 'Fire'])
        display.plot()

        # Save & Display Graph
        plt.savefig(wfml.get_filepath('image', f'cm_{model_name}', True))
        separator = "============================="
        print(separator)
        print(f"{model_name}")
        print(separator)
        plt.show()

## Feature Importance
Creates and displays diagram for Feature Importance

In [11]:
    @staticmethod
    def feature_importance(model):
        """Plot the model's feature importances as bars, save and show them.

        The values come from ``model.feature_importances_`` and are paired
        with the class-level feature names in their listed order.
        """
        model_name = model.__class__.__name__
        importances = model.feature_importances_

        # Prepare Plot
        plt.figure(figsize=(8, 4))
        plt.barh(__class__.features, importances, color='skyblue')
        plt.xlabel('Gini Importance')
        plt.title('Feature Importance - Gini Importance')
        axes = plt.gca()
        axes.invert_yaxis()  # first listed feature at the top

        # Save & Display Graph
        plt.savefig(wfml.get_filepath('image', f'fi_{model_name}', True))
        separator = "============================="
        print(separator)
        print(f"{model_name}")
        print(separator)
        plt.show()