<a href="https://colab.research.google.com/github/gabriellybc/previsao-evasao-ufes/blob/main/previsao_evasao_ufes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title # Previsão de Alunos com Risco de Evasão - UFES { display-mode: "form" }
#@markdown Este notebook utiliza um modelo de aprendizado de máquina **XGBoost** para prever alunos com risco de evasão da Universidade Federal do Espírito Santo (UFES), a partir de dados do histórico acadêmico importados.
#@markdown ## Como funciona este notebook

#@markdown ---

#@markdown 1. **Importação dos dados**

#@markdown Ao executar a célula para carregar os dados, aparecerá um botão **"Escolher arquivos"** para que você selecione as bases de histórico dos alunos.
#@markdown - Você pode selecionar quantos arquivos quiser.
#@markdown - **Importante:** Os arquivos devem conter obrigatoriamente as seguintes colunas: `MATR_ALUNO`, `ANO_INGRESSO`, `FORMA_INGRESSO`, `COTISTA`, `CRA`, `COD_FORMA_EVASAO`, `SITUACAO_ITEM`, `ANO`, `ANO_EVASAO`, `ANO_INGRESSO`

#@markdown 2. **Pré-processamento dos dados**

#@markdown Após a importação, os dados serão pré-processados para ficarem no formato adequado para a previsão pelo modelo.
#@markdown O pré-processamento inclui a criação das seguintes colunas que serão usadas na previsão: `TEMPO_GRADUACAO`, `REPROVADA_POR_CURSADA_ANO1`, `REPROVOU_Nunca`,`REPROVOU_FREQUENCIA_Nunca`

#@markdown 3. **Colunas referentes às disciplinas do departamento**

#@markdown O processo é o seguinte:
#@markdown - Para cada linha, identifica-se o departamento da disciplina.
#@markdown - Os dados são agrupados e ordenados para identificar os **4 departamentos com maior número de registros** na base.
#@markdown - Apenas esses 4 departamentos são selecionados; os demais são ignorados.
#@markdown - Para cada aluno, verifica-se a porcentagem de valores nulos para esses departamentos (valores nulos indicam que o aluno nunca cursou disciplinas daquele departamento).
#@markdown - São selecionados apenas os departamentos cuja porcentagem de nulos está abaixo de um limite (padrão: 1.6%).
#@markdown - Ao final, o notebook imprime quais departamentos foram selecionados ou informa se nenhum departamento foi selecionado.
#@markdown - Além disso, são removitos todas as linhas com alunos que tiverem valor nulo em alguma dessas colunas usadas na previsão.

#@markdown ---

#@markdown ## Parâmetros de entrada

#@markdown Você deve definir os seguintes parâmetros para rodar o notebook:

#@markdown - **URL_PASTA_DRIVE**: Link da pasta do Google Drive onde o modelo (`model.pkl`) e o arquivo de configuração das colunas (`config_features.json`) serão salvos ou lidos.
#@markdown  - No modo `treinar_e_prever`, este campo é opcional. Se preenchido, o modelo treinado e as colunas selecionadas serão salvos nessa pasta. Se deixado vazio, o modelo e as colunas não serão salvos.
#@markdown  - No modo `apenas_prever`, este campo é obrigatório e deve conter uma pasta com os arquivos `model.pkl` e `config_features.json` para que o notebook possa carregar o modelo e as colunas.

#@markdown - **MODO**: Define se o notebook vai:
#@markdown  - `treinar_e_prever`: Treinar um novo modelo com os dados importados e fazer a previsão.
#@markdown  - `apenas_prever`: Apenas fazer a previsão usando um modelo previamente treinado e salvo na pasta do Drive.

#@markdown ---

#@markdown ## Observações importantes

#@markdown - Cada vez que você rodar o modo `treinar_e_prever` com o campo `URL_PASTA_DRIVE` preenchido, os `arquivos model.pkl` e `config_features.json` serão salvos na pasta indicada.
#@markdown - Na pasta do Drive, podem existir múltiplos arquivos com esses nomes, referentes a diferentes execuções.
#@markdown - No modo `apenas_prever`, o notebook irá carregar os arquivos `model.pkl` e `config_features.json` com a última data de modificação mais recente para garantir que o modelo mais atualizado seja usado.

#@markdown ---

#@markdown ## Entradas:

URL_PASTA_DRIVE = "https://drive.google.com/drive/folders/12R6swOXmMTAGJsMEHfkJlfIoAxvxezQ2" #@param {type:"string"}
MODO = "treinar_e_prever" # @param ["treinar_e_prever","apenas_prever"]

#@markdown ---



