# Carregar Dados

In [20]:
import pandas as pd

from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import ClusterCentroids
from imblearn.under_sampling import RandomUnderSampler

# Path of the Parquet file that get_samples() loads with pandas.
SAMPLES_FILE_PATH = "samples.pq"

In [2]:
def get_samples():
    """Load the samples table from ``SAMPLES_FILE_PATH``.

    The Parquet file is read with pandas and ``reset_index()`` is applied to
    the result before it is returned.
    """
    return pd.read_parquet(SAMPLES_FILE_PATH).reset_index()

# Misc

In [5]:
import os

class PathHandler():
    """Class-level holder of the output directory and the file-name suffix.

    All state is stored on the class and reached through classmethods, so the
    class is used directly and never instantiated. Every method goes through
    ``cls`` so that path and value are always read and written on the same
    class object.
    """

    # Output directory; empty until set_path() is called.
    __path: str = ''
    # Suffix inserted into generated file names; empty until set_value() is called.
    __value: str = ''

    @classmethod
    def generate_path(cls, file_name: str) -> str:
        """Return ``<path>/<file_name>_<value>.lz4`` from the current state."""
        return f'{cls.__path}/{file_name}_{cls.__value}.lz4'

    @classmethod
    def get_path(cls) -> str:
        """Return the currently configured output directory."""
        return cls.__path

    @classmethod
    def set_path(cls, path: str) -> None:
        """Create ``path`` if needed and remember it as the output directory."""
        os.makedirs(path, exist_ok=True)

        cls.__path = path

    @classmethod
    def set_value(cls, value: str) -> None:
        """Set the suffix used by generate_path()."""
        cls.__value = value

In [6]:
import time

def measure_execution_time(func):
    """Decorator that prints how long each call to ``func`` takes.

    The wrapped function's arguments and return value are passed through
    unchanged. Nothing is printed when ``func`` raises, because the exception
    propagates before the report line is reached.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same metadata (``__name__``, ``__doc__``, ...) as
        ``func``.
    """
    from functools import wraps

    @wraps(func)  # keep the name/docstring of the decorated function
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the difference cannot be distorted by
        # system clock adjustments the way time.time() can.
        begin = time.perf_counter()
        resultado = func(*args, **kwargs)
        end = time.perf_counter()
        print('\n\n' + f'| Tempo de execução de {func.__name__}: {end - begin:.4f} segundos |'.center(200, '-') + '\n\n')
        return resultado
    return wrapper

# Treinamento do Modelo

## Importando Bibliotecas

In [7]:
from pathlib import Path
import multiprocessing
import sys

from scipy.signal import argrelmin
from scipy.stats import uniform, randint

from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
from sklearn.experimental import enable_halving_search_cv
from sklearn.feature_selection import RFECV, RFE
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import PrecisionRecallDisplay, precision_recall_curve
from sklearn.metrics import ConfusionMatrixDisplay, classification_report
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import HalvingRandomSearchCV, GroupKFold, KFold
from sklearn.metrics import precision_score, recall_score, f1_score

import multiprocessing

import joblib

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

## Processamento

In [8]:
# --- Column names ---------------------------------------------------------
# Column holding the original class label that target_ovo() remaps.
TARGET_COLUMN = 'class'
# Column passed as the group label to GroupKFold.split().
SPATIAL_CROSS_VALIDATION_COLUMN = 'ml_cv_group'
# NOTE(review): presumably the first covariate column; confirm where it is used.
COVARIATE_START_COLUMN = 'ml_type'

# --- Cross-validation -----------------------------------------------------
# Number of splits handed to GroupKFold.
CROSS_VALIDATION_FOLDS = 5
# NOTE(review): presumably a parallel-jobs setting for cross-validation; confirm.
CROSS_VALIDATION_NJOBS = 5

# --- Reproducibility ------------------------------------------------------
# Project-wide random seed.
RANDOM_STATE = 1989

