In [39]:
import chromadb
import re
import os
import uuid
import hashlib
from datetime import datetime
from chromadb import Settings
from typing import List, Dict
from IPython.display import clear_output
from langchain_text_splitters import RecursiveCharacterTextSplitter

In [40]:
# Root folder handed to handle_folders_upload(), which walks it recursively
# looking for Markdown (.md) files to ingest.
DATA_PATH = "./raw_database/"

In [41]:
# On-disk Chroma client: data lives under ./persistent_db, relative to the
# working directory of the kernel.
# NOTE(review): allow_reset=True presumably enables chroma_client.reset(),
# which would wipe the whole store -- confirm this is wanted outside of
# experiments.
chroma_client = chromadb.PersistentClient(
    settings=Settings(allow_reset=True),
    path="./persistent_db",
)

In [45]:
# Open the "cv_doc" collection, creating it on first use. The metadata entry
# requests cosine distance for the HNSW index.
collection = chroma_client.get_or_create_collection(
    metadata={"hnsw:space": "cosine"},
    name="cv_doc",
)

In [46]:
# Print the collection handle as a quick check that it was created or loaded.
print(collection)

Collection(name=cv_doc)


In [44]:
# Smoke-test query: fetch the 6 closest chunks for one question and ask only
# for the stored document texts.
# NOTE(review): in file order this cell runs before the upload cells, so the
# result depends on what is already persisted -- confirm the intended order.
results = collection.query(
    include=["documents"],
    n_results=6,
    query_texts=["Que proyectos ha desarrollado Martin?"],
)

In [19]:
# Dump the raw query response produced by the collection.query() cell.
print(results)

{'ids': [['id1', 'id6', 'id3', 'id2', 'id5', 'id4']], 'embeddings': None, 'documents': [['Encantado, mi nombre es Martín y soy un desarrollador de Software con 2 años de experiencia en el sector de la tecnología', 'En el 2024 he compleado y conseguido mis titulos en Desarrollo aplicaciones multiplataforma', 'Mi último puesto de trabajo ha sido en el CESGA (Centro de supercomputación de Galicia, en el que he creado un chatbot)', 'Proyecto sobre el teorema del mono infinito que habla que un mono con tiempo infinito en algun momento escribirá shakespeare', 'Tengo estudios de desarrollador de aplicaciones multiplataforma, desarrollo y diseño web, gestión de ventas y marketing', 'Mis últimos proyectos han sido: desarrollo de un chatbot empleando inteligencia artificial, un proyecto sobre el teorema del mono infinito, una aplicación de ciudadanía móvil']], 'uris': None, 'data': None, 'metadatas': None, 'distances': None, 'included': [<IncludeEnum.documents: 'documents'>]}


In [53]:
def parse_markdown_text(document_name: str, document_data: str) -> Dict:
    """Parse a document made of ``% key: value`` header fields plus a body.

    Recognised header fields are ``title``, ``document``, ``url_source``,
    ``web_source`` and ``date_publication``; everything after ``% content:``
    is taken as the body.

    Args:
        document_name: Name stored verbatim under ``"document_name"``.
        document_data: Full raw text of the document.

    Returns:
        A dict with the keys ``title``, ``document``, ``document_name``,
        ``url_source``, ``web_source``, ``date_publication`` and ``content``.
        A header field that is missing or empty gets its default value;
        ``content`` is ``""`` when the ``% content:`` marker is absent.
    """

    def _header_value(field: str, default: str) -> str:
        # Only horizontal whitespace may follow the colon. With ``\s*`` the
        # pattern would run over the line break of an empty field and capture
        # the *next* header line as this field's value.
        match = re.search(r"% " + re.escape(field) + r":[ \t]*(.*)", document_data)
        value = match.group(1).strip() if match else ""
        return value or default

    # The body legitimately spans many lines, hence DOTALL and ``\s*`` here.
    content_match = re.search(r"% content:\s*(.*)", document_data, re.DOTALL)

    return {
        "title": _header_value("title", "Unknow title document"),
        "document": _header_value("document", "No document data"),
        "document_name": document_name,
        "url_source": _header_value("url_source", "https://github.com/Martingago"),
        "web_source": _header_value("web_source", "https://martingago.dev/"),
        "date_publication": _header_value("date_publication", "No date"),
        "content": content_match.group(1).strip() if content_match else "",
    }

### Splits the document into small chunks

