In [None]:
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores.faiss import FAISS
from langchain_community.llms import Ollama
from langchain_community.embeddings import OllamaEmbeddings

In [None]:
import json
import os
import uuid
import copy
from pprint import pp
import datetime

In [None]:
def load_collections_from_disk(collections_path):
    """Read the collections spec stored as JSON at ``collections_path``.

    When the path does not exist yet, a file holding an empty JSON object is
    created first, so the call then returns an empty dict.

    Every file entry's ``"last_modified"`` value is converted from its ISO 8601
    string form into a ``datetime.datetime``.

    Args:
        collections_path: location of the JSON spec file.

    Returns:
        dict: the decoded collections mapping.
    """
    missing = not os.path.exists(collections_path)
    if missing:
        with open(collections_path, "w") as sink:
            json.dump({}, sink)

    with open(collections_path, "r") as source:
        loaded = json.load(source)

    parse_iso = datetime.datetime.fromisoformat
    for collection in loaded.values():
        for entry in collection["files"]:
            entry["last_modified"] = parse_iso(entry["last_modified"])
    return loaded

def store_collections_to_disk(collections_path, collections):
    """Serialize the collections spec to JSON at ``collections_path``.

    The caller's ``collections`` object is left untouched: a deep copy is made
    and, on that copy, every file entry's ``"last_modified"`` datetime is
    replaced by its ISO 8601 string so the structure becomes JSON-serializable.

    The JSON text is produced in memory *before* the target file is opened.
    Opening with mode ``"w"`` truncates the file, so serializing first means a
    non-serializable value raises without wiping a previously stored spec.

    Args:
        collections_path: path of the JSON file to create or overwrite.
        collections: mapping whose values each have a ``"files"`` list; every
            entry of that list carries a ``"last_modified"`` datetime.

    Raises:
        TypeError: if the spec contains a value ``json`` cannot serialize.
    """
    serializable = copy.deepcopy(collections)
    for coll in serializable.values():
        for entry in coll["files"]:
            entry["last_modified"] = entry["last_modified"].isoformat()

    # Serialize first: a failure here must not touch the file on disk.
    payload = json.dumps(serializable)
    with open(collections_path, "w") as fp:
        fp.write(payload)

# Sentinel returned by add_file_to_collection instead of adding a file whose
# content is already present in the collection.
FILE_ALREADY_EXISTS = "file-exists"

def file_last_modified(path):
    """Return the modification time of ``path`` as a timezone-aware UTC datetime.

    The timestamp from ``os.path.getmtime`` is converted directly to UTC via
    the ``tz`` argument, instead of building a naive local datetime and then
    converting it twice.

    Args:
        path: file whose modification time is wanted.

    Returns:
        datetime.datetime: aware datetime with ``tzinfo`` set to UTC.

    Raises:
        OSError: propagated from ``os.path.getmtime`` when ``path`` cannot be
            stat-ed.
    """
    return datetime.datetime.fromtimestamp(
        os.path.getmtime(path), tz=datetime.timezone.utc
    )

def file_size(path):
    """Return the ``st_size`` value that ``os.stat`` reports for ``path``."""
    stat_result = os.stat(path)
    return stat_result.st_size

