In [8]:
# Install the third-party packages used below into the kernel's environment
# (%pip targets the running kernel, unlike !pip).
# NOTE(review): versions are unpinned; pin them for reproducible runs.
# NOTE(review): burr, openai, langchain and pydantic are imported below but
# not installed here - presumably preinstalled; confirm.
%pip install pypdf2
%pip install sentence-transformers
%pip install qdrant-client


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting qdrant-client
  Obtaining dependency information for qdrant-client from https://files.pythonhosted.org/packages/5c/1c/551dac58a8be76820b3eb72b190191337686f75fbee5d738657bca4bd812/qdrant_client-1.9.1-py3-none-any.whl.metadata
  Downloading qdrant_client-1.9.1-py3-none-any.whl.metadata (9.5 kB)
Collecting grpcio>=1.41.0 (from q

In [10]:
# %%
import copy
import os
import base64

import openai
import PyPDF2
import textwrap
from io import BytesIO

from burr.core import Application, ApplicationBuilder, State, Result
from burr.core.action import action, default, expr, when
from burr.lifecycle import LifecycleAdapter
from burr.tracking import LocalTrackingClient

# %%
from pydantic import BaseModel, Field


class InputType(BaseModel):
    """Input payload handed to the ``process_input`` action.

    Either field may be ``None``. The transitions out of ``process_input``
    test ``prompt is not None`` and ``document_ingest is not None`` to pick
    the query chain or the ingestion chain.
    """

    # Question to answer; None when the request only ingests a document.
    prompt: str | None = None
    # PDF to ingest, base64-encoded: ``chunk_document`` runs base64.b64decode
    # on this value before handing it to the PDF reader.
    document: bytes | str | None = Field(
        default=None, description="Base64-encoded PDF content"
    )

# %%
@action(reads=[], writes=["prompt", "document_ingest"])
def process_input(state: State, input: InputType) -> tuple[dict, State]:
    """Validate the request and copy its fields into state.

    Args:
        state: current application state (nothing is read from it).
        input: request carrying an optional prompt and an optional
            base64-encoded document.

    Returns:
        A ``(result, new_state)`` pair; both hold ``prompt`` and
        ``document_ingest`` taken from ``input``.

    Raises:
        ValueError: if ``input`` has neither a prompt nor a document. The
            outgoing transitions check ``prompt is not None`` and
            ``document_ingest is not None``, so such a request could not
            proceed to any next action.
    """
    if input.prompt is None and input.document is None:
        raise ValueError(
            "Invalid input: provide a prompt, a document, or both."
        )

    fields = {"prompt": input.prompt, "document_ingest": input.document}
    return fields, state.update(**fields)

# %%
@action(reads=["prompt"], writes=["vector_results"])
def query_vector(state: State) -> tuple[dict, State]:
    """Placeholder retrieval step.

    No vector store is queried yet: the action always produces an empty list
    and stores it under ``vector_results``.
    """
    hits = []
    result = {"vector_results": hits}
    return result, state.update(vector_results=hits)


@action(reads=["prompt", "vector_results"], writes=["full_prompt"])
def generate_prompt(state: State) -> tuple[dict, State]:
    """Build the final prompt from the user prompt and the retrieval results.

    The result is the prompt followed directly by ``str(vector_results)``,
    with no separator between the two.
    """
    question = state["prompt"]
    retrieved = state["vector_results"]

    combined = question + str(retrieved)
    result = {"full_prompt": combined}
    return result, state.update(full_prompt=combined)


def _get_openai_client():
    """Create and return a new ``openai.Client`` built with no arguments."""
    client = openai.Client()
    return client


@action(reads=["full_prompt"], writes=["response"])
def prompt_response(
    state: State,
    model: str = "gpt-4o",
) -> tuple[dict, State]:
    """Send ``full_prompt`` to a chat-completion model and store the reply.

    Args:
        state: application state; ``full_prompt`` is sent as the user message.
        model: name of the chat model to call.

    Returns:
        A ``(result, new_state)`` pair whose ``response`` is the message
        content of the first returned choice.
    """
    client = _get_openai_client()
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": state["full_prompt"]},
    ]
    completion = client.chat.completions.create(model=model, messages=messages)
    answer = completion.choices[0].message.content
    return {"response": answer}, state.update(response=answer)