In [9]:
def get_covariates(samples: pd.DataFrame):
    """Return the column labels of ``samples`` that are model covariates.

    A column is kept when its name contains at least one of the keywords
    below as a literal substring. Matching is literal (``regex=False``) so
    that the dots in names such as ``road.distance_osm.highways.high.density``
    only match real dots instead of acting as regex wildcards.

    Args:
        samples: Table whose column names are inspected.

    Returns:
        The subset of ``samples.columns`` that matched, in the original
        column order.
    """
    keywords = (
        'accessibility',
        'blue',
        'bsf',
        'bsi',
        'clm',
        'dtm',
        'evi',
        'fapar',
        'green',
        'ndti',
        'ndvi',
        'ndwi',
        'nir',
        'nirv',  # already covered by 'nir'; kept to document the intent
        'red',
        'road.distance_osm.highways.high.density',
        'road.distance_osm.highways.low.density',
        'swir1',
        'swir2',
        'thermal',
        'water.distance_glad.interanual.dynamic.classes',
        'wv_mcd19a2v061',
    )

    columns = samples.columns

    # One boolean mask per keyword, OR-ed together. na=False makes any
    # non-string column label count as "no match" instead of yielding NaN.
    is_covariate = np.logical_or.reduce([
        columns.str.contains(keyword, regex=False, na=False)
        for keyword in keywords
    ])

    return columns[is_covariate]

In [10]:
def target_ovo(samples: pd.DataFrame, class_name: str, class_a: list[int], class_b: list[int]):
    """Add a binary one-vs-one target column to ``samples`` in place.

    Values of ``TARGET_COLUMN`` listed in ``class_a`` become 0 and those in
    ``class_b`` become 1 (``class_b`` wins if a value is in both). Values in
    neither list are left unmapped by ``Series.map``.

    Args:
        samples: Table to modify; receives the new column ``class_name``.
        class_name: Name of the column to create or overwrite.
        class_a: Original class values mapped to 0.
        class_b: Original class values mapped to 1.
    """
    binary_codes = dict.fromkeys(class_a, 0)
    binary_codes.update(dict.fromkeys(class_b, 1))

    samples[class_name] = samples[TARGET_COLUMN].map(binary_codes)


def create_ovo_class(samples: pd.DataFrame, class_name: list[str], class_values: list[tuple[list[int], list[int]]]):
    """Create one binary target column per (name, class pair) on ``samples``.

    ``class_name`` and ``class_values`` are paired positionally; for each
    pair, ``target_ovo`` is called with the first element of the pair as the
    0-class values and the second as the 1-class values.

    Args:
        samples: Table modified in place by ``target_ovo``.
        class_name: Names of the target columns to create.
        class_values: For each name, a ``(class_a, class_b)`` pair of lists.
    """
    # Going through a dict keeps the original behaviour for repeated names:
    # the last pair given for a name is the one that is applied.
    pairs_by_column = dict(zip(class_name, class_values))

    for column, groups in pairs_by_column.items():
        target_ovo(samples, column, groups[0], groups[1])

In [11]:
def get_optimal_threshold(y_true: pd.DataFrame, y_pred):
    """Return the score threshold at which precision and recall are closest.

    The precision-recall curve of ``y_pred`` against ``y_true`` is computed
    and, among the points where both precision and recall are non-zero, the
    one with the smallest ``|precision - recall|`` is selected (the first one
    on ties).

    Args:
        y_true: True binary labels.
        y_pred: Predicted scores/probabilities for the positive class.

    Returns:
        The threshold associated with the selected curve point.

    Raises:
        ValueError: If no curve point has both precision and recall non-zero.
    """
    precision, recall, threshold = precision_recall_curve(y_true, y_pred)

    # The curve has one more point than there are thresholds: its final
    # (precision=1, recall=0) entry has no threshold. Drop it so that the
    # three arrays share a single index space.
    precision = precision[:-1]
    recall = recall[:-1]

    # Positions, expressed in that shared index space, that are eligible.
    candidates = np.flatnonzero(np.logical_and(precision != 0.0, recall != 0.0))
    if candidates.size == 0:
        raise ValueError(
            'no threshold yields both non-zero precision and non-zero recall'
        )

    gaps = np.abs(precision[candidates] - recall[candidates])

    # Map the winner back to an index into `threshold` rather than using the
    # position inside the filtered array directly.
    optimal_idx = candidates[np.argmin(gaps)]

    return threshold[optimal_idx]