from google.colab import drive, auth, files
import google.auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import warnings
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, Any
import io
import gc
import json
import pickle
from mimetypes import guess_type
from collections import Counter
from scipy import stats
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold, cross_validate


auth.authenticate_user()
warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 999)
credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/drive"]
)
service_drive = build("drive", "v3", credentials=credentials)


MAIN_COLUMNS = [
    "MATR_ALUNO",
    "ANO_INGRESSO",
    "FORMA_INGRESSO",
    "COTISTA",
]
COL_X = [
    "CRA",
    "TEMPO_GRADUACAO",
    "REPROVADA_POR_CURSADA_ANO1",
    "REPROVOU_Nunca",
    "REPROVOU_FREQUENCIA_Nunca"
]
col_y = "GRUP_FORMA_EVASAO_Evadido"

MODEL_NAME = "model.pkl"
JSON_NAME = "config_features.json"
QNT_MAX_DEPARTMENTS = 4
MAX_NULL_PERCENT = 1.6
OUTPUT_FILENAME = "previsao_evasao_estudantes.xlsx"

# Data Loading
def upload_files() -> pd.DataFrame:
    """
    Uploads and combines multiple CSV and Excel files into a single pandas DataFrame.
    """
    list_of_dfs = []
    uploaded = files.upload()
    if not uploaded:
        raise ValueError("Nenhum arquivo foi carregado.")
    for filename in uploaded.keys():
        if filename.endswith(".csv"):
            df = pd.read_csv(filename)
        elif filename.endswith(".xlsx"):
            df = pd.read_excel(filename)
        list_of_dfs.append(df)
    if not list_of_dfs:
        raise ValueError("Nenhum arquivo válido foi carregado.")
    combined_df = pd.concat(list_of_dfs, ignore_index=True)
    del list_of_dfs
    del uploaded
    gc.collect()
    return combined_df


def convert_types(df: pd.DataFrame) -> None:
    """
    Converts the data types of specific columns in the DataFrame.
    """
    df["CRA"] = pd.to_numeric(df["CRA"], errors="ignore")
    df["COD_FORMA_EVASAO"] = df["COD_FORMA_EVASAO"].astype(int)


def group_graduated_and_dropped(df: pd.DataFrame) -> None:
    """
    Classifies students into groups based on their dropout reason.
    """
    conditions = [
        (df["COD_FORMA_EVASAO"] == 4), #Formado
        (df["COD_FORMA_EVASAO"] == 1), #Sem evasão
        #(Evadidos) Desistência, Jubilado, Desligamento por Abandono, Transferido, Reopção de Curso, Desligamento: Descumpriu Plano de Estudos, Desligamento: Resolução 68/2017-CEPE, Reopção de curso
        (df["COD_FORMA_EVASAO"] == 51) | (df["COD_FORMA_EVASAO"] == 10) | (df["COD_FORMA_EVASAO"] == 9) | (df["COD_FORMA_EVASAO"] == 2) | (df["COD_FORMA_EVASAO"] == 20) | (df["COD_FORMA_EVASAO"] == 7) | (df["COD_FORMA_EVASAO"] == 13) | (df["COD_FORMA_EVASAO"] == 53) | (df["COD_FORMA_EVASAO"] == 100),
        #(outros) Falecimento, Não Informado, Desligamento por mandado judicial, Adaptação Curricular, Nulidade da matrícula - ato administrativo
    ]
    values = ["Formado", "Cursando", "Evadido"] #formado, sem evasão, evadido, outros
    df["GRUP_FORMA_EVASAO"] = np.select(conditions, values, default="Outro")


def group_quotas(df: pd.DataFrame) -> None:
    """
    Classifies students as 'Sim' or 'Nao' based on their quota status.
    """
    df["COTISTA"] = df["COTISTA"].map({"S": "Sim", "N": "Nao"})


def group_entry_mode(df: pd.DataFrame) -> None:
    """
    Classifies students based on their entry mode (Vestibular, Sisu, or Other).
    """
    conditions = [
        (df["FORMA_INGRESSO"] == "Vestibular"),
        (df["FORMA_INGRESSO"] == "Sisu")
    ]
    values = ["Vestibular", "Sisu"]
    df["FORMA_INGRESSO"] = np.select(conditions, values, default="Outro")


