In [1]:
import os
import pandas as pd

In [2]:
def find_csv(path='/data/notebook_files/'):
    """Return the file name of a CSV file located under ``path``.

    The directory tree rooted at ``path`` is traversed with ``os.walk``.
    When several ``.csv`` files exist, the one visited last is returned.

    Args:
        path: Root directory to search.

    Returns:
        The bare file name (no directory component) of the last ``.csv``
        file visited, or an empty string when none is found.
    """
    csv_names = [
        file_name
        for _root, _dirs, file_names in os.walk(path)
        for file_name in file_names
        if file_name.endswith('.csv')
    ]
    return csv_names[-1] if csv_names else ''

def open_dataframe(**params):
    """Load the CSV located by ``find_csv()`` into a DataFrame.

    Args:
        **params: Keyword arguments forwarded verbatim to ``pd.read_csv``.
            When none are given, the defaults ``thousands=','``,
            ``decimal='.'`` and ``engine='c'`` are used.

    Returns:
        The DataFrame produced by ``pd.read_csv``.

    Raises:
        FileNotFoundError: If ``find_csv()`` finds no CSV file.
    """
    csv_name = find_csv()
    if not csv_name:
        raise FileNotFoundError('No CSV file was found by find_csv().')

    # ``**params`` is always a dict (possibly empty) and never None, so
    # emptiness is the only reliable signal that the caller wants defaults.
    if not params:
        params = {'thousands': ',', 'decimal': '.', 'engine': 'c'}

    # find_csv() returns a bare file name, so the read is resolved relative
    # to the current working directory.
    return pd.read_csv(csv_name, **params)