In [54]:
def split_text_from_string(document: str, chunk_size: int = 300, chunk_overlap: int = 10) -> List[str]:
    """Split a text into chunks with ``RecursiveCharacterTextSplitter``.

    Args:
        document: Text to split.
        chunk_size: Value passed as the splitter's ``chunk_size``; length is
            measured with ``len``. Defaults to 300.
        chunk_overlap: Value passed as the splitter's ``chunk_overlap``.
            Defaults to 10.

    Returns:
        The list of chunks returned by the splitter's ``split_text``.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        is_separator_regex=False,
    )
    return text_splitter.split_text(document)

In [22]:
def create_chunks_from_string(file_path: str) -> List[Dict] :
    """Read a file and turn its content into chunk records.

    The file is read as UTF-8, parsed with ``parse_markdown_text`` (using the
    file name without extension as the document name) and its ``content`` is
    split with ``split_text_from_string``.

    Returns:
        One dict per chunk with the keys ``"id"``, ``"document"`` (the chunk
        text) and ``"metadata"`` (the parsed header fields of the file).
    """
    with open(file_path ,'r', encoding='utf-8') as source_file:
        raw_text = source_file.read()
        base_name = os.path.splitext(os.path.basename(file_path))[0]

    parsed = parse_markdown_text(base_name, raw_text)

    # Header fields copied into every chunk's metadata, in this order.
    metadata_fields = (
        "document",
        "document_name",
        "title",
        "url_source",
        "web_source",
        "date_publication",
    )

    return [
        {
            # The id is "ID" + SHA-256 of the chunk text, so two chunks with
            # identical text get the same id.
            "id": "ID" + hashlib.sha256(piece.encode('utf-8')).hexdigest(),
            "document": piece,
            # A fresh dict per chunk so records never share a metadata object.
            "metadata": {field: parsed[field] for field in metadata_fields},
        }
        for piece in split_text_from_string(parsed["content"])
    ]

In [55]:
def upload_chunks_to_collection(document_chunks, collection):
    """Add each chunk to ``collection`` unless its id is already stored.

    Every chunk is a dict with the keys ``"id"``, ``"document"`` and
    ``"metadata"``. Chunks are handled one at a time: the id is looked up with
    ``collection.get`` and the chunk is added only when that lookup returns no
    ids. A line is printed for every chunk, added or skipped.
    """
    for entry in document_chunks:
        entry_id = entry["id"]
        lookup = collection.get(ids=[entry_id])

        if len(lookup["ids"]) > 0:
            # The id is already present in the collection.
            print("Duplicate entry, skipping")
            continue

        collection.add(
            ids=[entry_id],
            documents=[entry["document"]],
            metadatas=[entry["metadata"]],
        )
        print(f"    >Chunk: {entry_id} successfully added")

### Inserts a single file into the database

In [56]:
def insert_file_to_database(file_path: str, target_collection=None) :
    """Chunk one file and upload its chunks to a collection.

    Args:
        file_path: Path of the file to read and split into chunks.
        target_collection: Collection that receives the chunks. When omitted
            (``None``) the module-level ``collection`` is used, which is what
            this function always did before the parameter existed.
    """
    print("executing")
    document_chunks = create_chunks_from_string(file_path) # Generates an array of chunks from a file path
    # Resolve the destination only now, so the global is touched solely when
    # the caller did not pass a collection.
    destination = collection if target_collection is None else target_collection
    upload_chunks_to_collection(document_chunks, destination) # upload the chunks to the specified collection

## Insert all files from a dir into the selected collection

In [57]:
def handle_folders_upload(input_directory : str, collection) :
    """Upload every Markdown file found under a directory to ``collection``.

    The directory tree is walked recursively. Directories and files whose name
    starts with ``.`` are skipped, and only files ending in ``.md`` are
    processed. For each file its chunks are built with
    ``create_chunks_from_string`` and uploaded with
    ``upload_chunks_to_collection``; the elapsed time is printed in
    milliseconds.

    Args:
        input_directory: Root directory to walk.
        collection: Collection that receives the chunks and whose ``name`` is
            shown in the log line.
    """
    for root, dirs, files in os.walk(input_directory):
        # Pruning in place stops os.walk from descending into hidden folders.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        for file in files:
            if not file.endswith('.md') or file.startswith('.'):
                continue
            file_path = os.path.join(root, file)
            time_start = datetime.now()
            print("executing")
            # Upload to the collection given by the caller. Going through
            # insert_file_to_database() would write to the module-level
            # `collection` while the log line below names this one.
            document_chunks = create_chunks_from_string(file_path)
            upload_chunks_to_collection(document_chunks, collection)
            time_end = datetime.now()
            execution_time = (time_end - time_start).total_seconds() * 1000
            print(f"[{execution_time:.0f}ms]: {file} transformed into chunks and inserted into:{collection.name}")

In [58]:
# Ingest every Markdown file under DATA_PATH, passing the "cv_doc" collection
# as the target.
handle_folders_upload(DATA_PATH, collection)

executing
{'title': 'Connect4 - Versión online del clásico juego conecta 4.', 'document': 'Readme proyecto', 'document_name': 'connect4', 'url_source': 'https://github.com/Martingago/connect4', 'web_source': 'https://estrategia4.netlify.app/', 'date_publication': '10-08-2022', 'content': '# Connect4 - Versión Online del Clásico Juego Conecta 4\n\n**Connect4** es una versión interactiva y personalizable del clásico juego **Conecta 4**, desarrollada con HTML, CSS, JavaScript y **Bootstrap**. El juego está diseñado para ser fácil de usar, divertido y visualmente atractivo, permitiendo partidas entre dos jugadores.\n\n![Connect4 Screenshot](https://github.com/Martingago/connect4/blob/main/img/global/connect4.png)\n\n## 🚀 Características Principales\n\n- **🎨 Personalización de Avatares**  \n  Dale un toque único a tu perfil seleccionando avatares personalizados para cada jugador.\n\n- **🔍 Partidas Normales y Personalizadas**  \n  Puedes jugar partidas rápidas o configurar tus propias reglas