In [None]:
import pandas as pd
import unicodedata
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import pandas as pd
import numpy as np

warnings.filterwarnings('ignore')
# Set plotting style
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)

# 1. Carregando os Dados


In [49]:
# Insira o path para o arquivo de testes
test_file_path = "dados/train.parquet"

# Insira o nome da coluna que tem o input de usuário
user_input_col = "user_input"

# Insira o nome da coluna que tem o UF
uf_col = "uf"

# Insira o nome da coluna que tem a razaosocial
razaosocial_col = "razaosocial"

# Insira o nome da coluna que tem o nome_fantasia
nome_fantasia_col = "nome_fantasia"

# Insira o nome da coluna que tem o cnpj
cnpj_col = "cnpj"

cols_to_keep = [user_input_col, uf_col, razaosocial_col, nome_fantasia_col, cnpj_col]       
df = pd.read_parquet("dados/train.parquet")[cols_to_keep]

# 2. Preparando os Dados - Rode todas as células dessa seção



## 2.1 Remocao de Termos Irrelevantes

Colunas com o sufixo `_cleaned` serao criadas para as colunas `user_input`, `razaosocial` e `nome_fantasia`. Nelas, termos como "S.A.", "LTDA", "LTDA.", "S/A", "S.A", "Ltda", "Ltda.", "S/A.", "S.A.", "S.A", "Ltda" e "Ltda" serao removidos.


Essa remocao é importante, pois ao se calcular a similaridade entre um `user_input` e/ou `razaosocial` e `nome_fantasia`, as métricas de similaridade seriam prejudicadas na ausencia de tais termos no `user_input`. 

Também serao removidas acentos e stopwords da língua portuguesa, sobretudo preposicoes. Tais termos, por nao informarem sobre empresas específicas, podem prejudicar a acurácia das buscas/retrieval.
Além disso, passaremos tudo para letras minúsculas, para evitar problemas de case-sensitive.

In [45]:
def comprehensive_text_cleaning(text, 
                               remove_accents=True,
                               remove_stop_words=True, 
                               remove_company_suffixes=True,
                               custom_stop_words=None,
                               to_lowercase=True):
    """
    Comprehensive text cleaning function
    
    Parameters:
    text (str): Input text
    remove_accents (bool): Remove accents and normalize characters
    remove_stop_words (bool): Remove Portuguese stop words
    remove_company_suffixes (bool): Remove common company suffixes
    custom_stop_words (set): Additional stop words to remove
    to_lowercase (bool): Convert to lowercase
    
    Returns:
    str: Cleaned text
    """
    
    if pd.isna(text):
        return text
    
    text = str(text)
    
    if remove_accents:
        text = unicodedata.normalize('NFD', text)
        text = ''.join(char for char in text if unicodedata.category(char) != 'Mn')
        text = text.replace('ç', 'c').replace('Ç', 'C')
    
    if to_lowercase:
        text = text.lower()
    
    if remove_company_suffixes:
        patterns_to_remove = [
        r'\bS\.?A\.?\b',           # S.A, SA, S.A., SA.
        r'\bS/A\.?\b',             # S/A, S/A.
        r'\bLTDA\.?\b',            # LTDA, LTDA.
        r'\bLIMITADA\b',           # LIMITADA
        r'\bCIA\.?\b',             # CIA, CIA.
        r'\bCOMPANHIA\b',          # COMPANHIA
        r'\bEMPRESA\b',            # EMPRESA
        r'\bCOMERCIO\b',           # COMERCIO
        r'\bSERVICOS?\b',          # SERVICO, SERVICOS
        r'\bME\b',                 # ME (Microempresa)
        r'\bEPP\b',                # EPP (Empresa de Pequeno Porte)
        r'\bEIRELI\b',             # EIRELI
        r'\bSOCIEDADE\b',          # SOCIEDADE
        r'ADMINISTRADORA\b',       # ADMINISTRADORA
        r'GERAL\b',                # GERAL
    ]
        
        for pattern in patterns_to_remove:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE)
    
    if remove_stop_words:
        portuguese_stop_words = {
            'a', 'ao', 'aos', 'as', 'da', 'das', 'de', 'do', 'dos', 'e', 'em', 'na', 
            'nas', 'no', 'nos', 'o', 'os', 'para', 'por', 'com', 'um', 'uma', 'uns', 
            'umas', 'se', 'que', 'ou', 'mas', 'como', 'mais', 'muito', 'sua', 'seu',
            'seus', 'suas', 'este', 'esta', 'estes', 'estas', 'esse', 'essa', 'esses',
            'essas', 'aquele', 'aquela', 'aqueles', 'aquelas', 'isto', 'isso', 'aquilo'
        }
        
        if custom_stop_words:
            portuguese_stop_words.update(custom_stop_words)
        
        words = text.split()
        words = [word for word in words if word.lower() not in portuguese_stop_words]
        text = ' '.join(words)
    
    text = re.sub(r'[^\w\s]', ' ', text)  # Remove punctuation
    text = re.sub(r'\s+', ' ', text)      # Multiple spaces to single space
    text = text.strip()                   # Remove leading/trailing spaces
    
    return text