In [19]:
def get_estimator():
    """Return a fresh, unfitted random forest classifier.

    The forest is seeded with ``RANDOM_STATE`` so that repeated runs on the
    same data build the same model; ``n_jobs=-1`` is passed through to
    scikit-learn unchanged.
    """
    return RandomForestClassifier(n_jobs=-1, random_state=RANDOM_STATE)

def random_forest_undersampling_random(samples: pd.DataFrame, target_column: str, covariates: list[str]):
    """Cross-validate a random forest on randomly undersampled folds and save the outcome.

    Rows whose ``target_column`` is NaN are discarded. The remaining rows are
    split with ``GroupKFold`` using ``SPATIAL_CROSS_VALIDATION_COLUMN`` as the
    group label. In each fold the training part is resampled with
    ``RandomUnderSampler``, a new estimator is fitted on it, and the
    second column of ``predict_proba`` (class 1) is collected for the
    held-out part.

    From the pooled out-of-fold predictions a threshold is chosen with
    ``get_optimal_threshold`` and recall, precision and F1 are computed. The
    results are written with joblib to ``PathHandler.generate_path('model')``.

    Args:
        samples: Table holding the covariates, the target column and the
            cross-validation group column.
        target_column: Name of the binary target column.
        covariates: Names of the feature columns.

    Returns:
        None. The result is persisted to disk.
    """
    tc_samples = samples[samples[target_column].notna()].reset_index()

    X = tc_samples[covariates]
    y = tc_samples[target_column]
    groups = tc_samples[SPATIAL_CROSS_VALIDATION_COLUMN]

    group_k_fold = GroupKFold(CROSS_VALIDATION_FOLDS)

    y_true = []
    y_pred_proba = []

    for train_idx, test_idx in group_k_fold.split(X, y, groups):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        # Only the training part is balanced; the test part keeps its
        # original class distribution.
        rus = RandomUnderSampler(random_state=42)
        X_train_res, y_train_res = rus.fit_resample(X_train, y_train)

        estimator = get_estimator()
        estimator.fit(X_train_res, y_train_res)

        y_true.extend(list(y_test))
        y_pred_proba.extend(estimator.predict_proba(X_test)[:, 1])

    # Explicit arrays: the comparison below must be element-wise, and both
    # sequences are stored side by side afterwards.
    y_true = np.asarray(y_true)
    y_pred_proba = np.asarray(y_pred_proba)

    op_threshold = get_optimal_threshold(y_true, y_pred_proba)

    y_pred = (y_pred_proba >= op_threshold).astype(int)

    joblib.dump({
        'cv_result': pd.DataFrame({
            'predict_proba': y_pred_proba,
            # Labels in fold order, i.e. the order of the predictions. The
            # row order of `y` differs from it, so `y` must not be used here.
            'expected': y_true,
        }),
        'threshold': op_threshold,
        'recall': recall_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'f1_score': f1_score(y_true, y_pred),
        # NOTE(review): this is the estimator fitted in the last fold only,
        # not one refitted on all samples -- confirm that this is intended.
        'model': estimator,
    }, PathHandler.generate_path('model'), compress='lz4')