def add_file_to_collection(collections, coll_id, path):
    """Register ``path`` in a collection and rebuild the collection's embeddings.

    Behaviour by case:

    * ``path`` is already registered and its modification time and size match
      the stored metadata: nothing changes, ``FILE_ALREADY_EXISTS`` is returned.
    * ``path`` is already registered but its metadata differs: the stored
      ``"last_modified"`` and ``"size"`` are refreshed and the embeddings are
      regenerated.
    * ``path`` is new but byte-identical to a registered file: nothing changes,
      ``FILE_ALREADY_EXISTS`` is returned.
    * otherwise a new file entry is appended and the embeddings are regenerated.

    The path check runs before the content check on purpose: comparing a
    registered path against itself always finds identical bytes, so checking
    content first would report every modified file as already present and the
    metadata refresh would never run.

    Args:
        collections: in-memory collections spec; mutated in place.
        coll_id: key of the target collection inside ``collections``.
        path: file to register. Paths are compared as plain strings.

    Returns:
        ``FILE_ALREADY_EXISTS`` when nothing had to change, otherwise ``None``.

    Raises:
        ValueError: if ``coll_id`` is unknown or ``path`` does not exist.
    """
    if coll_id not in collections:
        raise ValueError("Collection ID does not exist")
    if not os.path.exists(path):
        raise ValueError("File does not exist")

    entries = collections[coll_id]["files"]
    modified = file_last_modified(path)
    size = file_size(path)

    registered = next((entry for entry in entries if entry["path"] == path), None)
    if registered is not None:
        # Same path: decide from metadata, since the bytes on disk trivially
        # equal themselves.
        if registered["last_modified"] == modified and registered["size"] == size:
            return FILE_ALREADY_EXISTS
        registered["last_modified"] = modified
        registered["size"] = size
    else:
        # New path: reject it when another registered file has the same bytes.
        with open(path, "rb") as new_fp:
            new_bytes = new_fp.read()
        for entry in entries:
            with open(entry["path"], "rb") as curr_fp:
                if new_bytes == curr_fp.read():
                    return FILE_ALREADY_EXISTS
        entries.append({
            "path": path,
            "last_modified": modified,
            "size": size,
        })

    generate_embeddings(collections, coll_id)
    return None

def generate_embeddings(collections, coll_id):
    """Rebuild and persist the FAISS index for one collection.

    Every file listed in the collection is loaded with ``PyPDFLoader`` and
    split into chunks; all chunks are embedded with an ``OllamaEmbeddings``
    instance configured with the collection's ``"embeddings_model"``. The
    resulting store is saved under ``./<coll_id>_embeddings`` and that path is
    recorded in the collection's ``"embeddings"`` field.

    Args:
        collections: in-memory collections spec; the target entry is mutated.
        coll_id: key of the collection to process.

    Returns:
        None
    """
    collection = collections[coll_id]
    target_dir = f"./{coll_id}_embeddings"
    embedder = OllamaEmbeddings(model=collection["embeddings_model"])

    # Flatten the per-file chunk lists into one list, preserving file order.
    documents = [
        chunk
        for entry in collection["files"]
        for chunk in PyPDFLoader(entry["path"]).load_and_split()
    ]

    FAISS.from_documents(documents, embedder).save_local(target_dir)
    collection["embeddings"] = target_dir
    return None

def create_collection(collections, name, description, embeddings_model):
    """Add a new, empty collection to ``collections`` and return its ID.

    The collection is keyed by a freshly generated random UUID (as a string).
    Returning that ID lets callers reference the collection right away instead
    of looking the key up in ``collections`` and hard-coding it.

    Args:
        collections: in-memory collections spec; mutated in place.
        name: human-readable collection name.
        description: free-text description of the collection.
        embeddings_model: model name stored under ``"embeddings_model"``.

    Returns:
        str: the identifier under which the new collection was stored.
    """
    identifier = str(uuid.uuid4())
    collections[identifier] = {
        "name": name,
        "description": description,
        "files": [],
        "embeddings": None,
        "embeddings_model": embeddings_model,
    }
    return identifier

def load_collection(collections, coll_id):
    """Load the saved FAISS store of a collection.

    Args:
        collections: in-memory collections spec.
        coll_id: key of the collection whose store should be loaded.

    Returns:
        The store returned by ``FAISS.load_local``, or ``None`` when the
        collection's ``"embeddings"`` field is ``None``.
    """
    collection = collections[coll_id]
    index_dir = collection["embeddings"]
    if index_dir is None:
        return None

    embedder = OllamaEmbeddings(model=collection["embeddings_model"])
    # NOTE(review): allow_dangerous_deserialization=True opts in to unsafe
    # deserialization of the saved index; only load directories this notebook
    # wrote itself.
    return FAISS.load_local(index_dir, embedder, allow_dangerous_deserialization=True)