def group_course_status(df: pd.DataFrame) -> None:
    """
    Groups course status into 'Aprovado' (Approved) or 'Reprovado' (Failed).
    """
    conditions = [
        #(aprovado) Aprovado, Aproveitamento de Estudos, Aprovado sem Nota, Amparo Legal, Dispensa com nota, Dispensa sem nota
        (df["SITUACAO_ITEM"] == 1) | (df["SITUACAO_ITEM"] == 11) | (df["SITUACAO_ITEM"] == 8) | (df["SITUACAO_ITEM"] == 19) | (df["SITUACAO_ITEM"] == 7)  | (df["SITUACAO_ITEM"] == 4),
        #(reprovado) Reprovado por Freqüência, Reprovado por Nota, Trancamento de Curso , Disciplina sem Oferta, Cancelada
        (df["SITUACAO_ITEM"] == 3) | (df["SITUACAO_ITEM"] == 2) | (df["SITUACAO_ITEM"] == 12) | (df["SITUACAO_ITEM"] == 13) | (df["SITUACAO_ITEM"] == 5)
    ]
    values = ["Aprovado", "Reprovado"] #aprovado, reprovado
    df["GRUP_SITUACAO"] = np.select(conditions, values, default="Outro")


def group_failure_status(df: pd.DataFrame) -> None:
    """
    Classifies students based on whether they have failed at least one course.
    """
    conditions = [
        #(aprovado)
        (df["GRUP_SITUACAO"] == "Aprovado"),
        #(reprovado) em pelo menos 1
        (df["GRUP_SITUACAO"] == "Reprovado")
    ]
    values = ["Não Reprovado", "Reprovado"] #não reprovado, reprovado
    df["REPROVOU"] = np.select(conditions, values, default="Outro")


def group_failures_due_to_attendance(df: pd.DataFrame) -> None:
    """
    Flags students who failed at least one course due to attendance issues.
    """
    conditions = [
        #(aprovado)
        (df["SITUACAO_ITEM"] != 3),
        #(reprovado) em pelo menos 1
        (df["SITUACAO_ITEM"] == 3)
    ]
    values = ["Não", "Sim"] #não reprovado, reprovado
    df["REPROVOU_FREQUENCIA"] = np.select(conditions, values, default="Outro")


def group_departments(df: pd.DataFrame) -> tuple[list, list]:
    """
    Groups departments based on course codes and determines top departments for analysis.
    """
    df["DEPARTAMENTO"] = df["COD_DISCIPLINA"].str[:3]
    if MODO == "treinar_e_prever":
        top_departments = list(df["DEPARTAMENTO"].value_counts().nlargest(QNT_MAX_DEPARTMENTS).index)
    elif MODO == "apenas_prever":
        with open(JSON_NAME, "r") as f:
            config_features = json.load(f)
        departments = config_features["departments"]
        if departments:
            top_departments = [dep[-3:] for dep in departments]
    df["DEPARTAMENTO"] = df["DEPARTAMENTO"].where(df["DEPARTAMENTO"].isin(top_departments), np.nan)
    top_departments_columns = [f"REPROVADA_POR_CURSADA_{department}" for department in top_departments]
    return top_departments, top_departments_columns


### Data Aggregation and Normalization
def check_failure(df: pd.DataFrame) -> None:
    """Checks and flags students who have failed at least one course."""
    if df[df["GRUP_SITUACAO"] == "Reprovado"].empty:
        df["REPROVOU"] = "Nunca"
    else:
        df["REPROVOU"] = "Sim"
    if df[df["REPROVOU_FREQUENCIA"] == "Sim"].empty:
        df["REPROVOU_FREQUENCIA"] = "Nunca"
    else:
        df["REPROVOU_FREQUENCIA"] = "Sim"


def calculate_graduation_time(df: pd.DataFrame) -> pd.DataFrame:
    """Calculates the graduation time for a student based on enrollment and dropout years."""
    df_resetado = df.reset_index(drop=True)
    max_ano_index = df_resetado["ANO"].idxmax()
    min_ano_index = df_resetado["ANO"].idxmin()
    if df_resetado.loc[min_ano_index, "ANO"] == 0:
        df_resetado = df_resetado.drop(df_resetado[df_resetado["ANO"] == 0].index).reset_index(drop=True)
        min_ano_index = df_resetado["ANO"].idxmin()
    sigla_semestre_min_ano = df_resetado.loc[min_ano_index, "SIGLA_SEMESTRE"]
    sigla_semestre_max_ano = df_resetado.loc[max_ano_index, "SIGLA_SEMESTRE"]

    if pd.isna(df_resetado["ANO_EVASAO"].values[0]) or not df_resetado["ANO_EVASAO"].values[0] or df_resetado["ANO_EVASAO"].values[0] == 0:
        max_ano = df_resetado["ANO"].max()
    else:
        max_ano = df_resetado["ANO"].max() if (df_resetado["ANO"].max() >= df_resetado["ANO_EVASAO"].values[0]) else (df_resetado["ANO_EVASAO"].values[0])
    if max_ano and not np.isnan(max_ano) and max_ano != 0:
        df_resetado["ANO_EVASAO"] = max_ano
    min_ano = df_resetado["ANO_INGRESSO"].values[0] #if (df_resetado["ANO"].min() >= df_resetado["ANO_INGRESSO"].values[0]) else (df_resetado["ANO"].min())
    min_ano = df_resetado["ANO"].min() if ((max_ano - min_ano) < 0) else (df_resetado["ANO_INGRESSO"].values[0])
    if min_ano and not np.isnan(min_ano) and min_ano != 0:
        df_resetado["ANO_INGRESSO"] = min_ano

    df_resetado["SEMESTRE_INGRESSO"] = sigla_semestre_min_ano
    if sigla_semestre_min_ano == 1 and sigla_semestre_max_ano == 2:
        df_resetado["TEMPO_GRADUACAO"] = (max_ano - min_ano) + 1
    elif (sigla_semestre_min_ano == 1 and sigla_semestre_max_ano == 1) or (sigla_semestre_min_ano == 2 and sigla_semestre_max_ano == 2):
        df_resetado["TEMPO_GRADUACAO"] = (max_ano - min_ano) + 0.5
    else:
        df_resetado["TEMPO_GRADUACAO"] = (max_ano - min_ano)
    return df_resetado