In [54]:
df['user_input_cleaned'] = df[user_input_col].apply(comprehensive_text_cleaning)
df['razaosocial_cleaned'] = df[razaosocial_col].apply(comprehensive_text_cleaning)
df['nome_fantasia_cleaned'] = df[nome_fantasia_col].apply(comprehensive_text_cleaning)

# 3. Retriever com TF-IDF e Similaridade do Cosseno

## 3.1 Implementação do Retriever

O retriever será implementado na classe `TextRetriever`.

O método `TextRetrieval.find_best_matches` recebe um `user_input` e um dataframe contendo apenas as entradas correspondentes ao estado/`uf` daquele input e retorna os `k` `razaosocial` e `nome_fantasia` mais similares utilizando a técnica de TF-IDF.


In [None]:
from typing import List, Tuple
from enum import Enum
import pandas as pd
from collections import Counter

class SimilarityMetric(Enum):
    TFIDF = "tfidf"


class TextRetrieval:
    """
    A class for text retrieval using similarity metrics.
    Here we only have the TFIDF metric, because it is the
    best one found for the solution
    """

    _tfidf_cache = {}
    
    _df_by_uf = {
        uf: group.drop_duplicates(subset=['razaosocial_cleaned', 'nome_fantasia_cleaned','cnpj',])
        for uf, group in df.groupby('uf')
    }
    
    # The values of this dictionary are tuples in which the index 1 holds
    # a boolean that tells whether a metric is such that the higher 
    # its value the more similar the strings being compared are
    _METRIC_CONFIG = {
        SimilarityMetric.TFIDF: (None, True),  
    }


    @staticmethod
    def _get_tfidf_cache_key(df: pd.DataFrame) -> str:
        return str(hash(pd.util.hash_pandas_object(df[["razaosocial_cleaned", "nome_fantasia_cleaned"]], index=False).sum()))

    @classmethod
    def _get_tfidf_cache(cls, df: pd.DataFrame):
        key = cls._get_tfidf_cache_key(df)
        if key not in cls._tfidf_cache:
            combined = (
                df["razaosocial_cleaned"].fillna('') + ' ' + df["nome_fantasia_cleaned"].fillna('')
            )
            vectorizer = TfidfVectorizer(analyzer='char_wb', ngram_range=(2, 4))
            tfidf_matrix = vectorizer.fit_transform(combined)
            cls._tfidf_cache[key] = (vectorizer, tfidf_matrix, combined)
        return cls._tfidf_cache[key]

    @staticmethod
    def _retrieve_topk_cnpjs_from_pairs(
        df: pd.DataFrame,
        razao_social_list: List[str],
        nome_fantasia_list: List[str],
        cnpj_col: str = "cnpj",
        not_cleaned_razao_col: str = razaosocial_col,
        not_cleaned_fantasia_col: str = nome_fantasia_col,
        top_k: int = 5
    ) -> List[str]:
        assert len(razao_social_list) == len(nome_fantasia_list)
        mask = pd.Series(False, index=df.index)
        for razao, fantasia in zip(razao_social_list, nome_fantasia_list):
            mask |= ((df[not_cleaned_razao_col] == razao) & (df[not_cleaned_fantasia_col] == fantasia))
        cnpjs = df[mask][cnpj_col].dropna().tolist()
        most_common = Counter(cnpjs).most_common(top_k)
        return [cnpj for cnpj, _ in most_common]

   

    @classmethod
    def _find_matches_tfidf(
        cls,
        user_input: str,
        uf_df: pd.DataFrame,
        top_k: int,
        not_cleaned_razao_col: str = razaosocial_col,
        not_cleaned_fantasia_col: str = nome_fantasia_col,
        cnpj_col: str = "cnpj"
    ) -> Tuple[List[str], List[str], List[str]]:

        # If the tfidf matrix has already been done for the companies
        # of a specific statem then we get it from the cache
        vectorizer, tfidf_matrix, _ = cls._get_tfidf_cache(uf_df)
        query_vec = vectorizer.transform([user_input])
        similarities = cosine_similarity(query_vec, tfidf_matrix).flatten()
        top_indices = np.argsort(similarities)[::-1][:top_k]

        best_razao_matches = df.iloc[top_indices][not_cleaned_razao_col].tolist()
        best_nome_fantasia_matches = df.iloc[top_indices][not_cleaned_fantasia_col].tolist()
        best_cnpj_matches =  df.iloc[top_indices][cnpj_col].tolist()

        return (
            best_razao_matches,
            best_nome_fantasia_matches,
            best_cnpj_matches,
        )
    
    @classmethod
    def find_best_matches(
        cls,
        user_input: str,
        uf: str, 
        metric: SimilarityMetric = SimilarityMetric.TFIDF, 
        top_k: int = 1
    ) -> Tuple[List[str], List[str], List[float], List[float], List[str]]:


        df_uf = TextRetrieval._df_by_uf.get(uf)

        if metric == SimilarityMetric.TFIDF:
            return cls._find_matches_tfidf(user_input, df_uf, top_k)

        if metric not in cls._METRIC_CONFIG:
            raise ValueError(f"Unsupported metric: {metric}")
        