In [None]:
# JSON file where this notebook persists the collections spec.
cenace_colls_path = "./cenace-collections.json"

Crear un archivo de colecciones vacío. Esta celda sobrescribe el archivo de colecciones si ya existe.

In [None]:
# Persist an empty spec. The target file is opened in "w" mode, so any
# existing content at this path is replaced.
store_collections_to_disk(
    cenace_colls_path,
    {},
)

Cargar un archivo de colecciones.

In [None]:
# Read the spec back from disk into memory.
collections = load_collections_from_disk(cenace_colls_path)

In [None]:
# Show the loaded spec as the cell output.
collections

Crear una nueva colección.

In [None]:
# create_collection keys the new entry by a freshly generated random UUID,
# so the collection ID is different on every run.
create_collection(
    collections,
    "LaTeX Prueba",
    "Esta colección contiene dos archivos de LaTeX simples",
    "nomic-embed-text:latest",
)

In [None]:
# Pretty-print the spec; the generated collection ID is the dict key.
pp(collections)

Guardamos la especificación con la nueva colección.

In [None]:
# Write the spec, now containing the new collection, back to disk.
store_collections_to_disk(cenace_colls_path, collections)

Agregamos un archivo a la colección.

In [None]:
# Collection IDs are random UUIDs, so an ID copied from an earlier kernel
# session does not exist after Restart & Run All. Resolve it by name instead.
# NOTE(review): assumes the target is the "LaTeX Prueba" collection created above.
latex_coll_id = next(
    coll_id for coll_id, coll in collections.items() if coll["name"] == "LaTeX Prueba"
)
add_file_to_collection(collections, latex_coll_id, "./docs1/foo.pdf")

In [None]:
# Show the spec after the add_file_to_collection call.
collections

Guardamos la especificación con la colección modificada.

In [None]:
# Persist the modified spec.
store_collections_to_disk(cenace_colls_path, collections)

In [None]:
# Discard the in-memory spec before it is reloaded from disk.
collections = {}

In [None]:
# Reload the spec from the JSON file.
collections = load_collections_from_disk(cenace_colls_path)

In [None]:
# Show the reloaded spec.
collections

In [None]:
# Collection IDs are random UUIDs, so an ID copied from an earlier kernel
# session does not exist after Restart & Run All. Resolve it by name instead.
# NOTE(review): assumes the target is the "LaTeX Prueba" collection.
latex_coll_id = next(
    coll_id for coll_id, coll in collections.items() if coll["name"] == "LaTeX Prueba"
)
db = load_collection(collections, latex_coll_id)

In [None]:
# Query the store returned by load_collection for chunks similar to the given text.
db.similarity_search("cadena")

## A trabajar con los documentos del CENACE

In [None]:
# Show the current spec before adding the CENACE collection.
collections

In [None]:
# Adds a second collection; as with every collection, its ID is a random UUID
# generated inside create_collection.
create_collection(
    collections, 
    "CENACE demo",
    "Colección con dos documentos para demostrar un RAG a CENACE",
    "nomic-embed-text:latest",
)

In [None]:
# Pretty-print the spec; the new collection's ID is its dict key.
pp(collections)

In [None]:
# Collection IDs are random UUIDs, so an ID copied from an earlier kernel
# session does not exist after Restart & Run All. Resolve it by name instead.
# NOTE(review): assumes the original hard-coded ID referred to the
# "CENACE demo" collection — confirm.
cenace_coll_id = next(
    coll_id for coll_id, coll in collections.items() if coll["name"] == "CENACE demo"
)
add_file_to_collection(
    collections,
    cenace_coll_id,
    "../libretas/docs/PI_C37118_1.1.0.17.pdf",
)

In [None]:
# Pretty-print the spec after the add_file_to_collection call.
pp(collections)

In [None]:
# Persist the spec to the JSON file.
store_collections_to_disk(cenace_colls_path, collections)

