Importar todas as bibliotecas necessárias


In [None]:
# Install the notebook's third-party dependencies.
# %pip (instead of !pip) installs into the environment of the running kernel,
# which is not necessarily the one a shell-level `pip` resolves to.
%pip install pyarrow
%pip install pyyaml
%pip install -U sentence-transformers

In [1]:
import pandas as pd
from multiprocessing import Pool
import os
import yaml
from tqdm import tqdm  # For progress bar
import torch
from concurrent.futures import ThreadPoolExecutor

Convertendo a base de dados de CSV para Parquet


Se já tiver o arquivo em Parquet, pule esta etapa.

In [2]:
# Source CSV and the folder that will receive the per-part Parquet files.
csv_path, output_dir = 'tce_fit.csv', 'output_parquets'

# Create the folder up front; exist_ok makes re-running this cell harmless.
os.makedirs(output_dir, exist_ok=True)

In [3]:
def process_chunk(chunk_info):
    """Write one CSV chunk to its own Parquet file inside `output_dir`.

    Args:
        chunk_info: a ``(chunk, index)`` pair, where ``chunk`` is the
            DataFrame to persist and ``index`` is the position of that chunk
            in the original file (used in the output file name).

    Returns:
        A status message naming the part index and the file it was saved to.
    """
    frame, index = chunk_info
    file_name = f'tce_part_{index}.parquet'
    parquet_path = os.path.join(output_dir, file_name)
    frame.to_parquet(parquet_path, index=False, engine='pyarrow')
    message = f"Processed part {index} saved to {parquet_path}"
    return message

In [4]:
# Count the data rows (every line minus the header).
# The file is opened in a `with` block so the handle is closed right away, and
# in binary mode so counting lines never depends on the text encoding.
with open(csv_path, 'rb') as csv_file:
    total_rows = sum(1 for _ in csv_file) - 1

# Chunk size chosen so the file is split into 10 parts.
chunk_size = total_rows // 10 + 1

# Read the CSV chunk by chunk, pairing every chunk with its position so each
# worker knows which part number to write.
chunks = [
    (chunk, index)
    for index, chunk in enumerate(pd.read_csv(
        csv_path,
        chunksize=chunk_size,
        sep=';',
        on_bad_lines='skip',
        dtype={'CPFCNPJCredor': 'object'},  # Ensure this column is read as string
        low_memory=False,  # Prevent pandas from reading in smaller parts and inferring types
    ))
]

# NOTE(review): Pool workers must be able to import `process_chunk`; with the
# "spawn" start method (Windows, macOS default) functions defined in a notebook
# cell may not be picklable — confirm on the target platform.
try:
    # Write the parts in parallel. Pool.map() returns the results in the same
    # order as the input list, so the messages print in part order.
    with Pool(processes=10) as pool:
        results = pool.map(process_chunk, chunks)
    for result in results:
        print(result)
except Exception as e:
    print(f"An error occurred: {e}")
    # Re-raise so a failed conversion stops the run instead of letting later
    # cells reassemble stale or partial part files.
    raise


Processed part 0 saved to output_parquets/tce_part_0.parquet
Processed part 1 saved to output_parquets/tce_part_1.parquet
Processed part 2 saved to output_parquets/tce_part_2.parquet
Processed part 3 saved to output_parquets/tce_part_3.parquet
Processed part 4 saved to output_parquets/tce_part_4.parquet
Processed part 5 saved to output_parquets/tce_part_5.parquet
Processed part 6 saved to output_parquets/tce_part_6.parquet
Processed part 7 saved to output_parquets/tce_part_7.parquet
Processed part 8 saved to output_parquets/tce_part_8.parquet
Processed part 9 saved to output_parquets/tce_part_9.parquet


In [5]:
# Reassemble the per-part Parquet files into a single file.
output_file = 'reassembled_tce.parquet'
part_prefix, part_suffix = 'tce_part_', '.parquet'


def _part_number(file_name):
    """Return the text between the part prefix and suffix of a file name."""
    return file_name[len(part_prefix):-len(part_suffix)]


# Collect the part files and order them by their numeric index. A plain
# string sort would place 'tce_part_10' before 'tce_part_2', scrambling the
# row order as soon as there are more than 10 parts.
parquet_files = sorted(
    (
        f for f in os.listdir(output_dir)
        if f.startswith(part_prefix) and f.endswith(part_suffix)
        and _part_number(f).isdecimal()
    ),
    key=lambda f: int(_part_number(f)),
)
if not parquet_files:
    # Fail with a clear message instead of pd.concat's "No objects to concatenate".
    raise FileNotFoundError(
        f"No '{part_prefix}*{part_suffix}' files found in {output_dir}"
    )

# Read every part in order. The DataFrames are appended directly so no global
# named `df` is left pointing at only the last part.
dataframes = []
for file in parquet_files:
    file_path = os.path.join(output_dir, file)
    print(f"Reading {file_path}")
    dataframes.append(pd.read_parquet(file_path))

# Stack the parts into one DataFrame with a fresh 0..n-1 index.
final_df = pd.concat(dataframes, ignore_index=True)

# Persist the combined DataFrame as a single Parquet file.
final_df.to_parquet(output_file, engine='pyarrow', index=False)
print(f"Reassembled Parquet file saved to {output_file}")

