## Setting up the environment

* Load the environment variables from the `.env` file
* Set up the connection to RabbitMQ

In [None]:

import os
from dotenv import load_dotenv

# Populate the process environment from the .env file two directories up.
load_dotenv(dotenv_path="../../.env")

# Settings consumed by later cells; each is None when the variable is unset.
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
LANGSMITH_TRACING = os.getenv("LANGSMITH_TRACING")
LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")



## Receiving the msg from the queue

Open a connection to the RabbitMQ `parse-files` queue, fetch the results, process them, and store them in Redis for temporary storage.

In [None]:
import os , sys , json , requests
import pika
from io import BytesIO
from PyPDF2 import  PdfReader
from langchain_community.document_loaders import PyPDFLoader

# Broker URL from the environment (e.g. amqp://user:pass@host:5672/vhost);
# None when the variable is not set.
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
QUEUE = "parse-files"

# Establish connection to RabbitMQ.
# Honour RABBITMQ_URL when it is configured instead of always dialling
# localhost; fall back to a local broker when no URL is provided.
connection_params = (
    pika.URLParameters(RABBITMQ_URL)
    if RABBITMQ_URL
    else pika.ConnectionParameters("localhost")
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
# Declared durable, so the declaration must match any existing queue of
# the same name on the broker.
channel.queue_declare(queue=QUEUE, durable=True)
print("[*] Connected to RabbitMQ ")

[*] Connected to RabbitMQ 


### Embeddings

The idea here is to store numeric vectors that are associated with the text. Given a query, we can embed it as a vector of the same dimension and use vector similarity metrics to identify related text.


In [None]:
from langchain_openai import  OpenAIEmbeddings

# Embedding model handed to the vector store in the next cell.
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
)

In [None]:
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

# In-process vector index; its contents live only as long as the kernel.
vector_store = InMemoryVectorStore(embedding=embeddings)

### Loading Documents 

Get the job from RabbitMQ:
```
{
  "jobId": "b23557ab-e8d0-4f04-96e6-dbfa42aec980",
  "files": [
    {
      "url": "https://res.cloudinary.com/dzfbs8rep/image/upload/v1757932874/claims/eavjy7iks164ptd6ndgq.pdf",
      "public_id": "claims/eavjy7iks164ptd6ndgq",
      "bytes": 106362,
      "format": "pdf"
    }
  ]
}
```

Fetch the URL, download the PDF, parse it, and store it in an in-memory vector store.