In [3]:
class ETLBasic:
    """Convenience wrapper around a pandas DataFrame for basic ETL steps.

    Mutating helpers modify the wrapped frame in place, so the frame object
    handed to the constructor is modified as well.
    """

    def __init__(self, df_name, df):
        """Store the frame and its label.

        Args:
            df_name: Label for the dataset.
            df: The pandas DataFrame to wrap.
        """
        self.df_name = df_name
        self.df = df
        # Snapshot of the column labels taken at construction time; it is
        # not refreshed by later mutations of the frame.
        self.columns = self.df.columns.values.tolist()
        # Per-column statistics from the latest get_statistics_* call.
        self.statistics_numerics_df = {}
        self.statistics_categoricals_df = {}

    @staticmethod
    def _first_mode(series):
        """Return the first modal value of ``series``, or None if it has none."""
        modes = series.mode()
        # mode() is empty for an empty or all-NaN series.
        return None if modes.empty else modes.values[0]

    def _na_percentage(self, column):
        """Return the share of missing values in ``column`` as e.g. ``'25.0%'``."""
        total_rows = self.df.shape[0]
        if total_rows == 0:
            return '0.0%'
        missing = self.df[column].isnull().sum()
        # Scale first and round last: rounding the fraction and then
        # multiplying by 100 yields artefacts such as 28.999999999999996.
        return f'{round(missing / total_rows * 100, 2)}%'

    def df_header(self, head=10):
        """Return the first ``head`` rows of the frame."""
        return self.df.head(head)

    def get_df(self):
        """Return the wrapped DataFrame."""
        return self.df

    def format_date(self, format_date='%d/%m/%Y', column=None):
        """Parse ``column`` as dates and rewrite it as formatted strings.

        Args:
            format_date: ``strftime`` pattern for the output strings.
            column: Column to convert. Required: the default ``None`` is
                looked up as a column label like any other value.

        Returns:
            The wrapped DataFrame. The column holds strings afterwards,
            not datetimes.
        """
        parsed = pd.to_datetime(self.df[column])
        self.df[column] = parsed.dt.strftime(format_date)
        return self.get_df()

    def convert_to_numeric(self, columns=None):
        """Strip thousands separators (``,``) and convert columns to numbers.

        Values that cannot be parsed become NaN (``errors='coerce'``).

        Args:
            columns: A column label or an iterable of column labels.

        Returns:
            The wrapped DataFrame, or None when ``columns`` is None or a
            conversion error was caught and printed.
        """
        if columns is None:
            print('Colunas não fornecidas: \n\t\t -> Colunas: ', columns)
            return None

        # A bare string would otherwise be iterated character by character.
        if isinstance(columns, str):
            columns = [columns]

        try:
            for column in columns:
                series = self.df[column]
                # Numeric columns have no .str accessor and need no cleanup.
                if not pd.api.types.is_numeric_dtype(series):
                    series = series.str.replace(',', '', regex=False)
                self.df[column] = pd.to_numeric(series, errors='coerce')
        except Exception as err:
            # Best-effort: report the problem and signal failure with None.
            print(err)
            return None
        return self.get_df()

    def date_as_index(self, column):
        """Make ``column`` the index of the frame (in place) and return the frame."""
        self.df.set_index(column, inplace=True)
        return self.get_df()

    def df_between_time(self, start_time, end_time):
        """Return the rows selected by ``DataFrame.between_time``."""
        return self.df.between_time(start_time=start_time, end_time=end_time)

    def describe_df(self, include=None, exclude=None, percentiles=None):
        """Return ``DataFrame.describe`` for the wrapped frame."""
        return self.df.describe(include=include, exclude=exclude, percentiles=percentiles)

    def info_df(self):
        """Print ``DataFrame.info`` for the wrapped frame.

        ``info()`` is called without a buffer, so it writes its report
        itself and this method returns None.
        """
        return self.df.info()

    def show_informations_df(self):
        """Print the frame's ``info`` report and return its ``describe`` table."""
        # info() prints by itself; wrapping it in print() would also emit
        # a stray "None".
        self.info_df()
        return self.describe_df()

    def get_statistics_numerics_df(self):
        """Compute summary statistics for every float column.

        Only float columns are considered; integer columns are skipped.

        Returns:
            A DataFrame with one column per float column and one row per
            statistic. The underlying dict is also stored in
            ``self.statistics_numerics_df``.
        """
        # select_dtypes copes with frames that have no float column, where
        # describe(include=float) would raise.
        columns = self.df.select_dtypes(include=float).columns.to_list()

        # Rebuilt on every call so dropped columns leave no stale entries.
        statistics = {}
        for column in columns:
            series = self.df[column]
            missing = series.isnull()
            statistics[column] = {
                'count': series.count(),
                'mean': series.mean(),
                'median': series.median(),
                'variance': series.var(),
                'std': series.std(),
                'mode': self._first_mode(series),
                'max': series.max(),
                'min': series.min(),
                'duplicates': series.duplicated(keep=False).sum(),
                'NA_values': missing.sum(),
                'Have_NA_values': missing.any(),
                '%_NA_values': self._na_percentage(column),
                'zeros_count': (series == 0).sum()
            }

        self.statistics_numerics_df = statistics
        return pd.DataFrame(statistics)

    def get_statistics_categoricals_df(self):
        """Compute summary statistics for every object-dtype column.

        Returns:
            A DataFrame with one column per object column and one row per
            statistic. The underlying dict is also stored in
            ``self.statistics_categoricals_df``.
        """
        columns = self.df.select_dtypes(include=object).columns.to_list()

        statistics = {}
        for column in columns:
            series = self.df[column]
            missing = series.isnull()
            statistics[column] = {
                'mode': self._first_mode(series),
                'count': series.count(),
                'NA_values': missing.sum(),
                'Have_NA_values': missing.any(),
                '%_NA_values': self._na_percentage(column),
                # sum() counts the True flags; count() would count every row.
                'duplicates': series.duplicated(keep=False).sum()
            }

        self.statistics_categoricals_df = statistics
        return pd.DataFrame(statistics)

    def sort_values_duplicated(self, column):
        """Return the repeated values of ``column``, sorted.

        The first occurrence of each value is not flagged, so only the
        second and later occurrences are returned.
        """
        series = self.df[column]
        return series[series.duplicated()].sort_values()

    def drop_duplicates(self, subset=None):
        """Drop every row that has a duplicate (``keep=False``), in place.

        Returns:
            The wrapped DataFrame after the rows were removed.
        """
        self.df.drop_duplicates(subset=subset, keep=False, inplace=True)
        return self.get_df()

    def dropna_df(self, axis=0, subset=None, thresh=None, how='any'):
        """Apply ``DataFrame.dropna`` in place and return the wrapped frame."""
        self.df.dropna(axis=axis, inplace=True, subset=subset, thresh=thresh, how=how)
        return self.get_df()

    def fillna(self, value, axis=0):
        """Apply ``DataFrame.fillna`` in place and return the wrapped frame."""
        self.df.fillna(value=value, axis=axis, inplace=True)
        return self.get_df()

    def eval_df(self, exp):
        """Apply ``DataFrame.eval`` in place and return the wrapped frame.

        NOTE(review): ``exp`` is evaluated as an expression string; only
        pass expressions from a trusted source.
        """
        self.df.eval(exp, inplace=True)
        return self.get_df()

    def df_value_counts(self, columns=None, subset=None, normalize=False, sort=True, ascending=False, dropna=True):
        """Count unique rows or values.

        Args:
            columns: None for the whole frame, a list of labels for a
                sub-frame, or a single label for one column.
            subset: Forwarded to ``DataFrame.value_counts``. Ignored when
                ``columns`` selects a single column, because
                ``Series.value_counts`` has no such parameter.
            normalize, sort, ascending, dropna: Forwarded to ``value_counts``.

        Returns:
            The Series produced by ``value_counts``.
        """
        target = self.df if columns is None else self.df[columns]
        options = dict(normalize=normalize, sort=sort, ascending=ascending, dropna=dropna)
        if isinstance(target, pd.DataFrame):
            return target.value_counts(subset=subset, **options)
        return target.value_counts(**options)