Reading output_parquets/tce_part_0.parquet
Reading output_parquets/tce_part_1.parquet
Reading output_parquets/tce_part_2.parquet
Reading output_parquets/tce_part_3.parquet
Reading output_parquets/tce_part_4.parquet
Reading output_parquets/tce_part_5.parquet
Reading output_parquets/tce_part_6.parquet
Reading output_parquets/tce_part_7.parquet
Reading output_parquets/tce_part_8.parquet
Reading output_parquets/tce_part_9.parquet
Reassembled Parquet file saved to reassembled_tce.parquet


A base de dados está pronta para ser processada.
Agora vamos selecionar as colunas que nos interessam.
Colunas: 'Unidade', 'ElemDespesaTCE', 'Historico' e 'IdContrato'

In [5]:
# Read the run settings from config.yaml into the `config` dict.
# safe_load only builds plain Python objects from the YAML document.
with open('config.yaml') as config_file:
    config = yaml.safe_load(config_file)

# Show the loaded settings so the run is self-documenting.
print(config)

{'embedding_model': 'sentence-transformers/all-MiniLM-L12-v1', 'embedding_batch_size': 128}


In [2]:
# Load the full dataset from the reassembled Parquet file. The path is spelled
# out here (rather than reusing a variable from the conversion cells) so this
# cell still works when the CSV-to-Parquet step is skipped.
reassembled_path = 'reassembled_tce.parquet'
df = pd.read_parquet(reassembled_path)

In [8]:
# Keep the DataFrame's column index in `columns` and print it, to check which
# fields are available before selecting the ones of interest.
columns = df.columns
print(columns)

Index(['index', 'Analise', 'IdEmpenho', 'Ano', 'Vlr_AnulacaoEmpenho',
       'CdFonteTCE', 'CdFonteUG', 'CNPJRaiz', 'CPFCNPJCredorQtNrs',
       'CPFCNPJCredor', 'Credor', 'DtEmpenho', 'DEFonteTCE', 'DEFonteUG',
       'DEPrograma', 'DEProjAtiv', 'DtAnomes', 'Elemento', 'ElemDespesaTCE',
       'ElemDespesaUG', 'Ente', 'Esfera', 'Funcao', 'Historico', 'IdContrato',
       'IdFonte', 'IdFuncao', 'Id_Orgao', 'IdPrograma', 'IdSubFuncao',
       'IdUnid', 'IdOrgao', 'NrFonte', 'NrFonteUG', 'NrLicitacao',
       'NrProjAtiv', 'NrEmpenho', 'ProgTrab', 'ProgTrabRed', 'ProjAtiv',
       'SubFuncao', 'Tp_Empenho', 'Unidade', 'Vlr_Empenho',
       'Vlr_Anul_Liquidacao', 'Vlr_Liquidacao', 'Vlr_Pagto', 'Vlr_Retencao',
       'Vlr_SubEmpenho', 'Vlr_Empenhado', 'Vlr_Liquidado', 'Vlr_Pago',
       'CGElem', 'CGProgTrab', 'CGigual', 'Cod_Elem', 'Cod_PT', 'CG',
       'CGtitulo', 'CGDesc', 'CGtitTCE', 'CGfreq', 'CGlevel', 'CGpai',
       'CGroot', 'CGchild'],
      dtype='object')


In [3]:
# Report the total number of rows in the dataset.
# len(df) gives the row count directly; slicing the whole frame first
# (df.iloc[:]) only created a throwaway object.
print("tamanho total da base: ", len(df))


tamanho total da base:  1484918


Agora vamos carregar o modelo de embedding:

In [6]:
from sentence_transformers import SentenceTransformer

# Load the embedding model named in config.yaml ('embedding_model').
# The name is passed directly: wrapping it in an f-string that reuses the same
# quote character (f'{config['...']}') is a SyntaxError before Python 3.12.
# Disabled option kept from the original cell: truncate_dim=256
model = SentenceTransformer(config['embedding_model'])


In [22]:
# Test sample: the first 384 'Historico' entries as a list of strings.
# NOTE(review): astype(str) turns missing values into literal text (e.g. 'nan');
# confirm that is acceptable input for the embedding model.
historico_teste = df['Historico'].iloc[:384].astype(str).tolist()

# Folder for the saved embeddings; created here so torch.save does not fail
# with FileNotFoundError on a fresh checkout.
embeddings_dir = 'output_embeddings'
os.makedirs(embeddings_dir, exist_ok=True)


def encode_batch(batch):
    """Encode one batch of texts with the globally loaded `model`.

    Args:
        batch: list of strings to embed.

    Returns:
        Whatever ``model.encode`` returns for that batch.
    """
    return model.encode(batch)


# Batch size comes from config.yaml ('embedding_batch_size'); 128 is the
# fallback, matching the value that used to be hard-coded here.
batch_size = config.get('embedding_batch_size', 128)
batches = [historico_teste[i:i+batch_size] for i in range(0, len(historico_teste), batch_size)]

# Encode the batches on worker threads. executor.map yields results in input
# order, so `i` is the index of the batch being saved.
with ThreadPoolExecutor(max_workers=2) as executor:  # Adjust max_workers as needed
    for i, result in enumerate(tqdm(executor.map(encode_batch, batches), total=len(batches))):
        # Convert the result to a Torch tensor
        tensor_embeddings = torch.tensor(result)

        # Save the tensor as a .pt file, one file per batch
        torch.save(tensor_embeddings, os.path.join(embeddings_dir, f"embeddings_batch_{i}.pt"))


100%|██████████| 3/3 [00:05<00:00,  1.72s/it]