In [None]:
def random_forest_undersampling_centroid(samples: pd.DataFrame, target_column: str, covariates: list[str]):
    """Cross-validate a random forest on ClusterCentroids-undersampled folds and save the outcome.

    Rows whose ``target_column`` is NaN are discarded. The remaining rows are
    split with ``GroupKFold`` using ``SPATIAL_CROSS_VALIDATION_COLUMN`` as the
    group label. In each fold the training part is resampled with
    ``ClusterCentroids``, a new estimator is fitted on it, and the second
    column of ``predict_proba`` (class 1) is collected for the held-out part.

    From the pooled out-of-fold predictions a threshold is chosen with
    ``get_optimal_threshold`` and recall, precision and F1 are computed. The
    results are written with joblib to ``PathHandler.generate_path('model')``.

    Args:
        samples: Table holding the covariates, the target column and the
            cross-validation group column.
        target_column: Name of the binary target column.
        covariates: Names of the feature columns.

    Returns:
        None. The result is persisted to disk.
    """
    tc_samples = samples[samples[target_column].notna()].reset_index()

    X = tc_samples[covariates]
    y = tc_samples[target_column]
    groups = tc_samples[SPATIAL_CROSS_VALIDATION_COLUMN]

    group_k_fold = GroupKFold(CROSS_VALIDATION_FOLDS)

    y_true = []
    y_pred_proba = []

    for train_idx, test_idx in group_k_fold.split(X, y, groups):
        # Progress message, printed once per fold.
        print("Começando processamento...")

        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        # Only the training part is balanced; the test part keeps its
        # original class distribution.
        cc = ClusterCentroids(random_state=42)
        X_train_res, y_train_res = cc.fit_resample(X_train, y_train)

        estimator = get_estimator()
        estimator.fit(X_train_res, y_train_res)

        y_true.extend(list(y_test))
        y_pred_proba.extend(estimator.predict_proba(X_test)[:, 1])

    # Explicit arrays: the comparison below must be element-wise, and both
    # sequences are stored side by side afterwards.
    y_true = np.asarray(y_true)
    y_pred_proba = np.asarray(y_pred_proba)

    op_threshold = get_optimal_threshold(y_true, y_pred_proba)

    y_pred = (y_pred_proba >= op_threshold).astype(int)

    joblib.dump({
        'cv_result': pd.DataFrame({
            'predict_proba': y_pred_proba,
            # Labels in fold order, i.e. the order of the predictions. The
            # row order of `y` differs from it, so `y` must not be used here.
            'expected': y_true,
        }),
        'threshold': op_threshold,
        'recall': recall_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'f1_score': f1_score(y_true, y_pred),
        # NOTE(review): this is the estimator fitted in the last fold only,
        # not one refitted on all samples -- confirm that this is intended.
        'model': estimator,
    }, PathHandler.generate_path('model'), compress='lz4')