## 3.2 Recuperando a `razaosocial` e `nome_fantasia` dado um `user_input` e um `uf`

> O método `TextRetrieval.find_best_matches` recebe um `user_input`, um `uf` e retorna uma tupla contendo uma lista com os top k `razaosocial`, uma com os `nome_fantasia` e outra com os top k `cnpj`, respectivamente. Ela também recebe uma métrica a ser utilizada, que por padrão foi decidida ser a `SimilarityMetric.TFIDF`.

A funcao `evaluate_matching` recebe o dataframe de testes e gera um dataframe que pode ser utilizado para gerar as métricas de performance. Por exemplo, pode-se comparar as colunas "top_1_razaosocial_pred" e "razaosocial_not_cleaned" para verificar a acurácia da `razaosocial` retornada pelo retriever.

In [59]:
from tqdm import tqdm
import pandas as pd

def evaluate_matching(df_sample: pd.DataFrame,
                      df_cleaned: pd.DataFrame,
                      metric: SimilarityMetric,
                      top_k: int =5):
    results = []

    df_by_uf = {
        uf: group.drop_duplicates(subset=['razaosocial_cleaned', 'nome_fantasia_cleaned','cnpj',])
        for uf, group in df_cleaned.groupby('uf')
    }

    indexes = []
    results_dict = {}
    for idx, row in tqdm(df_sample.iterrows(), total=len(df_sample), desc="Processing"):
        user_input = row['user_input_cleaned']
        razaosocial = row['razaosocial_cleaned']
        nome_fantasia = row['nome_fantasia_cleaned']
        not_cleaned_user_input = row['user_input']
        true_razaosocial = row['razaosocial']
        true_nome_fantasia = row['nome_fantasia']
        cnpj = row['cnpj']
        uf = row['uf']

        df_uf = df_by_uf.get(uf)
        if df_uf is None or df_uf.empty:
            continue  # Skip if no data for that UF

        # One fast match call
        top_k_razao, top_k_nome, top_k_cnpj = TextRetrieval.find_best_matches(
            user_input, uf, metric, top_k=top_k
        )

        top_1_razao = top_k_razao[0] if top_k_razao else None
        top_1_nome = top_k_nome[0] if top_k_nome else None
        top_1_cnpj = top_k_cnpj[0] if top_k_cnpj else None

        result_row = {
            "user_input": user_input,
            "user_input_not_cleaned": not_cleaned_user_input,
            "razaosocial_not_cleaned": true_razaosocial,
            "nome_fantasia_not_cleaned": true_nome_fantasia,
            "razaosocial": razaosocial,
            "nome_fantasia": nome_fantasia,
            "cnpj": cnpj,
            "uf": uf,
            
            "top_1_razaosocial_retrieved": top_1_razao,
            "top_1_nomefantasia_retrieved": top_1_nome,
            "top_1_cnpj_retrieved": top_1_cnpj,

            "top_1_razaosocial_pred": top_1_razao == true_razaosocial if top_1_razao else False,
            "top_1_nomefantasia_pred": top_1_nome == true_nome_fantasia if top_1_nome else False,
            "top_1_cnpj_pred": top_1_cnpj == cnpj if top_1_cnpj else False,

            "top_5_razaosocial_pred": true_razaosocial in top_k_razao,
            "top_5_nomefantasia_pred": true_nome_fantasia in top_k_nome,
            "top_5_cnpj_pred": cnpj in top_k_cnpj,


        }

        results_dict[idx] = result_row
        results_dict[idx]['top_k_razaosocial_pred'] = top_k_razao
        results_dict[idx]['top_k_nomefantasia_pred'] = top_k_nome
        results_dict[idx]['top_k_cnpj_pred'] = top_k_cnpj
        

        results.append(result_row)
        indexes.append(idx)

    results_df = pd.DataFrame(results, index=indexes)
    return results_df, results_dict