# %%
import uuid
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

from qdrant_client import QdrantClient
from qdrant_client.http import models

# Sentence-embedding model, loaded once when this cell runs. The checkpoint
# can be overridden with the ``embeddings_model`` environment variable.
embeddings_model = SentenceTransformer(
    os.environ.get("embeddings_model") or "all-MiniLM-L6-v2"
)
# Vector size used when creating the Qdrant collection. Ask the loaded model
# rather than hard-coding it, so a different checkpoint configured through the
# environment still gets a matching collection. 384 (the previous hard-coded
# value) is kept only as a fallback if the model cannot report its dimension.
embeddings_dimension = embeddings_model.get_sentence_embedding_dimension() or 384


@action(reads=["document_ingest"], writes=["chunk_document"])
def chunk_document(state: State) -> tuple[dict, State]:
    """Decode the ingested PDF, extract its text and split it into chunks.

    Args:
        state: application state; ``document_ingest`` holds the
            base64-encoded PDF.

    Returns:
        A ``(result, new_state)`` pair whose ``chunk_document`` is the list
        of text chunks (chunk size 1000, overlap 100).
    """
    document_bytes = base64.b64decode(state["document_ingest"])
    pdf_reader = PyPDF2.PdfReader(BytesIO(document_bytes))

    # Join pages with a newline so the last word of one page is not fused with
    # the first word of the next, and tolerate pages that yield no text.
    pdf_text = "\n".join(page.extract_text() or "" for page in pdf_reader.pages)

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunks = text_splitter.split_text(pdf_text)

    return {"chunk_document": chunks}, state.update(chunk_document=chunks)

@action(reads=["chunk_document"], writes=["doc_embeddings"])
def get_embeddings(state: State) -> tuple[dict, State]:
    """Embed every text chunk and wrap it in a Qdrant ``PointStruct``.

    Args:
        state: application state; ``chunk_document`` holds the text chunks.

    Returns:
        A ``(result, new_state)`` pair whose ``doc_embeddings`` is a list with
        one ``PointStruct`` per chunk. Each point carries the chunk text under
        ``content`` and its id/index under ``metadata``.
    """
    chunks = state["chunk_document"]

    # Use the model loaded at module level instead of loading it again on
    # every call, and encode all chunks in one batch. ``tolist()`` turns the
    # result into plain lists of floats for ``PointStruct``.
    vectors = embeddings_model.encode(chunks).tolist() if chunks else []

    points = []
    for doc_id, (chunk, vector) in enumerate(zip(chunks, vectors)):
        # Derive the id from the chunk text as well as its index. An id built
        # from the index alone is identical for every document, so upserting
        # a second document would overwrite the first. The id stays
        # deterministic, so re-ingesting the same document is idempotent.
        chunk_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{doc_id}_{chunk}"))

        points.append(
            models.PointStruct(
                id=chunk_id,
                payload={
                    # TODO: add tenant/title/source metadata once available.
                    "metadata": {
                        # Stored as a string so the payload is plain JSON.
                        "chunk_id": chunk_id,
                        # NOTE: this is the chunk's index within the document.
                        "doc_id": doc_id,
                    },
                    "content": chunk,
                },
                vector=vector,
            )
        )

    return {"doc_embeddings": points}, state.update(doc_embeddings=points)


@action(reads=["doc_embeddings"], writes=["db_success"])
def embeddings_to_db(state: State) -> tuple[dict, State]:
    """Upsert the embedded points into a Qdrant collection.

    Connection and collection are configured through environment variables:
    ``QDRANT_HOST`` (default ``localhost``), ``QDRANT_PORT`` (default
    ``6333``) and ``QDRANT_COLLECTION`` (default ``embeddings_collection``).
    The collection is created with cosine distance if it does not exist.

    Args:
        state: application state; ``doc_embeddings`` holds the points.

    Returns:
        A ``(result, new_state)`` pair whose ``db_success`` is the object
        returned by ``client.upsert``.
    """
    embeddings = state["doc_embeddings"]

    qdrant_host = os.environ.get("QDRANT_HOST", "localhost")
    qdrant_port = int(os.environ.get("QDRANT_PORT", "6333"))
    client = QdrantClient(host=qdrant_host, port=qdrant_port)

    # Pass the configured name itself. The previous code passed the literal
    # string "{collection_name}" (a format string without the f-prefix), so
    # QDRANT_COLLECTION was ignored.
    collection_name = os.environ.get("QDRANT_COLLECTION", "embeddings_collection")
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(
                size=embeddings_dimension, distance=models.Distance.COSINE
            ),
        )

    response = client.upsert(
        collection_name=collection_name,
        points=embeddings,
    )

    return {"db_success": response}, state.update(db_success=response)


