In [24]:
import os
import json
import pandas as pd
import itertools
from typing import List, Dict, Any
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http import models
from dotenv import load_dotenv

# Diretórios principais
BASE_DIR = os.getcwd()
DIR_PAI = os.path.dirname(BASE_DIR)
DIR_SRC = os.path.join(DIR_PAI, 'src')
DIR_VECTORSTORE = os.path.join(DIR_SRC, 'vector_store') 
DIR_DATA = os.path.join(DIR_PAI, "data")
DIR_DATA_RAW = os.path.join(DIR_DATA, "raw")
DIR_LOGS = os.path.join(DIR_DATA, "logs")
DIR_DATA_REFINEMENT = os.path.join(DIR_DATA, "outputs_vision_and_extractor")

class CollectionCreator:
    def __init__(self, config_file: str):
        self.config = self.load_config(config_file)
        load_dotenv()
        self.qdrant_url = os.getenv("QDRANT_URL")
        self.qdrant_api_key = os.getenv("QDRANT_API_KEY")

        if not self.qdrant_url or not self.qdrant_api_key:
            raise ValueError("Variáveis QDRANT_URL ou QDRANT_API_KEY não encontradas!")

        self.file_to_metadata = self.load_metadata_from_csv()

        # Conexão com o cliente Qdrant
        self.qdrant_client = QdrantClient(
            url=self.qdrant_url,
            api_key=self.qdrant_api_key
        )

    def load_config(self, config_file: str) -> Dict[str, Any]:
        """Carrega o arquivo de configuração JSON."""
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def load_metadata_from_csv(self) -> Dict[str, Dict[str, Any]]:
        """Carrega o CSV e cria um mapeamento arquivo_id -> metadados."""
        csv_path = os.path.join(DIR_DATA, "subcategoria_name.csv")
        df = pd.read_csv(csv_path)
        metadata_dict = df.set_index("arquivo_id").to_dict(orient="index")
        print("Metadados carregados do CSV:", metadata_dict)
        return metadata_dict

    def load_json_documents(self, file_id: str) -> List[Document]:
        """Carrega arquivos JSON, adiciona metadados do CSV e páginas."""
        dir_path = os.path.join(self.config['pdf_dir'], file_id)

        if not os.path.isdir(dir_path):
            print(f"Diretório {dir_path} não existe. Pulando.")
            return []

        documents = []
        for fname in os.listdir(dir_path):
            if fname.endswith("_resultado.json"):
                fpath = os.path.join(dir_path, fname)
                with open(fpath, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                text = data.get("unified_analysis", "")
                page_number = data.get("page", None)
                metadata = {"source": fname, "pag": page_number}

                try:
                    arquivo_id = int(fname.split("_")[1])
                    metadata["arquivo_id"] = arquivo_id
                    if arquivo_id in self.file_to_metadata:
                        csv_metadata = self.file_to_metadata[arquivo_id]
                        metadata.update(csv_metadata)
                except (IndexError, ValueError):
                    print(f"Erro ao extrair arquivo_id de {fname}")

                # Mover arquivo_id para o nível superior do payload
                doc_payload = metadata.copy()
                if "arquivo_id" in metadata:
                    doc_payload["arquivo_id"] = metadata.pop("arquivo_id")

                documents.append(Document(page_content=text, metadata=doc_payload))

        return documents

    def clean_metadata(self, metadata: dict) -> dict:
        """Garante que o metadata seja serializável e limpo."""
        return {k: v for k, v in metadata.items() if isinstance(v, (str, int, float, bool, list, dict))}

    def create_collection_and_index(self, collection_config: Dict[str, Any]):
        """Cria a coleção e o índice no Qdrant antes da inserção de dados."""
        collection_name = collection_config['collection_name']
        embeddings_model = collection_config['embeddings_model']
        local_embeddings = HuggingFaceEmbeddings(model_name=embeddings_model, model_kwargs={'trust_remote_code': True})

        # Criar coleção se não existir
        self.qdrant_client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(size=1024, distance=models.Distance.COSINE),
            on_disk_payload=True  # Armazena payloads no disco
        )

        # Criar índice para os metadados
        for field_name, field_type in [("arquivo_id", models.PayloadSchemaType.KEYWORD)]:
            try:
                self.qdrant_client.create_payload_index(
                    collection_name=collection_name,
                    field_name=field_name,
                    field_schema=field_type
                )
                print(f"Índice criado para o campo '{field_name}' na coleção '{collection_name}'.")
            except Exception as e:
                print(f"Erro ao criar índice '{field_name}': {e}")

        # Processar documentos
        documents = collection_config['documents']
        chunk_size = collection_config['chunk_size']
        chunk_overlap = collection_config['chunk_overlap']

        if chunk_size == "Page":
            splits = documents
        else:
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
            splits = text_splitter.split_documents(documents)

        for doc in splits:
            doc.metadata = self.clean_metadata(doc.metadata)

        QdrantVectorStore.from_documents(
            documents=splits,
            embedding=local_embeddings,
            url=self.qdrant_url,
            api_key=self.qdrant_api_key,
            collection_name=collection_name,
            force_recreate=False
        )

        print(f"Collection {collection_name} criada com sucesso e dados indexados.")

    def generate_collection_configs(self, documents_dict: Dict[str, List[Document]]) -> List[Dict[str, Any]]:
        embeddings_models = self.config['embeddings_models']
        chunk_sizes = self.config['chunk_sizes']
        chunk_overlaps = self.config['chunk_overlaps']

        all_docs = []
        for docs in documents_dict.values():
            all_docs.extend(docs)

        combos = itertools.product(embeddings_models, chunk_sizes, chunk_overlaps)
        configs = []
        for emb_model, c_size, c_overlap in combos:
            chunk_display = c_size if c_size != "Page" else "by-page"
            collection_name = f"{self.config['collection_name']}_chunk{chunk_display}_overlap{c_overlap}_{emb_model.split('/')[-1]}"
            extraction_config = {
                "collection_name": collection_name,
                "chunk_size": c_size,
                "chunk_overlap": c_overlap,
                "embeddings_model": emb_model,
                "documents": all_docs,
            }
            configs.append(extraction_config)

        return configs

    def create_collections(self):
        documents_dict = {}
        for arquivo_id in self.config['arquivo_ids_to_process']:
            if not arquivo_id.startswith("fluidos_"):
                continue
            docs = self.load_json_documents(arquivo_id)
            if docs:
                documents_dict[arquivo_id] = docs

        self.collection_configs = self.generate_collection_configs(documents_dict)

        for collection_config in self.collection_configs:
            self.create_collection_and_index(collection_config)


In [25]:
if __name__ == "__main__":
    config_file = os.path.join(DIR_VECTORSTORE, "config.json")
    processor = CollectionCreator(config_file)
    processor.create_collections()

Metadados carregados do CSV: {11484: {'subcategoria_nome': 'fluidos'}, 11640: {'subcategoria_nome': 'fluidos'}, 13271: {'subcategoria_nome': 'fluidos'}, 13417: {'subcategoria_nome': 'fluidos'}, 13472: {'subcategoria_nome': 'fluidos'}, 13572: {'subcategoria_nome': 'fluidos'}, 13852: {'subcategoria_nome': 'fluidos'}, 4802: {'subcategoria_nome': 'fluidos'}}
Índice criado para o campo 'arquivo_id' na coleção 'fluidos_chunkby-page_overlap100_multilingual-e5-large'.
Collection fluidos_chunkby-page_overlap100_multilingual-e5-large criada com sucesso e dados indexados.
Índice criado para o campo 'arquivo_id' na coleção 'fluidos_chunkby-page_overlap0_multilingual-e5-large'.
Collection fluidos_chunkby-page_overlap0_multilingual-e5-large criada com sucesso e dados indexados.