# 4. Resultados 

In [None]:
metric = SimilarityMetric.TFIDF
results_df, results_dict = evaluate_matching(df, df, metric, top_k=5)
print("="* 100)
print(f"\nResultados da Avaliação para a métrica: {metric.name}"
      f"\nTamanho do DataFrame de Resultados: {results_df.shape}"
      f"\nNúmero de Linhas com Predições Corretas (Top 1 Razão Social): {results_df['top_1_razaosocial_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 1 Nome Fantasia): {results_df['top_1_nomefantasia_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 1 CNPJ): {results_df['top_1_cnpj_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 5 Razão Social): {results_df['top_5_razaosocial_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 5 Nome Fantasia): {results_df['top_5_nomefantasia_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 5 CNPJ): {results_df['top_5_cnpj_pred'].sum()}"
      f"\nAcuracias:\n"
      f"Top 1 Razão Social: {results_df['top_1_razaosocial_pred'].mean() * 100:.2f}%"
      f"\nTop 1 Nome Fantasia: {results_df['top_1_nomefantasia_pred'].mean() * 100:.2f}%"
      f"\nTop 1 CNPJ: {results_df['top_1_cnpj_pred'].mean() * 100:.2f}%"
      f"\nTop 5 Razão Social: {results_df['top_5_razaosocial_pred'].mean() * 100:.2f}%"
      f"\nTop 5 Nome Fantasia: {results_df['top_5_nomefantasia_pred'].mean() * 100:.2f}%"
      f"\nTop 5 CNPJ: {results_df['top_5_cnpj_pred'].mean() * 100:.2f}%")
