In [None]:
#!/usr/bin/env python
import os
from pathlib import Path
import csv
import shutil

from langchain.document_loaders import TextLoader
from docling.document_converter import DocumentConverter
# from google.colab import files

class LoadingExtraction:
    """Collect source documents under a directory and convert them to text files.

    TXT files are copied, CSV files are rewritten as tab-separated text, and
    PDF/HTML files are converted to Markdown through ``DocumentConverter``.
    Everything is written flat into ``output_dir`` (only the base name of each
    source file is kept), then ``.md`` outputs are renamed to ``.md.txt`` and
    the output directory is zipped.

    Args:
        chemin_acces: Root directory that is walked recursively for inputs.
        formats: Iterable of extensions without the leading dot (e.g. "pdf").
        output_dir: Directory that receives the converted files.
    """

    def __init__(self, chemin_acces, formats, output_dir):
        self.chemin_acces = chemin_acces
        self.formats = formats
        self.output_dir = Path(output_dir)

    def obtenir_chemins_acces(self, extensions=None):
        """Return the path of every file found under ``chemin_acces``.

        Args:
            extensions: Optional iterable of suffixes; when given, only files
                whose lower-cased name ends with one of them are returned.
                Matching is case-insensitive on both sides.
        """
        suffixes = tuple(ext.lower() for ext in extensions) if extensions else ()
        return [
            os.path.join(root, file)
            for root, _, files in os.walk(self.chemin_acces)
            for file in files
            if not suffixes or file.lower().endswith(suffixes)
        ]

    def filtrer_par_format(self, chemins):
        """Group ``chemins`` by format: ``{fmt: [paths ending with ".fmt"]}``.

        The comparison is case-insensitive; keys are the entries of
        ``self.formats`` exactly as supplied.
        """
        return {
            fmt: [path for path in chemins if path.lower().endswith(f".{fmt.lower()}")]
            for fmt in self.formats
        }

    def charger_fichier_txt(self, chemins):
        """Copy each UTF-8 text file into ``output_dir`` under its base name.

        Failures are reported on stdout and do not stop the loop.
        """
        for path in chemins:
            try:
                # Read first: if decoding fails, no empty destination file is
                # left behind in the output directory.
                with open(path, 'r', encoding='utf-8') as source_file:
                    contenu = source_file.read()
                dest_path = self.output_dir / os.path.basename(path)
                with open(dest_path, 'w', encoding='utf-8') as dest_file:
                    dest_file.write(contenu)
            except Exception as e:
                print(f"Erreur lors du traitement du fichier TXT {path}: {e}")

    def charger_fichier_csv(self, chemins):
        """Rewrite each CSV file as a tab-separated ``.txt`` file in ``output_dir``.

        Failures are reported on stdout and do not stop the loop.
        """
        for path in chemins:
            try:
                # with_suffix swaps only the final extension, whatever its
                # case ("DATA.CSV" -> "DATA.txt"), and never touches a ".csv"
                # occurring elsewhere in the name.
                dest_path = self.output_dir / Path(path).with_suffix('.txt').name
                # newline='' is what the csv module requires so that quoted
                # fields containing line breaks are parsed correctly.
                with open(path, 'r', encoding='utf-8', newline='') as fichier_csv:
                    lignes = list(csv.reader(fichier_csv))
                with open(dest_path, 'w', encoding='utf-8') as fichier_txt:
                    for ligne in lignes:
                        fichier_txt.write('\t'.join(ligne) + '\n')
            except Exception as e:
                print(f"Erreur lors du traitement du fichier CSV {path}: {e}")

    def charger_fichier_pdf_html(self, chemins, format_type):
        """Convert each file to Markdown and write it as ``<basename>.md``.

        Args:
            chemins: Paths of the files to convert.
            format_type: Label used only in the error message (e.g. "pdf").

        Failures are reported on stdout and do not stop the loop.
        """
        converter = None
        for path in chemins:
            try:
                # Build the converter once and reuse it; creation stays inside
                # the try so a construction failure is still reported per file
                # instead of aborting the whole run.
                if converter is None:
                    converter = DocumentConverter()
                filename = os.path.basename(path)
                result = converter.convert(path)
                dest_path = self.output_dir / f"{filename}.md"
                with dest_path.open("w", encoding="utf-8") as fp:
                    fp.write(result.document.export_to_markdown())
            except Exception as e:
                print(f"Erreur lors du traitement du fichier {format_type.upper()} {path}: {e}")

    def ajouter_extension_txt(self):
        """Rename every ``*.md`` entry of ``output_dir`` to ``*.md.txt``."""
        # Snapshot the listing first: renaming entries while the directory
        # iterator is still live can skip or revisit files.
        for filename in list(self.output_dir.iterdir()):
            if filename.suffix == '.md':
                new_filename = filename.with_suffix('.md.txt')
                filename.rename(new_filename)
                print(f"Renommé: {filename} -> {new_filename}")

    def zipper_et_telecharger(self, zip_name="scratch"):
        """Zip ``output_dir`` into ``<zip_name>.zip`` unless that archive exists.

        The download step is commented out; only the archive is produced.
        """
        zip_path = f"{zip_name}.zip"
        if not os.path.exists(zip_path):  # Only compress when the archive is missing
            shutil.make_archive(zip_name, 'zip', str(self.output_dir))
            # files.download(zip_path)
        else:
            print(f"Le fichier compressé '{zip_path}' existe déjà. Aucun téléchargement nécessaire.")

    def pipeline(self):
        """Run the whole extraction once; do nothing if ``output_dir`` exists."""
        if self.output_dir.exists():
            print(f"Le répertoire '{self.output_dir}' existe déjà. Aucune action supplémentaire requise.")
            return

        # Discover the inputs before the output directory is created.
        chemins = self.obtenir_chemins_acces()
        fichiers_par_format = self.filtrer_par_format(chemins)

        # Create the output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Convert each supported format
        self.charger_fichier_txt(fichiers_par_format.get('txt', []))
        self.charger_fichier_csv(fichiers_par_format.get('csv', []))
        self.charger_fichier_pdf_html(fichiers_par_format.get('pdf', []), "pdf")
        self.charger_fichier_pdf_html(fichiers_par_format.get('html', []), "html")

        # Add the .txt extension to Markdown outputs, then compress
        self.ajouter_extension_txt()
        self.zipper_et_telecharger()

