<a href="https://colab.research.google.com/github/GustavoNicodemos/Algoritmo_analise_resultado/blob/main/Algoritmo_Anlise_GUV.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [27]:
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import OneHotEncoder
import xgboost as xgb

In [33]:
# ==============================================================
# Classe principal para análise contábil automatizada
# ==============================================================
class AccountingAnalyzer:
  """
    Classe para análise contábil automatizada:
    - Fase de treino separada da fase de análise.
    - Detecta anomalias, agrupa clusters e gera explicações.
    """
  def __init__(self):
        # Modelos utilizados na análise
        self.anomaly_detector = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
        self.clusterer = MiniBatchKMeans(n_clusters=5, random_state=42)
        self.encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
        self.classifier = xgb.XGBClassifier()
        self.history = pd.DataFrame()  # Armazena dados já analisados
        self.treinou = False  # Indica se o modelo foi treinado

  def fit(self, historical_data):
        """
        Treina o modelo de detecção de anomalias com base em dados históricos.
        """
        features = self._preprocess(dados_historicos)
        self.anomaly_detector.fit(features)
        self.treinou = True
        print("✅ Modelo treinado com sucesso com base histórica.")

  def analyze(self, new_data):
        """Analisa os novos dados com o modelo previamente treinado."""
        if not self.treinou:
            raise ValueError("❌ O modelo precisa ser treinado antes da análise. Use o método .fit() com dados históricos.")
        # Remove duplicados com base no histórico já analisado
        new_data = self._check_history(new_data)
        # Pré-processamento (mesmas transformações do treino)
        features = self._preprocess(new_data)

        # Detectar anomalias
        new_data['anomaly'] = self.anomaly_detector.fit_predict(features)

        # Agrupar em clusters
        new_data['cluster'] = self.clusterer.fit_predict(features)

        # Classificar tipo de problema se o modelo já foi treinado
        if not self.history.empty and hasattr(self.classifier, 'classes_'):
            model_features = self._create_features(new_data)
            new_data['problem_type'] = self.classifier.predict(model_features)

        # Atualiza o histórico
        self._update_history(new_data)

        # Retorna relatório consolidado
        return self._generate_report(new_data)

  def _check_history(self, data):
        """Remove dados já analisados anteriormente."""
        if self.history.empty:
            return data
        return data[~data.index.isin(self.history.index)]

  def _preprocess(self, data):
        """Codifica dados categóricos e prepara os dados numéricos."""
        cat_data = self.encoder.fit_transform(data[['Cost_Center', 'Group_Account']])
        num_data = data[['Amount_in_LC']].values
        return np.hstack([num_data, cat_data])

  def _create_features(self, data):
        """Gera features adicionais para o classificador."""
        cat_data = self.encoder.transform(data[['Cost_Center', 'Group_Account']])
        num_data = data[['Amount_in_LC', 'anomaly', 'cluster']].values
        return np.hstack([num_data, cat_data])

  def _update_history(self, data):
        """Salva os dados já analisados para evitar repetições futuras."""
        self.history = pd.concat([self.history, data], ignore_index=True)

  def _generate_explanations(self, data):
        """Gera justificativas automáticas para anomalias."""
        explanations = []
        for _, row in data.iterrows():
            if row['anomaly'] == -1:
                explanations.append(f"Valor atípico detectado em {row['Cost_Center']} - {row['Group_Account']}")
            else:
                explanations.append("Sem anomalias.")
        return explanations

  def _analyze_clusters(self, data):
        """Faz a análise dos agrupamentos por cluster."""
        return data.groupby(['cluster', 'Group_Account']).agg({'Amount_in_LC': ['mean', 'count']}).reset_index()

  def _generate_report(self, data):
        """Compila o relatório final da análise."""
        return {
            'anomalias_df': data[data['anomaly'] == -1],
            'clusters_df': self._analyze_clusters(data),
            'explicacoes_df': pd.DataFrame({'explicacoes': self._generate_explanations(data)})
        }



In [39]:
# ==============================================================
# Execução do algoritmo
# ==============================================================
if __name__ == "__main__":
    base_dados = pd.read_excel('Dados_GUV.xlsx')
    dados_historicos = pd.read_excel('Dados_historicos_GUV.xlsx')
    analisador = AccountingAnalyzer()
    analisador.fit(dados_historicos)
    relatorio = analisador.analyze(base_dados)

    # Armazena os dados em variáveis para visualização
    anomalias_df = relatorio['anomalias_df']
    clusters_df = relatorio['clusters_df']
    explicacoes_df = relatorio['explicacoes_df']



✅ Modelo treinado com sucesso com base histórica.


In [40]:
 # Gera Output Excel
anomalias_df.to_excel('anomalias_df.xlsx', index=False)
clusters_df.columns = ['_'.join(col).strip() if isinstance(col, tuple) else col for col in clusters_df.columns]
clusters_df.to_excel('clusters_df.xlsx', index=False)
explicacoes_df.to_excel('explicacoes_df.xlsx', index=False)