@action(reads=["db_success"], writes=["db_response"])
def ingest_response(state: State) -> tuple[dict, State]:
    """Turn the stored upsert result into a human-readable status message.

    The message reports success when the ``status`` of ``db_success`` equals
    ``"completed"`` and failure otherwise; it is stored under ``db_response``.
    """
    succeeded = state["db_success"].status == "completed"
    message = (
        "Embeddings successfully ingested into Qdrant."
        if succeeded
        else "Failed to ingest embeddings into Qdrant."
    )
    return {"db_response": message}, state.update(db_response=message)

# %%
# Assemble the application: two chains branch from the shared entrypoint,
# selected by which field of the input is set.
app = (
    ApplicationBuilder()
    .with_actions(
        process_input,
        query_vector,
        generate_prompt,
        prompt_response,
        chunk_document,
        get_embeddings,
        embeddings_to_db,
        ingest_response,
    )
    .with_transitions(
        # First chain: answer a prompt.
        ("process_input", "query_vector", expr("prompt is not None")),
        ("query_vector", "generate_prompt"),
        ("generate_prompt", "prompt_response"),
        # Second chain: ingest a document.
        ("process_input", "chunk_document", expr("document_ingest is not None")),
        ("chunk_document", "get_embeddings"),
        ("get_embeddings", "embeddings_to_db"),
        ("embeddings_to_db", "ingest_response"),
    )
    .with_entrypoint("process_input")
    .with_tracker(project="pfd_rag")
    .build()
)

model.safetensors:  58%|#####7    | 52.4M/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [13]:

# %%
# Read the PDF file and create the document input.
# The location can be overridden with the PDF_PATH environment variable so the
# notebook is not tied to one machine; the default is the original local file.
pdf_path = os.environ.get(
    "PDF_PATH", "/Users/elijahbenizzy/Downloads/smoke_co_detectors.pdf"
)
with open(pdf_path, "rb") as f:
    pdf_content = f.read()

# Encode the PDF content to base64 (chunk_document base64-decodes it again).
pdf_base64 = base64.b64encode(pdf_content).decode("utf-8")

# Create an instance of InputType with the PDF content
doc = InputType(prompt=None, document=pdf_base64)
# doc = InputType(prompt="What is CRM?", document=None)

# Run until either chain reaches its final action.
# NOTE: the ingestion chain needs a reachable Qdrant server
# (QDRANT_HOST / QDRANT_PORT); otherwise embeddings_to_db fails to connect.
app.run(inputs={"input": doc}, halt_after=["prompt_response", "ingest_response"])

# %%
app.visualize(include_conditions=True, include_state=True, format="png")



********************************************************************************
-------------------------------------------------------------------
Oh no an error! Need help with Burr?
Join our discord and ask for help! https://discord.gg/4FxBMyzW5n
-------------------------------------------------------------------
> Action: `embeddings_to_db` encountered an error!<
> State (at time of action):
{'__PRIOR_STEP': 'get_embeddings',
 '__SEQUENCE_ID': 3,
 'chunk_document': "['AOA Form No. 1 10 (Rev. 10/13 ) - Copyright 2010...",
 'doc_embeddings': "[PointStruct(id='29d77b6e-893a-51b8-916e-fa0c963b4...",
 'document_ingest': "'JVBERi0xLjMKJcTl8uXrp/Og0MTGCjQgMCBvYmoKPDwgL0Zpb...",
 'prompt': None}
> Inputs (at time of action):
{'input': "InputType(prompt=None, document='JVBERi0xLjMKJcTl8..."}
********************************************************************************
Traceback (most recent call last):
  File "/Users/elijahbenizzy/.pyenv/versions/3.11/envs/burr-3-11/lib/python3.11/site

ResponseHandlingException: [Errno 61] Connection refused