print("="* 100)


Processing:  84%|████████▎ | 213391/255471 [10:51<02:08, 328.47it/s]

# 5. Resultados para os Dados Fornecidos no Case

## Visão Geral
- **Métrica Utilizada**: TF-IDF
- **Tamanho do Dataset**: 255.471 registros
- **Campos Avaliados**: Razão Social, Nome Fantasia e CNPJ

---

## 🎯 Resultados de Acurácia

### Top 1 (Melhor Predição)
| Campo | Acertos | Acurácia |
|-------|---------|----------|
| **Razão Social** | 222.557 | **87.12%** |
| **Nome Fantasia** | 179.384 | **70.22%** |
| **CNPJ** | 88.462 | **34.63%** |

### Top 5 (Entre as 5 Melhores Predições)
| Campo | Acertos | Acurácia |
|-------|---------|----------|
| **Razão Social** | 241.348 | **94.47%** |
| **Nome Fantasia** | 216.024 | **84.56%** |
| **CNPJ** | 140.312 | **54.92%** |

---

In [25]:
metric = SimilarityMetric.TFIDF
results_df, results_dict = evaluate_matching(df, df, metric, top_k=5)
print("="* 100)
print(f"\nResultados da Avaliação para a métrica: {metric.name}"
      f"\nTamanho do DataFrame de Resultados: {results_df.shape}"
      f"\nNúmero de Linhas com Predições Corretas (Top 1 Razão Social): {results_df['top_1_razaosocial_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 1 Nome Fantasia): {results_df['top_1_nomefantasia_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 1 CNPJ): {results_df['top_1_cnpj_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 5 Razão Social): {results_df['top_5_razaosocial_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 5 Nome Fantasia): {results_df['top_5_nomefantasia_pred'].sum()}"
      f"\nNúmero de Linhas com Predições Corretas (Top 5 CNPJ): {results_df['top_5_cnpj_pred'].sum()}"
      f"\nAcuracias:\n"
      f"Top 1 Razão Social: {results_df['top_1_razaosocial_pred'].mean() * 100:.2f}%"
      f"\nTop 1 Nome Fantasia: {results_df['top_1_nomefantasia_pred'].mean() * 100:.2f}%"
      f"\nTop 1 CNPJ: {results_df['top_1_cnpj_pred'].mean() * 100:.2f}%"
      f"\nTop 5 Razão Social: {results_df['top_5_razaosocial_pred'].mean() * 100:.2f}%"
      f"\nTop 5 Nome Fantasia: {results_df['top_5_nomefantasia_pred'].mean() * 100:.2f}%"
      f"\nTop 5 CNPJ: {results_df['top_5_cnpj_pred'].mean() * 100:.2f}%")
print("="* 100)


Processing: 100%|██████████| 255471/255471 [13:06<00:00, 324.82it/s]



Resultados da Avaliação para a métrica: TFIDF
Tamanho do DataFrame de Resultados: (255471, 20)
Número de Linhas com Predições Corretas (Top 1 Razão Social): 222557
Número de Linhas com Predições Corretas (Top 1 Nome Fantasia): 179384
Número de Linhas com Predições Corretas (Top 1 CNPJ): 88462
Número de Linhas com Predições Corretas (Top 5 Razão Social): 241348
Número de Linhas com Predições Corretas (Top 5 Nome Fantasia): 216024
Número de Linhas com Predições Corretas (Top 5 CNPJ): 140312
Acuracias:
Top 1 Razão Social: 87.12%
Top 1 Nome Fantasia: 70.22%
Top 1 CNPJ: 34.63%
Top 5 Razão Social: 94.47%
Top 5 Nome Fantasia: 84.56%
Top 5 CNPJ: 54.92%