In [None]:
def process_job(body: bytes):
    """Decode a job message, download its PDFs and index their text.

    The payload is UTF-8 encoded JSON shaped like
    ``{"jobId": str, "files": [{"url": str, "public_id": str, "format": str}]}``.
    Every PDF is downloaded, its text is extracted with PyPDF2, split into
    overlapping chunks and added to the module-level ``vector_store``. Each
    chunk is stored together with the file's metadata so search hits can be
    traced back to their source document.

    Files whose format is not ``pdf`` are reported and skipped without being
    downloaded or indexed.

    Args:
        body: Raw message body as delivered by RabbitMQ.

    Returns:
        The vector-store ids of all chunks added for this job, in insertion
        order. Empty when nothing could be indexed.

    Raises:
        UnicodeDecodeError / json.JSONDecodeError: the body is not UTF-8 JSON.
        KeyError: ``jobId``, ``files`` or a file's ``url`` is missing.
        requests.RequestException: a download failed or returned an HTTP
            error status.
    """
    job = json.loads(body.decode("utf-8"))
    job_id = job["jobId"]

    print(f"\n[*] Job Received: {job_id}")

    # The splitter holds no per-file state, so one instance serves all files.
    splitter = RecursiveCharacterTextSplitter(chunk_size=800,
                                              chunk_overlap=200)
    indexed_ids = []

    for idx, f in enumerate(job["files"], start=1):
        url = f["url"]
        # `or ""` also covers an explicit null format in the payload.
        fmt = (f.get("format") or "").lower()
        name = f.get("public_id", f"file{idx}")

        if fmt != "pdf":
            # Skip before downloading: indexing a placeholder string would
            # only pollute the similarity-search results.
            print(f" [*] Unsupported format: {fmt}")
            continue

        print(f" [*] downloading {fmt} from {url}")
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()

        reader = PdfReader(BytesIO(resp.content))

        # Extract metadata
        meta = reader.metadata or {}
        num_pages = len(reader.pages)
        created = meta.get("/CreationDate")
        modified = meta.get("/ModDate")

        # Turn into a serializable dict; absent dates stay None rather than
        # becoming the string "None".
        meta_data = {
            "job_id": job_id,
            "author": meta.get("/Author"),
            "title": meta.get("/Title"),
            "subject": meta.get("/Subject"),
            "creator": meta.get("/Creator"),
            "producer": meta.get("/Producer"),
            "created": str(created) if created is not None else None,
            "modified": str(modified) if modified is not None else None,
            "pages": num_pages,
            "source_url": url,
            "public_id": name,
        }

        # Extract text; pages without a text layer contribute an empty string.
        text = "\n".join(page.extract_text() or "" for page in reader.pages)

        print(f" [*] {name}: extracted {len(text)} characters, {num_pages} pages")

        chunks = splitter.split_text(text)
        if not chunks:
            # e.g. a scanned PDF with no extractable text.
            print(f" [*] {name}: no text to index")
            continue

        # One metadata dict per chunk, tagged with its position in the file.
        metadatas = [
            {**meta_data, "chunk": chunk_no}
            for chunk_no in range(len(chunks))
        ]
        document_ids = vector_store.add_texts(texts=chunks, metadatas=metadatas)
        indexed_ids.extend(document_ids)
        print(f"{len(chunks)}")

        # Show the first chunk id. The file counter `idx` must not be used
        # as an index here: it is 1-based and unrelated to the chunk count.
        print(document_ids[0])

    return indexed_ids


Callback function: listen to the queue, process the job, and stop consuming after one job has been handled.

In [None]:
def callback(ch, method, properties, body):
    """Handle a single delivery from the queue.

    A successfully processed message is acknowledged and the consume loop is
    then asked to stop, so one job is handled per run. If anything in that
    sequence raises, the error is printed and the message is rejected
    without being requeued; consumption is not stopped in that case.
    """
    try:
        process_job(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        # A single successful job is all this notebook needs.
        ch.stop_consuming()
    except Exception as exc:
        print(" [*] Error processing:", exc)
        # requeue=False: the failed message is not put back on the queue.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


In [None]:
# Deliver at most one unacknowledged message at a time to this consumer.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=QUEUE, on_message_callback=callback)

try:
    # Blocks until the callback calls stop_consuming() or the cell is
    # interrupted.
    channel.start_consuming()
    print(" [*] Consumer stopped. Closing connection.")
except KeyboardInterrupt:
    print("Interrupted by user, closing connection...")
    channel.stop_consuming()
finally:
    # Close on every exit path, including unexpected errors, so the broker
    # connection is never leaked. No sys.exit() here: inside a notebook it
    # raises SystemExit in the kernel instead of ending the cell cleanly.
    if connection.is_open:
        connection.close()


[*] Job Received: 11ac6f16-1624-40a9-81ba-38c4edca1352
 [*] downloading pdf from https://res.cloudinary.com/dzfbs8rep/image/upload/v1757937705/claims/z3dm6odsjfskmiait07j.pdf
 [*] claims/z3dm6odsjfskmiait07j: extracted 1186 characters, 1 pages
2
56a39c90-21ec-464a-872c-1d002b44d74f
 [*] Consumer stopped. Closing connection.


Test: checking that the semantic similarity search works properly

In [None]:
# Smoke test: query the store and show the top-ranked chunk.
results = vector_store.similarity_search("Knee surgery")

print(results[0])

page_content='Health Insurance Policy Document
Policy Holder Information:
Name: John Doe
Age: 46
Gender: Male
Address: 123, Green Park, Pune, Maharashtra, India
Policy Number: HIP-2025-000123
Policy Start Date: 2025-01-01
Policy Duration: 1 year
Coverage Clauses:
Clause 1.1: The policy covers medical expenses related to hospitalization due to accidents, illnesses, and surgeries.
Clause 1.2: Outpatient consultations are covered up to ■10,000 annually.
Clause 2.3: Coverage applicable for policyholders aged between 18 and 65 years.
Clause 3.2: Knee surgery is covered if the policy has been active for at least 90 days.
Clause 3.5: Cardiac procedures are covered if recommended by a certified specialist.
Clause 4.1: Policies must be valid and active in the location where the medical procedure is performed.'


## RAG Implementation using Langchain

Using the LangChain RAG implementation to get back the JSON result we want
{ decision, amount, justification, clauses_used }

- Index parsed text into vector DB.

- At query time: embed user query, retrieve most relevant clauses.

- Pass into LLM with system+user prompts.

- Get a structured JSON decision.

In [None]:
from typing import List, TypedDict
from langchain_core.documents import Document
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START

# Chat model used by the generate step; temperature is pinned to 0.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
)