def calculate_graduation_time_aux(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate auxiliary graduation time based on the year of entry and semester.
    """
    df_list = []
    ano_groups = df.groupby("ANO")
    for ano, ano_data in ano_groups:
        df_resetado = ano_data
        min_ano = df_resetado["ANO_INGRESSO"].values[0]

        semestre_groups = df_resetado.groupby("SIGLA_SEMESTRE")
        for semestre, semestre_data in semestre_groups:
            df_semestre = semestre_data
            sigla_semestre_min = df_semestre["SEMESTRE_INGRESSO"].values[0]
            if sigla_semestre_min == 1 and semestre == 2:
                df_semestre["TEMPO_GRADUACAO_AUX"] = (ano - min_ano) + 1
            elif (sigla_semestre_min == 1 and semestre == 1) or (sigla_semestre_min == 2 and semestre == 2):
                df_semestre["TEMPO_GRADUACAO_AUX"] = (ano - min_ano) + 0.5
            else:
                df_semestre["TEMPO_GRADUACAO_AUX"] = (ano - min_ano)
            df_list.append(df_semestre)
        df_final = pd.concat(df_list)
    return df_final


def calculate_approval_rejection_count(df: pd.DataFrame) -> tuple[int, int]:
    """
    Calculates the count of approved and rejected students.
    """
    qnt_aprovacao = df[df["GRUP_SITUACAO"] == "Aprovado"].count()[0]
    qnt_reprovacao = df[df["GRUP_SITUACAO"] == "Reprovado"].count()[0]
    return qnt_aprovacao, qnt_reprovacao


def calculate_var_metrics(df: pd.DataFrame, var: str, approval_count: int, reproval_count: int) -> None:
    """
    Calculates and adds the approval and reproval metrics for a specific var.
    """
    total_courses = approval_count + reproval_count
    if total_courses == 0:
        df[f"REPROVADA_POR_CURSADA_{var}"] = np.nan
    else:
        df[f"REPROVADA_POR_CURSADA_{var}"] = reproval_count/total_courses


def separate_approvals_rejections_years(df: pd.DataFrame) -> None:
    """
    Separates the dataframe by graduation year and calculates approval and rejection metrics for each year.
    """
    df_aux = calculate_graduation_time_aux(df)
    df["TEMPO_GRADUACAO_AUX"] = df_aux["TEMPO_GRADUACAO_AUX"]
    df_ano1 = df[df["TEMPO_GRADUACAO_AUX"] <= 1.0]
    # df_ano2 = df[df["TEMPO_GRADUACAO_AUX"] > 1.0]
    # df_ano2 = df_ano2[df_ano2["TEMPO_GRADUACAO_AUX"] <= 2.0]
    # df_ano3 = df[df["TEMPO_GRADUACAO_AUX"] > 2.0]
    # df_ano3 = df_ano3[df_ano3["TEMPO_GRADUACAO_AUX"] <= 3.0]
    # df_ano4 = df[df["TEMPO_GRADUACAO_AUX"] > 3.0]
    # df_ano4 = df_ano4[df_ano4["TEMPO_GRADUACAO_AUX"] <= 4.0]
    # df_ano5 = df[df["TEMPO_GRADUACAO_AUX"] > 4.0]
    # df_ano5 = df_ano5[df_ano5["TEMPO_GRADUACAO_AUX"] <= 5.0]
    # df_ano6 = df[df["TEMPO_GRADUACAO_AUX"] > 5.0]

    qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(df_ano1)
    calculate_var_metrics(df, "ANO1", qnt_aprovacao, qnt_reprovacao)
    # qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(df_ano2)
    # calculate_var_metrics(df, "ANO2", qnt_aprovacao, qnt_reprovacao)
    # qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(df_ano3)
    # calculate_var_metrics(df, "ANO3", qnt_aprovacao, qnt_reprovacao)
    # qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(df_ano4)
    # calculate_var_metrics(df, "ANO4", qnt_aprovacao, qnt_reprovacao)
    # qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(df_ano5)
    # calculate_var_metrics(df, "ANO5", qnt_aprovacao, qnt_reprovacao)
    # qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(df_ano6)
    # calculate_var_metrics(df, "ANO>5", qnt_aprovacao, qnt_reprovacao)


def separate_approvals_rejections_departments(df: pd.DataFrame, top_departments: list) -> None:
    """
    Separates the dataframe by department and calculates approval and rejection metrics for each department.
    """
    for department in top_departments:
        df[f"REPROVADA_POR_CURSADA_{department}"] = np.nan
    depart_groups = df.groupby("DEPARTAMENTO")
    for depart, depart_data in depart_groups:
        qnt_aprovacao, qnt_reprovacao = calculate_approval_rejection_count(depart_data)
        calculate_var_metrics(df, str(depart), qnt_aprovacao, qnt_reprovacao)


def unify_students(df_list: list, top_departments_columns: list) -> pd.DataFrame:
    """
    Unifies a list of dataframes by selecting the row with the maximum 'ANO' for each dataframe
    and concatenating them into a single dataframe.
    """
    columns = [
        "CRA",
        "TEMPO_GRADUACAO",
        "REPROVADA_POR_CURSADA_ANO1",
        "REPROVOU",
        "REPROVOU_FREQUENCIA",
        "GRUP_FORMA_EVASAO"
    ]
    columns = MAIN_COLUMNS + columns
    columns.extend(top_departments_columns)
    df_unificado = df_list[0].head(0).copy()
    for df_index in df_list:
        df_resetado = df_index.reset_index(drop=True)
        df_unificado.loc[len(df_unificado)] = list(df_resetado.iloc[df_resetado["ANO"].idxmax()])
    df_unificado = df_unificado[columns]
    return df_unificado


def separate_eviction_types(df: pd.DataFrame) -> tuple:
    """
    Separates the dataset into different categories of "evaded" students based on their eviction reason.
    """
    df_abandono = df[df["COD_FORMA_EVASAO"] == 51] #Abandono
    df_tres_reprovacoes = df[df["COD_FORMA_EVASAO"] == 10] #Três reprovações
    df_transferido = df[df["COD_FORMA_EVASAO"] == 2] #Transferido
    df_desistencia = df[df["COD_FORMA_EVASAO"] == 9] #Desistência
    df_jubilado = df[df["COD_FORMA_EVASAO"] == 7] #Jubilado
    df_reopcao1 = df[df["COD_FORMA_EVASAO"] == 20] #Reopcao 20
    df_reopcao2 = df[df["COD_FORMA_EVASAO"] == 100] #Reopcao 100
    df_reopcao= pd.concat([df_reopcao1, df_reopcao2])
    df_descumpriu_plano_estudos = df[df["COD_FORMA_EVASAO"] == 13] #Descompriu plano de estudos
    df_falecimento = df[df["COD_FORMA_EVASAO"] == 3] #Falecimento
    df_nao_informado = df[df["COD_FORMA_EVASAO"] == 99] #Não informado
    df_resolucao_CEPE = df[df["COD_FORMA_EVASAO"] == 53] #Resolução CEPE
    df_mandado_judicial = df[df["COD_FORMA_EVASAO"] == 300] #Mandado judicial
    df_adaptacao_curricular = df[df["COD_FORMA_EVASAO"] == 31] #Adaptacao curricular
    df_nulidade_matricula = df[df["COD_FORMA_EVASAO"] == 400] #Nulidade da matricula
    return (
        df_abandono,
        df_tres_reprovacoes,
        df_transferido,
        df_desistencia,
        df_jubilado,
        df_reopcao,
        df_descumpriu_plano_estudos,
        df_falecimento,
        df_nao_informado,
        df_resolucao_CEPE,
        df_mandado_judicial,
        df_adaptacao_curricular,
        df_nulidade_matricula
    )


def separate_graduated_and_dropped_out_groups(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Separates the dataset into 'graduated', 'dropped out', and 'no dropout' groups.
    """
    df_graduated = df[df['COD_FORMA_EVASAO'] == 4] #Fomado
    df_enrolled = df[df['COD_FORMA_EVASAO'] == 1] #Sem evasão
    (
        df_abandono,
        df_tres_reprovacoes,
        df_transferido,
        df_desistencia,
        df_jubilado,
        df_reopcao,
        df_descumpriu_plano_estudos,
        df_falecimento,
        df_nao_informado,
        df_resolucao_CEPE,
        df_mandado_judicial,
        df_adaptacao_curricular,
        df_nulidade_matricula
    ) = separate_eviction_types(df)
    df_dropped_out = pd.concat([
        df_abandono,
        df_tres_reprovacoes,
        df_transferido,
        df_desistencia,
        df_jubilado,
        df_reopcao,
        df_descumpriu_plano_estudos,
        df_resolucao_CEPE
    ])
    return df_graduated, df_dropped_out, df_enrolled


def separate_grup_forma_evasao(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Separate a DataFrame into three groups based on student status.
    """
    df_graduated = df[df["GRUP_FORMA_EVASAO_Formado"]]
    df_dropped_out = df[df["GRUP_FORMA_EVASAO_Evadido"]]
    df_enrolled = df[df["GRUP_FORMA_EVASAO_Cursando"]]
    return df_graduated, df_dropped_out, df_enrolled


def convert_category_types(df: pd.DataFrame) -> None:
    """
    Convert specific DataFrame columns to categorical data type.
    """
    df["GRUP_FORMA_EVASAO"] = df["GRUP_FORMA_EVASAO"].astype("category")
    df["REPROVOU"] = df["REPROVOU"].astype("category")
    df["REPROVOU_FREQUENCIA"] = df["REPROVOU_FREQUENCIA"].astype("category")


# Model Training, Tuning, and Predictions
def get_dummies_custom(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts categorical columns into dummy variables and drops the original columns.
    """
    columns_to_get_dummies = df.select_dtypes(include=["category"]).columns.tolist()
    for col in columns_to_get_dummies:
        df_aux = pd.get_dummies(df[col], prefix=col)
        df = pd.concat([df, df_aux], axis=1).drop(col, axis=1)
    return df


def get_cols_X(df: pd.DataFrame, top_departments_columns: list) -> tuple[list, list]:
    """
    Selects columns for the feature matrix X.
    """
    columns_null_percent = []
    null_percent = df[top_departments_columns].isnull().mean() * 100
    # print(null_percent)
    columns_null_percent = list(null_percent[null_percent < MAX_NULL_PERCENT].index)
    print("\nColunas de departamentos selecionas para o modelo:")
    if len(columns_null_percent) == 0:
        print("Nenhuma coluna de departamento foi selecionada para o modelo.")
    else:
        print(columns_null_percent)
    col_X = COL_X + columns_null_percent
    return col_X, columns_null_percent


def create_pipeline(model) -> Pipeline:
    """
    Creates a machine learning pipeline with a transformer and a model.
    """
    pipeline = Pipeline([
        ("transformer", StandardScaler()),
        ("estimator", model)
    ])
    return pipeline


def run_cross_validation(X: pd.DataFrame, y: pd.Series, param_grid: Dict[str, list], model) -> Dict[str, Any]:
    """
    Runs cross-validation with hyperparameter tuning.
    """
    rkf = RepeatedStratifiedKFold(n_splits=5, n_repeats=6, random_state=11)
    pipeline = create_pipeline(model)

    gs = GridSearchCV(estimator=pipeline, param_grid=param_grid, scoring="accuracy", cv=4)
    cv_results = cross_validate(gs, X, y, cv=rkf, return_estimator=True, scoring="accuracy")
    return cv_results


def compute_metrics(cv_results: Dict[str, Any]) -> list:
    """
    Computes and displays model performance metrics.
    """
    scores = cv_results["test_score"]
    mean, std = scores.mean(), scores.std()
    print("\nMean Accuracy: {:.4f} | Standard Deviation: {:.4f}".format(mean, std))
    return scores


def extract_best_hyperparameters(cv_results: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extracts the most frequent best hyperparameters across cross-validation folds.
    """
    best_params_list = [frozenset(estimator.best_params_.items()) for estimator in cv_results["estimator"]]
    param_counts = Counter(best_params_list)
    best_hyperparams = param_counts.most_common(1)[0][0]
    return dict(best_hyperparams)


def compute_feature_importances(cv_results: Dict[str, Any], X: pd.DataFrame) -> None:
    """
    Computes and plots feature importance.
    """
    feature_importances = [
        estimator.best_estimator_.named_steps["estimator"].feature_importances_
        for estimator in cv_results["estimator"]
    ]

    mean_feature_importances = np.mean(feature_importances, axis=0)
    mean_feature_importances /= np.sum(mean_feature_importances)

    df_feature_importances = pd.DataFrame({
        "Feature": X.columns,
        "Importance": mean_feature_importances
    }).sort_values(by="Importance", ascending=False)

    blue_palette = [
        "#033053",  # Deep Ocean Blue
        "#05427B",  # Navy Blue
        "#05427B",  # Dark Sapphire (Given)
        "#08558F",  # Steel Blue
        "#0E65AD",  # Royal Blue (Given)
        "#2A79C2",  # Azure Blue
        "#4590D4",  # Sky Blue
        "#64A8E2",  # Soft Blue
        "#8BC0EC",  # Pastel Blue
        "#B4D9F4",  # Icy Blue
        "#B4D9F4"   # Frost Blue
    ]
    custom_palette = sns.color_palette(blue_palette)

    plt.figure(figsize=(10, 6))
    sns.barplot(x="Importance", y="Feature", data=df_feature_importances, palette=custom_palette)
    plt.title("Feature Importances (Averaged Over Nested CV Folds)", size=16)
    plt.xlabel("Importance", size=14)
    plt.ylabel("Feature", size=14)
    plt.show()


def clean_hyperparameters(best_hyperparams_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Removes the 'estimator__' prefix from hyperparameters.
    """
    return {k.replace("estimator__", ""): v for k, v in best_hyperparams_dict.items()}


def train_model_and_save(X: pd.DataFrame, y: pd.Series, model_name: str, model) -> Pipeline:
    """
    Trains the final model using the provided data and saves the trained model as a pickle file in the fixed directory.
    """
    final_model = create_pipeline(model)
    final_model.fit(X, y)
    with open(model_name, "wb") as f:
        pickle.dump(final_model, f)
    return final_model

def get_folder_id(folder_url: str) -> str:
    """
    Extract the folder ID from a Google Drive folder URL.
    """
    folder_id = folder_url.split("/folders/")[-1]
    folder_id = folder_id.split("?")[0]
    return folder_id


def upload_drive(name: str, folder_id: str) -> None:
    """
    Upload a file to Google Drive.
    """
    mtype, _ = guess_type(name)
    metadata = {
        "name": name,
        "parents": [folder_id]
    }
    media = MediaFileUpload(f"{name}", mimetype=mtype)
    response = (
        service_drive.files()
        .create(
            body=metadata,
            media_body=media,
            fields="id",
            supportsAllDrives=True,
        )
        .execute()
    )


def save_config_features(folder_id: str, model_departments_columns: list) -> None:
    """
    Save configuration features to a JSON file and upload it to Google Drive.
    """
    config_features = {
        "departments": model_departments_columns
    }
    with open(JSON_NAME, "w") as f:
        json.dump(config_features, f, indent=4)
    upload_drive(JSON_NAME, folder_id)


def load_file_info_drive(folder_id: str) -> tuple[dict, dict]:
    """
    Load file information for model and JSON files from Google Drive.
    """
    file_info_json = {}
    files_model = []
    files_json = []
    files = (
        service_drive.files()
        .list(
            fields="files(id,name,modifiedTime)",
            q=f"'{folder_id}' in parents",
            supportsAllDrives=True,
            includeItemsFromAllDrives=True,
        )
        .execute()["files"]
    )
    for file_dict in files:
        if file_dict["name"] == MODEL_NAME:
            files_model.append(
                {
                    "name": file_dict["name"],
                    "id": file_dict["id"],
                    "modified_time": file_dict["modifiedTime"]
                }
            )
        elif file_dict["name"] == JSON_NAME:
            files_json.append(
                {
                    "name": file_dict["name"],
                    "id": file_dict["id"],
                    "modified_time": file_dict["modifiedTime"]
                }
            )
    if len(files_model) == 0:
        raise FileNotFoundError(f"Arquivo '{MODEL_NAME}' não encontrado no Google Drive.")
    if len(files_json) == 0:
        raise FileNotFoundError(f"Arquivo '{JSON_NAME}' não encontrado no Google Drive.")
    file_info_model = max(files_model, key=lambda x: x["modified_time"])
    file_info_json = max(files_json, key=lambda x: x["modified_time"])
    return file_info_model, file_info_json


def download_file_drive(file_info: dict) -> None:
    """
    Downloads a file from Google Drive using the Drive API.
    """
    request = service_drive.files().get_media(
        fileId=file_info["id"], supportsAllDrives=True
    )
    file = io.FileIO(file_info["name"], "wb")
    downloader = MediaIoBaseDownload(file, request)
    done = False
    while done is False:
        status, done = downloader.next_chunk()


def load_model(model_name: str) -> Pipeline:
    """
    Loads a trained model from a pickle file.
    """
    with open(model_name, "rb") as f:
        model = pickle.load(f)
    return model


def make_predictions(model: Pipeline, X_new: pd.DataFrame) -> pd.DataFrame:
    """
    Makes predictions on new data and returns a DataFrame with the predictions.
    """
    y_pred_new = model.predict(X_new)
    df_y_pred = pd.DataFrame(y_pred_new, columns=["y_pred"])
    return df_y_pred


def save_predictions(df_data: pd.DataFrame, df_y_pred: pd.DataFrame, filename: str) -> None:
    """
    Merges predictions with original data, selects predefined columns, and saves the results to a excel file.

    """
    df_merged = df_data.join(df_y_pred, how="inner")
    df_pred = df_merged[df_merged["y_pred"] == 1]
    df_evadidos = df_pred.sort_values(by=["ANO_INGRESSO"], ascending=False).reset_index(drop=True)
    df_evadidos = df_evadidos.drop("y_pred", axis=1)
    df_evadidos.to_excel(filename, index=False)
    files.download(filename)



# MAIN

model = None
folder_id = get_folder_id(URL_PASTA_DRIVE.strip())

# Data Loading
df_upload = upload_files()

# Data Cleaning, Transformation, and Aggregation
df = df_upload.drop_duplicates()
df = df.dropna(subset=["COD_FORMA_EVASAO"])
convert_types(df)
group_graduated_and_dropped(df)
group_quotas(df)
group_entry_mode(df)
group_course_status(df)
group_failure_status(df)
group_failures_due_to_attendance(df)

if MODO == "apenas_prever":
    file_info_model, file_info_json = load_file_info_drive(folder_id)
    download_file_drive(file_info_model)
    download_file_drive(file_info_json)
top_departments, top_departments_columns = group_departments(df)

### Data Aggregation and Normalization
df_list = []
qnt = 0
matr_groups = df.groupby("MATR_ALUNO")
for matr, matr_data in matr_groups:
    df_aux = matr_data.copy().reset_index(drop=True)
    check_failure(df_aux)
    df_aux = calculate_graduation_time(df_aux)
    separate_approvals_rejections_years(df_aux)
    separate_approvals_rejections_departments(df_aux, top_departments)
    df_list.append(df_aux)
df = unify_students(df_list, top_departments_columns)

# Model Training, Tuning, and Predictions
convert_category_types(df)
df = get_dummies_custom(df)
col_X, model_departments_columns = get_cols_X(df, top_departments_columns)
df = df.dropna(subset=col_X)
df_graduated, df_dropped_out, df_enrolled = separate_grup_forma_evasao(df)
df_graduated_dropped_out = pd.concat([df_graduated, df_dropped_out])

#### XGBoost
df_graduated_dropped_out = df_graduated_dropped_out.reset_index(drop=True)
columns = MAIN_COLUMNS + col_X
df_enrolled = df_enrolled[columns]
X_new = df_enrolled.reset_index(drop=True)

if MODO == "treinar_e_prever":
    X = df_graduated_dropped_out[col_X]
    y = df_graduated_dropped_out[col_y]
    model = XGBClassifier(random_state=42)
    param_grid = {
        "estimator__learning_rate": [0.1, 0.2, 0.3],
        "estimator__max_depth": [3, 6, 9],
        "estimator__min_child_weight": [1, 3, 5]
    }
    cv_results = run_cross_validation(X, y, param_grid, model)
    scores_eng_xgb = compute_metrics(cv_results)
    best_hyperparams_dict = extract_best_hyperparameters(cv_results)
    print("\nBest Hyperparameters (Most Frequent in Nested CV):")
    print(best_hyperparams_dict)
    compute_feature_importances(cv_results, X)

    best_hyperparams_clean = clean_hyperparameters(best_hyperparams_dict)
    model = XGBClassifier(**best_hyperparams_clean, random_state=42)
    final_model = train_model_and_save(X, y, MODEL_NAME, model)
    if URL_PASTA_DRIVE.strip():
        upload_drive(MODEL_NAME, folder_id)
        save_config_features(folder_id, model_departments_columns)
    predictions = make_predictions(final_model, X_new[col_X])
    save_predictions(X_new, predictions, OUTPUT_FILENAME)

elif MODO == "apenas_prever":
    loaded_model = load_model(MODEL_NAME)
    predictions = make_predictions(loaded_model, X_new[col_X])
    save_predictions(X_new, predictions, OUTPUT_FILENAME)