In [None]:
# Collection IDs are random UUIDs, so an ID copied from an earlier kernel
# session does not exist after Restart & Run All. Resolve it by name instead.
# NOTE(review): assumes the original hard-coded ID referred to the
# "CENACE demo" collection — confirm.
cenace_coll_id = next(
    coll_id for coll_id, coll in collections.items() if coll["name"] == "CENACE demo"
)
add_file_to_collection(
    collections,
    cenace_coll_id,
    "../libretas/docs/PI-System-Explorer-2018-User-Guide_EN.pdf",
)

In [None]:
# Pretty-print the spec after the second add_file_to_collection call.
pp(collections)

In [None]:
# Persist the spec to the JSON file.
store_collections_to_disk(cenace_colls_path, collections)

In [None]:
# Collection IDs are random UUIDs, so an ID copied from an earlier kernel
# session does not exist after Restart & Run All. Resolve it by name instead.
# NOTE(review): assumes the original hard-coded ID referred to the
# "CENACE demo" collection — confirm.
cenace_coll_id = next(
    coll_id for coll_id, coll in collections.items() if coll["name"] == "CENACE demo"
)
db = load_collection(collections, cenace_coll_id)

In [None]:
# Pretty-print the result of a similarity search for the question below.
pp(db.similarity_search("""
What are the principles of operation of The PI C37.118 interface?
"""))

# Borrador

In [None]:
# Hand-built draft of a collection spec.
# NOTE(review): "last_modified" here is a naive datetime (fromtimestamp is
# called without a tz), whereas file_last_modified returns a UTC-aware one;
# there is also no "embeddings_model" key, unlike specs from create_collection.
docs1 = {
    "name": "Colección 1",
    "files": [
        {
            "path": "./docs1/foo.pdf",
            "last_modified": datetime.datetime.fromtimestamp(os.path.getmtime("./docs1/foo.pdf")),
            "size": os.stat("./docs1/foo.pdf").st_size,
        }
    ],
    "embeddings": None,
}

In [None]:
# First sample file used by the scratch cells below.
path1 = "./docs1/foo.pdf"

In [None]:
# Opened in binary mode without a `with` block; file1.close() is called in a later cell.
file1 = open(path1, "rb")

In [None]:
# Read the remaining content of the open handle as bytes.
bytes1 = file1.read()

In [None]:
# Release the handle opened above.
file1.close()

In [None]:
# Number of bytes read from path1.
len(bytes1)

In [None]:
# Check whether the bytes read equal this literal byte string.
b"Hola, este es un archivo" == bytes1

In [None]:
# Second sample file; its bytes are compared with bytes1 in the next cell.
path2 = "./docs1/bar.pdf"
# The `with` block closes the handle even if read() raises.
with open(path2, "rb") as file2:
    bytes2 = file2.read()

In [None]:
# Byte-for-byte equality of the two files' contents.
bytes2 == bytes1

In [None]:
import os
import datetime

In [None]:
# Modification time of path1 as a naive datetime (no tz argument is passed).
datetime.datetime.fromtimestamp(os.path.getmtime(path1))

In [None]:
# Modification time of path2 as a naive datetime (no tz argument is passed).
dt = datetime.datetime.fromtimestamp(os.path.getmtime(path2))

In [None]:
# Attach the system time zone, then convert to UTC; rebinds dt to the aware value.
dt = dt.astimezone().astimezone(datetime.timezone.utc)

In [None]:
# ISO 8601 string form of dt, the format store_collections_to_disk writes.
dt.isoformat()

In [None]:
# Parse an ISO 8601 string with a UTC offset back into a datetime.
datetime.datetime.fromisoformat('2024-04-19T01:36:54.744574+00:00')

In [None]:
# st_size reported by os.stat for path1.
os.stat(path1).st_size

In [None]:
# NOTE(review): presumably a file size in bytes divided by 1000 — confirm where 45948 comes from.
45948 / 1000