In [4]:
class Model:
    """Bundle splitting, tuning, fitting and evaluating a classifier.

    Third-party packages (scikit-learn, FLAML, matplotlib, numpy) are
    imported inside the methods that use them, so each is only needed when
    the corresponding method is called.
    """

    def __init__(self, model, data, column_target):
        """Separate features from the target column.

        Args:
            model: An estimator instance, or a callable/class that builds
                one from keyword arguments (see ``find_best_config_model``).
            data: DataFrame holding the features and the target column.
            column_target: Label of the target column in ``data``.
        """
        self.model = model
        self.x = data.drop(column_target, axis=1)
        self.y = data[column_target]
        self.x_train, self.x_test, self.y_train, self.y_test = None, None, None, None
        self.y_train_pred, self.y_test_pred = None, None
        self.train_accuracy, self.test_accuracy = None, None
        # Scaler fitted by train_test_split_data().
        self.scaler = None

    def _require_split(self):
        """Fail early when the train/test split has not been created yet."""
        if self.x_train is None or self.y_train is None:
            raise RuntimeError('Call train_test_split_data() before this step.')

    def scale_x(self):
        """Return all features min-max scaled, keeping index and column labels.

        The scaler is fitted on the full feature set, so do not use the
        result to build a train/test split; use ``train_test_split_data``.
        """
        from sklearn.preprocessing import MinMaxScaler

        scaler = MinMaxScaler()
        x_new = pd.DataFrame(scaler.fit_transform(self.x), index=self.x.index, columns=self.x.columns)
        return x_new

    def find_best_config_model(self, task='classification', estimator_list=None):
        """Search hyper-parameters with FLAML and apply them to the model.

        Args:
            task: FLAML task name.
            estimator_list: FLAML estimator names; defaults to ``['lgbm']``.

        Returns:
            Tuple ``(model, best_params)``.

        Raises:
            RuntimeError: If the data has not been split yet.
        """
        from flaml import AutoML

        # None instead of a list literal avoids a shared mutable default.
        if estimator_list is None:
            estimator_list = ['lgbm']
        self._require_split()

        automl = AutoML()
        # FLAML names the parameter ``X_train``. A lowercase ``x_train`` is
        # swallowed as an extra keyword and the data never reaches fit().
        automl.fit(X_train=self.x_train, y_train=self.y_train, task=task, estimator_list=estimator_list)
        best_params = automl.best_config

        if callable(self.model):
            # A class or factory was supplied: build the estimator.
            self.model = self.model(**best_params)
        else:
            # Already an instance, e.g. on a repeated call.
            # NOTE(review): assumes a scikit-learn style ``set_params``.
            self.model.set_params(**best_params)
        return self.model, best_params

    def train_test_split_data(self, test_size=0.3, train_size=0.7, random_state=None, shuffle=True, stratify=None):
        """Split the data, then min-max scale the features.

        The scaler is fitted on the training features only and then applied
        to both parts, so no test-set information leaks into training.
        Scaled test values may therefore fall outside ``[0, 1]``.

        Returns:
            Tuple ``(x_train, x_test, y_train, y_test)``.
        """
        from sklearn.model_selection import train_test_split
        from sklearn.preprocessing import MinMaxScaler

        x_train_raw, x_test_raw, self.y_train, self.y_test = train_test_split(
            self.x, self.y, test_size=test_size, train_size=train_size,
            random_state=random_state, shuffle=shuffle, stratify=stratify)

        self.scaler = MinMaxScaler().fit(x_train_raw)
        self.x_train = pd.DataFrame(self.scaler.transform(x_train_raw), index=x_train_raw.index, columns=x_train_raw.columns)
        self.x_test = pd.DataFrame(self.scaler.transform(x_test_raw), index=x_test_raw.index, columns=x_test_raw.columns)
        return self.x_train, self.x_test, self.y_train, self.y_test

    def fit_model(self, eval_metric='l1', early_stopping_rounds=1000):
        """Fit the model on the training split, evaluating on the test split.

        NOTE(review): ``eval_set``, ``eval_metric`` and
        ``early_stopping_rounds`` are passed straight to ``model.fit``.
        Confirm the installed estimator accepts them and that ``'l1'``
        suits the task.
        """
        self._require_split()
        self.model.fit(self.x_train, self.y_train, eval_set=[(self.x_test, self.y_test)], eval_metric=eval_metric, early_stopping_rounds=early_stopping_rounds)

    def predict(self):
        """Predict on both splits and return ``(y_train_pred, y_test_pred)``."""
        self._require_split()
        self.y_train_pred = self.model.predict(self.x_train)
        self.y_test_pred = self.model.predict(self.x_test)
        return self.y_train_pred, self.y_test_pred

    def accuracy_model(self):
        """Compute, print and return ``(train_accuracy, test_accuracy)``."""
        from sklearn.metrics import accuracy_score

        self.train_accuracy = accuracy_score(self.y_train, self.y_train_pred)
        self.test_accuracy = accuracy_score(self.y_test, self.y_test_pred)
        print(f'Acurácia no conjunto de treinamento: {self.train_accuracy:.4f}')
        print(f'Acurácia no conjunto de validação: {self.test_accuracy:.4f}')
        return self.train_accuracy, self.test_accuracy

    def show_confusion_matrix(self):
        """Plot the confusion matrix of the test-split predictions."""
        import matplotlib.pyplot as plt
        from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

        # A distinct local name keeps the imported function unshadowed.
        matrix = confusion_matrix(self.y_test, self.y_test_pred, labels=self.model.classes_)
        display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=self.model.classes_)
        display.plot()
        plt.show()

    def exist_overfitting(self, cv=5):
        """Compare test accuracy with the mean cross-validation score.

        Prints the mean score and a verdict: overfitting is reported when
        the test accuracy is below the cross-validation mean.

        Raises:
            RuntimeError: If ``accuracy_model()`` has not been run yet.
        """
        import numpy as np
        from sklearn.model_selection import cross_val_score

        # Checked before the costly cross-validation so a missing step
        # fails immediately.
        if self.test_accuracy is None:
            raise RuntimeError('Call accuracy_model() before exist_overfitting().')

        cv_scores = cross_val_score(self.model, self.x_train, self.y_train, cv=cv)
        mean_cv_score = np.mean(cv_scores)
        print(f'Acurácia média na validação cruzada: {mean_cv_score:.4f}')
        if self.test_accuracy >= mean_cv_score:
            print('Não houve evidências de overfitting.')
        else:
            print('Houve evidências de overfitting.')

    def execute_model(self, eval_metric='l1', early_stopping_rounds=1000):
        """Run fit, predict, accuracy, confusion matrix and overfitting check.

        The defaults are the same as those of ``fit_model``.
        """
        self.fit_model(eval_metric, early_stopping_rounds)
        self.predict()
        self.accuracy_model()
        self.show_confusion_matrix()
        self.exist_overfitting()


In [7]:
# Look up the CSV file name; it is reused below as the dataset label.
df_name = find_csv()
# No keyword arguments are passed, so open_dataframe() chooses the
# read_csv options. It runs find_csv() again internally.
df = open_dataframe()

# Wrap the loaded frame and preview its first rows (df_header defaults to 10).
etl = ETLBasic(df_name=df_name, df=df)
etl.df_header()