# Entry point: run the extraction pipeline over the "data" folder of the
# current working directory and write the results to "scratch".
if __name__ == "__main__":
    formats_supportés = ["txt", "csv", "pdf", "html"]
    chemin_acces = f"{os.getcwd()}/data"  # Replace with the real input directory
    output_dir = "scratch"

    loader = LoadingExtraction(
        chemin_acces=chemin_acces,
        formats=formats_supportés,
        output_dir=output_dir,
    )
    loader.pipeline()

In [None]:
class DocumentProcessor:
    """Split text files into overlapping character chunks with metadata.

    Args:
        chunk_size: Maximum number of characters per chunk.
        chunk_overlap: Number of characters shared by consecutive chunks;
            must be smaller than ``chunk_size``.
    """

    def __init__(self, chunk_size=1024, chunk_overlap=50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_text(self, text):
        """Cut ``text`` into chunks of ``chunk_size`` characters.

        Each chunk starts ``chunk_size - chunk_overlap`` characters after the
        previous one. Splitting stops as soon as a chunk reaches the end of
        the text, so no trailing chunk that is merely a suffix of the
        previous one is produced.

        Returns:
            A list of strings (empty for empty input).

        Raises:
            ValueError: If ``chunk_overlap >= chunk_size``; the window would
                never advance and the loop would not terminate.
        """
        step = self.chunk_size - self.chunk_overlap
        if step <= 0:
            raise ValueError(
                f"chunk_overlap ({self.chunk_overlap}) must be smaller than "
                f"chunk_size ({self.chunk_size})"
            )

        chunks = []
        start = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunks.append(text[start:end])
            if end == len(text):
                # The text is fully covered; another window would only repeat
                # the tail of this chunk.
                break
            start += step  # Advance while keeping the overlap
        return chunks

    def process_file(self, file_path):
        """Read a UTF-8 file, split it into chunks and attach metadata.

        Returns:
            A list of ``{"content": str, "metadata": dict}`` items. Every
            item gets its own metadata dict, so editing one chunk's metadata
            does not affect the others.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()

        # Metadata describing the source file
        metadata = {
            "document_name": os.path.basename(file_path),
            "path": file_path,
            "source_type": "txt"
        }

        # Split the text and pair each chunk with a private copy of the metadata
        chunks = self.split_text(text)
        processed_chunks = [{"content": chunk, "metadata": dict(metadata)} for chunk in chunks]

        return processed_chunks

    def process_folder(self, folder_path):
        """Process every ``.txt`` file directly inside ``folder_path``.

        Files are visited in sorted name order so that chunk positions are
        reproducible from one run to the next.

        Returns:
            The concatenated list of chunks from all files.
        """
        all_chunks = []
        for filename in sorted(os.listdir(folder_path)):
            if filename.endswith(".txt"):
                file_path = os.path.join(folder_path, filename)
                all_chunks.extend(self.process_file(file_path))
        return all_chunks


# Example usage: chunk every .txt file produced by the extraction step.
if __name__ == "__main__":
    # The extraction cell writes to "scratch" relative to the working
    # directory, so read from the same place instead of a Colab-only absolute
    # path (under Colab's /content working directory this is the same folder).
    folder_path = os.path.join(os.getcwd(), "scratch")
    processor = DocumentProcessor()

    # Process all the files in the folder
    chunks = processor.process_folder(folder_path)

In [None]:
# Select the compute device: CUDA when available, otherwise CPU.
# torch is imported here because no other cell of the notebook imports it,
# and this cell would otherwise raise NameError on a fresh kernel.
import torch

if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print("GPU not available, using CPU.")

In [None]:
from sentence_transformers import SentenceTransformer

# Load the embedding model on the selected device.
# NOTE(review): trust_remote_code=True lets the model repository supply its
# own code at load time - confirm the source is trusted.
model = SentenceTransformer('dangvantuan/french-document-embedding', trust_remote_code=True, device=device)

# Encode every chunk in a single call so the library can batch the inputs,
# instead of issuing one forward pass per chunk.
chunk_texts = [chunk['content'] for chunk in chunks]

# Keep `embeddings` as a list with one vector per chunk, in chunk order.
embeddings = list(model.encode(chunk_texts, device=device)) if chunk_texts else []

In [None]:
import faiss
import numpy as np

# FAISS works on float32 matrices: stack the per-chunk vectors into one array.
embeddings = np.array(embeddings, dtype="float32")

# Vector dimension = number of columns of the embedding matrix
dimension = embeddings.shape[1]

# Flat index using L2 distance, filled with all the embeddings
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)

# One record per chunk: its position, its text and its source metadata
metadata = [
    {"id": position, "content": item['content'], "metadata": item["metadata"]}
    for position, item in enumerate(chunks)
]

# Position -> record lookup kept alongside the index
metadata_store = dict(enumerate(metadata))

print(f"Nombre total de vecteurs dans l'index : {index.ntotal}")

# Position -> original chunk lookup
id_to_chunk = dict(enumerate(chunks))

In [None]:
from langchain_community.docstore.document import Document
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS

# The wrapper resolves a search hit in two steps: index position -> docstore
# id (a string), then docstore id -> Document. Build both mappings from the
# chunks, in the same order as the vectors were added to the index.
_position_to_doc_id = {position: str(position) for position in range(len(chunks))}
_documents_by_id = {
    _position_to_doc_id[position]: Document(
        page_content=chunk["content"],
        metadata=chunk["metadata"],
    )
    for position, chunk in enumerate(chunks)
}


def _embed_query(text):
    """Embed a query string with the same model that produced the index vectors."""
    return model.encode(text, device=device).tolist()


# A plain callable is passed as embedding_function so the already-loaded
# model is reused rather than loaded a second time.
vector_store = FAISS(
    embedding_function=_embed_query,
    index=index,
    docstore=InMemoryDocstore(_documents_by_id),
    index_to_docstore_id=_position_to_doc_id,
)

In [None]:
# 6. Run a nearest-neighbour search

# Raw query text (a plain string, despite the variable name)
query_vector = "quel sont les codes de travail en benin"

# Embed the query with the SentenceTransformer model, then shape it as a
# single-row 2D float32 array for the index search.
query_embedding = model.encode(query_vector).reshape(1, -1).astype("float32")

# Search with the embedding (not the raw text) and keep only the closest hit
distances, indices = index.search(query_embedding, 1)

# Position of the closest chunk: first row, first hit
closest_chunk_index = indices[0, 0]

# Look up the stored record for that position
result = metadata_store[closest_chunk_index]