In [16]:
def random_forest_oversampling(samples: pd.DataFrame, target_column: str, covariates: list[str]):
    """Cross-validate a random forest on SMOTE-oversampled folds and save the outcome.

    Rows whose ``target_column`` is NaN are discarded. The remaining rows are
    split with ``GroupKFold`` using ``SPATIAL_CROSS_VALIDATION_COLUMN`` as the
    group label. In each fold the training part is resampled with ``SMOTE``,
    a new estimator is fitted on it, and the second column of
    ``predict_proba`` (class 1) is collected for the held-out part.

    From the pooled out-of-fold predictions a threshold is chosen with
    ``get_optimal_threshold`` and recall, precision and F1 are computed. The
    results are written with joblib to ``PathHandler.generate_path('model')``.

    Args:
        samples: Table holding the covariates, the target column and the
            cross-validation group column.
        target_column: Name of the binary target column.
        covariates: Names of the feature columns.

    Returns:
        None. The result is persisted to disk.
    """
    tc_samples = samples[samples[target_column].notna()].reset_index()

    X = tc_samples[covariates]
    y = tc_samples[target_column]
    groups = tc_samples[SPATIAL_CROSS_VALIDATION_COLUMN]

    group_k_fold = GroupKFold(CROSS_VALIDATION_FOLDS)

    y_true = []
    y_pred_proba = []

    for train_idx, test_idx in group_k_fold.split(X, y, groups):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        # Only the training part is resampled; the test part keeps its
        # original class distribution.
        smote = SMOTE(random_state=42)
        X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

        estimator = get_estimator()
        estimator.fit(X_resampled, y_resampled)

        y_true.extend(list(y_test))
        y_pred_proba.extend(estimator.predict_proba(X_test)[:, 1])

    # Explicit arrays: the comparison below must be element-wise, and both
    # sequences are stored side by side afterwards.
    y_true = np.asarray(y_true)
    y_pred_proba = np.asarray(y_pred_proba)

    op_threshold = get_optimal_threshold(y_true, y_pred_proba)

    y_pred = (y_pred_proba >= op_threshold).astype(int)

    joblib.dump({
        'cv_result': pd.DataFrame({
            'predict_proba': y_pred_proba,
            # Labels in fold order, i.e. the order of the predictions. The
            # row order of `y` differs from it, so `y` must not be used here.
            'expected': y_true,
        }),
        'threshold': op_threshold,
        'recall': recall_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'f1_score': f1_score(y_true, y_pred),
        # NOTE(review): this is the estimator fitted in the last fold only,
        # not one refitted on all samples -- confirm that this is intended.
        'model': estimator,
    }, PathHandler.generate_path('model'), compress='lz4')

# Treinamento de Modelos

In [21]:
# Binary targets to build: for each name, original class 3 is mapped to 0
# and the other listed class (1 or 2) is mapped to 1.
class_name = ['other_vs_cultivated', 'other_vs_natural']
class_values = [([3], [1]), ([3], [2])]

# All model files of this run go to this directory (created if missing).
PathHandler.set_path('random_forest/balanced')

# Load the samples, pick the feature columns and add the target columns.
samples = get_samples()
covariates = get_covariates(samples)
create_ovo_class(samples, class_name, class_values)

# One model per target, trained with random undersampling; the suffix keeps
# the output files of the different targets apart.
for target_column in class_name:
    PathHandler.set_value(f"random_{target_column}")
    random_forest_undersampling_random(samples, target_column, covariates)

In [22]:
# Binary targets to build: for each name, original class 3 is mapped to 0
# and the other listed class (1 or 2) is mapped to 1.
class_name = ['other_vs_cultivated', 'other_vs_natural']
class_values = [([3], [1]), ([3], [2])]

# All model files of this run go to this directory (created if missing).
PathHandler.set_path('random_forest/balanced')

# Load the samples, pick the feature columns and add the target columns.
samples = get_samples()
covariates = get_covariates(samples)
create_ovo_class(samples, class_name, class_values)

# One model per target, trained with cluster-centroid undersampling; the
# suffix keeps the output files of the different targets apart.
for target_column in class_name:
    PathHandler.set_value(f"centroid_{target_column}")
    random_forest_undersampling_centroid(samples, target_column, covariates)

KeyboardInterrupt: 

In [15]:
# Binary targets to build: for each name, original class 3 is mapped to 0
# and the other listed class (1 or 2) is mapped to 1.
class_name = ['other_vs_cultivated', 'other_vs_natural']
class_values = [([3], [1]), ([3], [2])]

# All model files of this run go to this directory (created if missing).
PathHandler.set_path('random_forest/balanced')

# Load the samples, pick the feature columns and add the target columns.
samples = get_samples()
covariates = get_covariates(samples)
create_ovo_class(samples, class_name, class_values)

# One model per target, trained with SMOTE oversampling; the suffix keeps
# the output files of the different targets apart.
for target_column in class_name:
    PathHandler.set_value(f"smote_{target_column}")
    random_forest_oversampling(samples, target_column, covariates)

# END