In [None]:
from langchain.prompts import ChatPromptTemplate

# Prompt for the generate step. Doubled braces ({{ }}) are literal braces in
# the rendered prompt; {context} and {question} are the template variables.
prompt = ChatPromptTemplate.from_template(
    """
You are an expert Document Analysis AI specialized in interpreting insurance policy documents. Your job is to process an input query and output a structured JSON object with a clear decision, based on the policy clauses.

The expected JSON output format is:

{{
  "name": "Full Name",
  "age": "age",
  "decision": "approved" | "rejected",
  "amount": 50000,
  "justification": "Detailed explanation with clause numbers and reasoning."
}}

Rules:
1. Parse the input query to extract key details: age, procedure, location, policy duration.
2. Perform semantic search over the provided policy document to retrieve relevant clauses.
3. Evaluate the clauses and decide whether the claim is approved or rejected.
4. If approved, set the correct payout amount.
5. Provide a detailed justification mentioning clause numbers and reasoning.
6. Output only the valid JSON object (no extra text).

Context (policy document):
{context}

Query:
{question}
"""
)


In [None]:
class State(TypedDict):
    """State dictionary passed between the graph's nodes."""

    # The query string supplied when the graph is invoked.
    question: str
    # Documents returned by the retrieve step and read by the generate step.
    context: List[Document]
    # Text content of the LLM response, set by the generate step.
    answer: str

def retrieve(state: State):
    """Look up the chunks most similar to the question.

    Returns a partial state update holding the matching documents under
    ``context``.
    """
    question = state["question"]
    matches = vector_store.similarity_search(question)
    return {"context": matches}

def generate(state: State):
    """Ask the LLM for an answer based on the retrieved context.

    The page contents of the context documents are joined with blank lines,
    rendered into the prompt together with the question, and sent to the
    model. Returns a partial state update with the reply text under
    ``answer``.
    """
    page_texts = [doc.page_content for doc in state["context"]]
    joined_context = "\n\n".join(page_texts)
    prompt_value = prompt.invoke(
        {"question": state["question"], "context": joined_context}
    )
    reply = llm.invoke(prompt_value)
    return {"answer": reply.content}

In [None]:
# Wire the two steps in order (retrieve, then generate) and make the
# "retrieve" node the entry point before compiling the graph.
graph_builder = StateGraph(State)
graph_builder.add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

In [None]:
# Only the question is supplied: the retrieve node fills in "context"
# itself. Nothing at notebook scope defines `document_ids` (it is local to
# process_job), so passing it here fails on a fresh kernel.
response = graph.invoke({
    "question": "46M, knee surgery, Pune, activated for 3-month policy",
})
print(response["answer"])

{
  "name": "John Doe",
  "age": "46",
  "decision": "approved",
  "amount": 50000,
  "justification": "The claim for knee surgery is approved based on Clause 1.1, which covers medical expenses related to surgeries. The policyholder is within the age limit specified in Clause 2.3 (aged between 18 and 65 years). Although the policy has only been active for 3 months, Clause 3.2 states that knee surgery is covered if the policy has been active for at least 90 days, which is satisfied. The location of the procedure (Pune) is valid as per Clause 4.1. The maximum payout per approved claim is set at 50,000 as per Clause 5.1."
}
