In [None]:
# Move the working directory one level up, presumably so that relative paths
# used later (e.g. "pdfs/...") resolve from the parent directory — confirm.
# NOTE(review): not idempotent — each re-run of this cell climbs one more level.
import os
%pwd
os.chdir("../")
%pwd

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import transformers
import uuid
from pinecone import Pinecone
from pinecone_text.sparse import BM25Encoder
from langchain_community.retrievers import PineconeHybridSearchRetriever
import torch
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from Multi_Modal.chunking import get_chunks
from langchain_core.documents import Document
from dotenv import load_dotenv
import torch
import boto3
import re
import json


In [None]:
# Load variables from a .env file into the process environment (python-dotenv).
# Presumably supplies the AWS / Pinecone credentials used below — confirm.
load_dotenv()

In [None]:
# PDF to ingest; the path is relative to the working directory set in the first cell.
file_path="pdfs/Documentation-Project.pdf"

In [None]:
# Split the PDF with the project's get_chunks helper. Downstream code reads
# `.text` and `.metadata.orig_elements` from each returned chunk.
chunks=get_chunks(file_path)

In [None]:
def seperate_content_types(chunk):
    """Split a chunk into its text, table and image payloads.

    Args:
        chunk: Object exposing ``.text`` and, optionally,
            ``.metadata.orig_elements`` (an iterable of elements whose class
            names are inspected; only ``"Table"`` and ``"Image"`` are handled).

    Returns:
        dict with keys:
            ``text``   - ``chunk.text`` unchanged.
            ``tables`` - one entry per Table element: its ``text_as_html``
                         metadata when present and non-empty, else ``element.text``.
            ``images`` - the non-empty ``image_base64`` metadata of Image elements.
            ``types``  - the distinct content types found, in first-seen order,
                         always starting with ``"text"``.
    """
    content_data = {
        "text": chunk.text,
        "tables": [],
        "images": [],
        "types": ["text"],
    }

    chunk_meta = getattr(chunk, "metadata", None)
    # `or []` tolerates both a missing attribute and an attribute set to None.
    for element in getattr(chunk_meta, "orig_elements", None) or []:
        element_type = type(element).__name__
        element_meta = getattr(element, "metadata", None)

        if element_type == "Table":
            content_data["types"].append("Table")
            # A metadata object may define `text_as_html` but leave it None
            # (assumed to be the case for unstructured's metadata class —
            # confirm); `or` falls back to the plain text in that case too.
            table_html = getattr(element_meta, "text_as_html", None) or element.text
            content_data["tables"].append(table_html)

        elif element_type == "Image":
            image_b64 = getattr(element_meta, "image_base64", None)
            # Skip images with no payload instead of storing None.
            if image_b64:
                content_data["types"].append("Image")
                content_data["images"].append(image_b64)

    # dict.fromkeys de-duplicates while keeping a deterministic order.
    content_data["types"] = list(dict.fromkeys(content_data["types"]))
    return content_data


In [None]:
class ApnaChatModel:
    """Minimal chat wrapper around the Bedrock runtime ``invoke_model`` call.

    The model id is fixed to ``amazon.nova-pro-v1:0``.
    """

    def __init__(self, region="us-east-1"):
        """Create the ``bedrock-runtime`` client for ``region``."""
        self.client = boto3.client("bedrock-runtime", region_name=region)
        self.model_id = "amazon.nova-pro-v1:0"

    def invoke(self, messages, max_tokens=500):
        """Send ``messages`` to the model and return the first text block of the reply.

        Args:
            messages: List of message dicts, forwarded unchanged as the
                ``messages`` field of the JSON request body.
            max_tokens: Value sent as ``inferenceConfig.max_new_tokens``.

        Returns:
            The ``text`` of the first content block in ``output.message``.
        """
        payload = {
            "messages": messages,
            "inferenceConfig": {"max_new_tokens": max_tokens},
        }
        raw_response = self.client.invoke_model(
            modelId=self.model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps(payload),
        )

        # The response body is a stream; read it once and parse the JSON.
        parsed = json.loads(raw_response["body"].read())
        first_block = parsed["output"]["message"]["content"][0]
        return first_block["text"]


In [None]:
def create_ai_enhanced_summary(
    text: str,
    tables: list,
    images: list,
    model: ApnaChatModel
) -> str:
    """Ask ``model`` for a retrieval-oriented description of one chunk.

    Args:
        text: Plain text of the chunk, embedded in the prompt.
        tables: Table strings; each is appended under a numbered heading.
        images: Base64 image strings; each becomes an ``image`` content block
            (whitespace-stripped, always labelled ``jpeg``).
        model: Object whose ``invoke(messages)`` returns the generated text.

    Returns:
        Whatever ``model.invoke`` returns for the single user message.
    """
    header = f"""
You are an AI assistant creating a searchable description for document retrieval.

--- TEXT CONTENT ---
{text}
"""

    table_section = ""
    if tables:
        table_section = "\n--- TABLES ---\n" + "".join(
            f"Table {number}:\n{table}\n\n"
            for number, table in enumerate(tables, start=1)
        )

    task_section = """
--- YOUR TASK ---
Generate a comprehensive, searchable description.

Cover:
1. Key facts, exact numbers, and metrics
2. Main topics and concepts
3. Questions this content can answer
4. Visual insights from images (charts, diagrams, patterns)
5. Alternative keywords and synonyms

Prioritize searchability over brevity.
"""

    prompt = header + table_section + task_section

    # Image blocks come first, followed by the single text block.
    content_blocks = [
        {"image": {"format": "jpeg", "source": {"bytes": img_b64.strip()}}}
        for img_b64 in images
    ]
    content_blocks.append({"text": prompt})

    return model.invoke([{"role": "user", "content": content_blocks}])

In [None]:
def summarize_chunks(chunks):
    """Turn raw chunks into ``Document`` objects with searchable page content.

    Chunks containing tables or images are summarised through
    ``create_ai_enhanced_summary``; if that call raises, or the chunk is
    text-only, the chunk's own text is used instead. The untouched text,
    tables and images are stored as a JSON string under
    ``metadata["original_content"]``.

    Args:
        chunks: Sequence of chunk objects accepted by ``seperate_content_types``.

    Returns:
        List of ``Document`` objects, one per chunk, in input order.
    """
    print("...Processing Chunks...")
    model = ApnaChatModel()
    total_chunk = len(chunks)
    langchain_document = []

    for current_chunk, chunk in enumerate(chunks, start=1):
        print(f"Processed Chunk {current_chunk}/{total_chunk}")

        content_data = seperate_content_types(chunk)
        raw_text = content_data['text']
        tables = content_data['tables']
        images = content_data['images']

        print(f"Types Found: {content_data['types']}")
        print(f"Tables: {len(tables)}, Images: {len(images)}")

        if not (tables or images):
            print("No tables or Image Found")
            enhanced_cnt = raw_text
        else:
            print("Creating AI Summary...")
            try:
                enhanced_cnt = create_ai_enhanced_summary(
                    text=raw_text,
                    tables=tables,
                    images=images,
                    model=model,
                )
                print("Summary created")
            except Exception as e:
                # Best effort: fall back to the raw text when summarising fails.
                print(f"Summary failed: {e}")
                enhanced_cnt = raw_text

        original_content = json.dumps({
            "raw_text": raw_text,
            "table_html": tables,
            "image_base64": images,
        })
        langchain_document.append(
            Document(
                page_content=enhanced_cnt,
                metadata={"original_content": original_content},
            )
        )

    print(f"Processed {len(langchain_document)} chunks")
    return langchain_document

In [None]:
# Summarise every chunk; summarize_chunks calls the chat model for chunks
# that contain tables or images, so this cell can be slow.
sum_chunks=summarize_chunks(chunks)

In [None]:
# Spot-check one summary. NOTE(review): index 10 is hard-coded and raises
# IndexError when the document yields fewer than 11 chunks.
print(sum_chunks[10].page_content)

In [None]:
class TitanEmbeddings(object):
    """Client for a Bedrock text-embedding model (Titan Text Embeddings v2 by default).

    Args:
        model_id: Bedrock model identifier passed to ``invoke_model``.
        dimensions: Value sent as the ``dimensions`` request field
            (defaults to 1024, the previously hard-coded value).
        normalize: Value sent as the ``normalize`` request field
            (defaults to True, the previously hard-coded value).
    """

    accept = "application/json"
    content_type = "application/json"

    def __init__(self, model_id="amazon.titan-embed-text-v2:0", dimensions=1024, normalize=True):
        # Region and credentials come from the default boto3 configuration.
        self.bedrock = boto3.client(service_name='bedrock-runtime')
        self.model_id = model_id
        self.dimensions = dimensions
        self.normalize = normalize

    def invoke(self, text):
        """Return the ``embedding`` field of the model response for ``text``.

        Sends one ``invoke_model`` request whose JSON body carries
        ``inputText``, ``dimensions`` and ``normalize``.
        """
        body = json.dumps({
            "inputText": text,
            "dimensions": self.dimensions,
            "normalize": self.normalize
        })
        response = self.bedrock.invoke_model(
            body=body, modelId=self.model_id, accept=self.accept, contentType=self.content_type
        )
        response_body = json.loads(response.get('body').read())
        return response_body['embedding']

In [None]:
# NOTE(review): `dimensions` and `normalize` are assigned here but are not
# passed to TitanEmbeddings() below, so they have no effect on the request.
dimensions = 1024
normalize = True
    
titan_embeddings_v2 = TitanEmbeddings()

# Smoke test: embed one summary (hard-coded index 10) and show the first values.
input_text = sum_chunks[10].page_content
embedding = titan_embeddings_v2.invoke(input_text)
    
print(f"{input_text=}")
print(f"{embedding[:10]=}")


In [None]:
# Parallel lists filled by later cells: cleaned summary texts and their embeddings.
text=[]
emb=[]

In [None]:
# Strip markdown heading markers ("##" / "###") from every summary.
# Fixes: the loop previously cleaned sum_chunks[0] on every iteration, so
# `text` held N copies of the first summary. Rebinding `text` (instead of
# appending to it) also keeps the cell idempotent when it is re-run.
text = [re.sub(r"#{2,3}", "", cnt.page_content) for cnt in sum_chunks]

In [None]:
# Preview characters 1-99 of the first cleaned text (the slice skips index 0).
print(text[0][1:100])

In [None]:
# One embedding request per cleaned text, kept index-aligned with `text`.
# Rebinding `emb` (rather than appending to the existing list) keeps the two
# lists the same length when this cell is executed more than once.
emb = [titan_embeddings_v2.invoke(txt) for txt in text]

In [None]:
# Sparse encoder used for the keyword side of hybrid search.
bm25=BM25Encoder.default()

In [None]:
# Fit the sparse encoder on the cleaned summary texts.
bm25.fit(text)

In [None]:
# Build a per-user identifier "user-<14 hex chars>"; it is used below as the
# Pinecone index name and as the S3 key prefix.
# NOTE(review): `id` shadows the Python builtin of the same name for the rest
# of the notebook, and a later cell reads this variable.
id=uuid.uuid4().hex[1:15]
user_id="user-" + str(id)
user_id

In [None]:
from langchain_pinecone import PineconeVectorStore, PineconeSparseVectorStore
from pinecone import ServerlessSpec
from pinecone_text.sparse import BM25Encoder

In [None]:
# Pinecone client created without explicit arguments (presumably reads the API
# key from the environment — confirm), then the names of existing indexes.
pc=Pinecone()
existing_names=[name['name'] for name in pc.list_indexes()]

In [None]:
# Create the user's index only when it does not exist yet.
# dimension=1024 matches the `dimensions` value TitanEmbeddings sends;
# the same "dotproduct" metric is used by upload_file further down.
if user_id not in existing_names:
    pc.create_index(
        name=user_id,
        dimension=1024,
        metric="dotproduct",
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )



In [None]:
# S3 client using the default boto3 configuration.
# NOTE(review): the name `obj` is rebound to an S3 response in a later cell.
obj=boto3.client("s3")

In [None]:
# Throw-away payload used by the next cell to try out an S3 upload.
data={
    "name":"Soumya",
    "text":"tthanks"
}

In [None]:
# Upload the sample payload as JSON under "<user_id>/<random 14 hex chars>.json".
# NOTE(review): the bucket name is repeated here as a literal; BUCKET_NAME is
# only defined in the following cell.
obj.put_object(
    Bucket="soumya-rag",
    Key=f"{user_id}/{uuid.uuid4().hex[1:15]}.json",
    Body=json.dumps(data),
    ContentType="application/json"
)

In [None]:
# S3 bucket that holds the per-document JSON payloads; read as a global by
# retreive_document and passed to upload_file.
BUCKET_NAME='soumya-rag'

In [None]:
# Handle to the user's Pinecone index, used by the manual upsert cell below.
index=pc.Index(user_id)

In [None]:
# Pending vectors and the batch threshold for upserts.
# NOTE(review): upload_file reads `batch_size` as a module-level global, so
# this cell must have run before upload_file is called.
vectors_to_upsert = []
batch_size = 2

In [None]:
# Build one hybrid (dense + sparse) record per cleaned text and upsert in batches.
# Fixes relative to the previous version of this cell:
#   * each record uses its own doc_id (the shared global `id` made every
#     upsert overwrite the same vector);
#   * documents are encoded with encode_documents (encode_queries is the
#     query-side encoder);
#   * the s3_uri is well-formed ("s3://", no stray "/" before ".json");
#   * a batch is sent as soon as it reaches batch_size, and the final partial
#     batch is flushed after the loop instead of being dropped.
for i, txt in enumerate(text):
    doc_id = uuid.uuid4().hex[1:12]
    vector = {
        "id": doc_id,
        "values": emb[i],
        "sparse_values": bm25.encode_documents(txt),
        "metadata": {
            "text": txt,
            "s3_uri": f"s3://{BUCKET_NAME}/{user_id}/{doc_id}.json"
        }
    }
    vectors_to_upsert.append(vector)
    if len(vectors_to_upsert) >= batch_size:
        index.upsert(vectors=vectors_to_upsert)
        vectors_to_upsert = []

# Send whatever is left over from the last, incomplete batch.
if vectors_to_upsert:
    index.upsert(vectors=vectors_to_upsert)
    vectors_to_upsert = []
    

In [None]:
from typing import List,Tuple
from langchain_classic.schema import Document

In [None]:
from langchain.embeddings.base import Embeddings

class TitanLangChainEmbeddings(Embeddings):
    """Adapter that exposes a Titan-style embedder through the ``Embeddings`` interface.

    ``titan_model`` is any object whose ``invoke(text)`` returns one embedding.
    """

    def __init__(self, titan_model):
        self.titan_model = titan_model

    def embed_query(self, text):
        """Embed a single query string with one ``invoke`` call."""
        return self.titan_model.invoke(text)

    def embed_documents(self, texts):
        """Embed every text in order, issuing one ``invoke`` call per text."""
        vectors = []
        for text in texts:
            vectors.append(self.titan_model.invoke(text))
        return vectors


In [None]:
class CustomHybridRetriever:
    """Hybrid (dense + sparse) retriever over one Pinecone index.

    Args:
        user_id: Name of the Pinecone index to query.
        emb_model: Object whose ``invoke(text)`` returns the dense vector.
        bm25: Object whose ``encode_queries(text)`` returns a dict with a
            ``"values"`` list (the sparse query vector).
        k: Number of matches requested from the index.
        alpha: Weight of the dense vector; the sparse values get ``1 - alpha``.
    """

    def __init__(self, user_id, emb_model, bm25, k=5, alpha=0.7):
        self.user_id = user_id
        self.emb_model = emb_model
        self.bm25 = bm25
        self.k = k
        self.alpha = alpha
        # A dedicated client/index handle is opened per retriever.
        self.index_obj = Pinecone().Index(self.user_id)

    def get_relevant_documents(self, query):
        """Return the matching documents for ``query`` without duplicate ids.

        Each result's ``page_content`` is the match's ``text`` metadata and its
        metadata is the match metadata plus the match ``id``.
        """
        dense_weight = self.alpha
        sparse_weight = 1 - self.alpha

        dense = [component * dense_weight for component in self.emb_model.invoke(query)]

        sparse = self.bm25.encode_queries(query)
        sparse["values"] = [weight * sparse_weight for weight in sparse["values"]]

        res = self.index_obj.query(
            vector=dense,
            sparse_vector=sparse,
            top_k=self.k,
            include_metadata=True,
        )

        # Insertion-ordered dict keeps the first occurrence of every id.
        unique_docs = {}
        for match in res["matches"]:
            doc_id = match["id"]
            if doc_id not in unique_docs:
                unique_docs[doc_id] = Document(
                    page_content=match["metadata"]["text"],
                    metadata={**match["metadata"], "id": doc_id},
                )

        return list(unique_docs.values())



In [None]:
def upload_file(document:"List[Document]",user_id,bucket_name,batch_size=2):
    """Store documents in S3, index them in Pinecone, and return a retriever.

    For every document with usable text the function writes a JSON payload to
    ``s3://<bucket_name>/<user_id>/<doc_id>.json`` and upserts a hybrid
    (dense + sparse) vector whose metadata carries the text and that S3 URI.

    Args:
        document: Documents to index. The text comes from ``page_content`` or,
            when that is empty, from ``metadata["original_content"]``.
        user_id: Pinecone index name and S3 key prefix.
        bucket_name: Target S3 bucket.
        batch_size: Number of vectors sent per upsert call. Previously read
            from a module-level global; the default keeps the notebook's value.

    Returns:
        A ``CustomHybridRetriever`` bound to the index, embedder and fitted BM25.

    Raises:
        ValueError: If ``batch_size`` is below 1, no document yields text, or
            the probe embedding fails.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    try:
        obj= boto3.client("s3")
        print("logined s3")

    except Exception as e :
        print("logined failed")
        raise e

    valid_texts = []
    valid_docs_original = []

    for doc in document:
        try:
            content = ""
            if hasattr(doc, "page_content") and doc.page_content:
                content = doc.page_content
            elif hasattr(doc, "metadata") and "original_content" in doc.metadata:
                content = doc.metadata['original_content']

            if content:
                valid_texts.append(content)
                valid_docs_original.append(doc)
            else:
                print(f"Skipping document with no content: {doc.metadata.get('source', 'unknown')}")

        except Exception as e:
            print(f"Error extracting text from doc: {e}")

    print(f"Extracted {len(valid_texts)} valid texts")

    if not valid_texts:
        raise ValueError("No valid texts extracted from documents.")

    embbed_model=TitanEmbeddings()
    try:
        # Probe embedding: yields the index dimension and is reused for doc 0.
        sample_vec = embbed_model.invoke(valid_texts[0])
        dim = len(sample_vec)
        print("Created Dense Vectors, Dimension of each vector is: ", dim)
    except Exception as e:
        raise ValueError(f"Failed to generate valid embedding: {e}") from e

    bm25 = BM25Encoder.default()
    bm25.fit(valid_texts)

    print("Initializing Pinecone...")
    pc = Pinecone(api_key=os.environ.get('PINECONE_API_KEY'))
    existing_indexes = [i["name"] for i in pc.list_indexes()]
    print(user_id)
    if user_id not in existing_indexes:
        print(f"Creating new Pinecone index: {user_id}")
        pc.create_index(
            name=user_id,
            dimension=dim,
            metric="dotproduct",
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )
    else:
        print(f"Using existing Pinecone index: {user_id}")

    print("Created/Logined to Pinecone Successfully")
    index=pc.Index(user_id)
    vectors_to_upsert=[]
    for i, doc in enumerate(valid_docs_original):
        doc_id = uuid.uuid4().hex[:15]
        summ_text = valid_texts[i]
        try:
            doc_metadata = getattr(doc, "metadata", None)
            metadata_dict = doc_metadata if isinstance(doc_metadata, dict) else {}
            # Documents without an "original_content" payload are still indexed.
            orig_cnt = json.loads(metadata_dict.get('original_content') or "{}")
            if not isinstance(orig_cnt, dict):
                orig_cnt = {}

            # Compute both vectors before touching S3 so that an embedding
            # failure does not leave an orphaned object behind.
            dense_vector = sample_vec if i == 0 else embbed_model.invoke(summ_text)
            sparse_vector = bm25.encode_documents(summ_text)

            bucket_content = {
                "id": doc_id,
                "raw_text": orig_cnt.get('raw_text', ""),
                "summ_text": summ_text,
                # summarize_chunks stores tables under "table_html"; the old
                # lookup of "table_as_html" never matched and dropped them.
                'table_as_html': orig_cnt.get('table_html', orig_cnt.get('table_as_html', [])),
                'image_base64': orig_cnt.get('image_base64', [])
            }

            obj.put_object(
                Bucket=bucket_name,
                Key=f"{user_id}/{doc_id}.json",
                Body=json.dumps(bucket_content),
                ContentType="application/json"
            )

            vector = {
                "id": doc_id,
                "values": dense_vector,
                "metadata": {
                    "text": summ_text,
                    "s3_uri": f"s3://{bucket_name}/{user_id}/{doc_id}.json"
                }
            }
            # Attach the sparse part only when it has entries; an empty sparse
            # vector carries no information (and is assumed to be rejected by
            # the index — confirm).
            if sparse_vector and sparse_vector.get("indices"):
                vector["sparse_values"] = sparse_vector
            vectors_to_upsert.append(vector)

            if len(vectors_to_upsert) >= batch_size:
                index.upsert(vectors=vectors_to_upsert)
                vectors_to_upsert = []
                print(f"Upserted batch ending at {i}")

        except Exception as e:
            print(f"Error processing doc {i}: {e}")

            continue

    # Flush the final partial batch; it used to be dropped silently.
    if vectors_to_upsert:
        index.upsert(vectors=vectors_to_upsert)
        print(f"Upserted final batch of {len(vectors_to_upsert)} vectors")
        vectors_to_upsert = []

    retriever =CustomHybridRetriever(
        user_id=user_id,
        emb_model=embbed_model,
        bm25=bm25
    )

    return retriever

In [None]:
# Start a fresh user id ("user-" + 11 hex chars); this overwrites the
# `user_id` created earlier in the notebook.
user_id="user-"+str(uuid.uuid4().hex[1:12])
user_id

In [None]:
# Index all summaries for this user and keep the returned retriever.
ret= upload_file(sum_chunks,user_id,BUCKET_NAME)

In [None]:
# "s3://soumya-rag/configs/084cf1909a6456.json"

In [None]:
# Example question used for the retrieval and answer-generation cells below.
query="what is the project about"

In [None]:
# Hybrid search: returns Documents whose metadata includes "text", "s3_uri" and "id".
docs = ret.get_relevant_documents(
    query
)

In [None]:
# Display the retrieved documents.
docs

In [None]:
# Manual check of one stored payload.
# NOTE(review): the key is hard-coded to a specific earlier run and will not
# exist for a freshly generated user_id. This cell also rebinds the globals
# `obj` (previously the S3 client) and `data` (previously the sample dict).
s3 = boto3.client("s3")

obj = s3.get_object(
    Bucket="soumya-rag",
    Key="user-d959f441f15/a9f5ff528ee944b.json"
)

data = json.loads(obj["Body"].read().decode("utf-8"))

print(data)


In [None]:
# Display the payload fetched in the previous cell.
data

In [None]:
# Split the first hit's s3_uri into "<scheme>" and "<bucket>/<user_id>/<doc_id>".
# The unpacking requires exactly one "//" and exactly two "/" after it.
# NOTE(review): this rebinds the global `user_id`.
_,uri=docs[0].metadata['s3_uri'].split("//")
_,user_id,doc_id=uri.split('/')
user_id,doc_id

In [None]:
def retreive_document(docs) -> "List[Document]":
    """Fetch the full stored payload for each retrieved document from S3.

    Every input document is expected to carry ``metadata["s3_uri"]`` of the
    form ``<scheme>//<bucket>/<key>`` (both ``s3://`` and the legacy ``s3//``
    spelling are accepted). Documents without an ``s3_uri`` are skipped.

    Args:
        docs: Iterable of objects with a ``metadata`` mapping.

    Returns:
        One ``Document`` per fetched payload, in input order. ``page_content``
        is the stored ``raw_text``; the metadata holds ``table_as_html`` and
        ``image_base64`` (empty lists when the payload lacks them).

    Raises:
        ValueError: If an ``s3_uri`` has no bucket or key part.
        Exception: Any S3 or JSON error is printed and re-raised unchanged.
    """
    retreived_documents = []
    try:
        s3 = boto3.client("s3")
        print("aws logined")

        for dox in docs:
            metadata = getattr(dox, "metadata", None) or {}
            s3_uri = metadata.get("s3_uri")
            if not s3_uri:
                # Previously execution fell through and reused the object
                # fetched for the preceding document (or hit a NameError).
                print("Skipping document without s3_uri")
                continue

            # "<scheme>//<bucket>/<key>": take the bucket from the URI itself
            # rather than from a module-level constant.
            _, _, location = s3_uri.partition("//")
            bucket, _, key = location.partition("/")
            if not bucket or not key:
                raise ValueError(f"Malformed s3_uri: {s3_uri!r}")

            obj = s3.get_object(Bucket=bucket, Key=key)
            data = json.loads(obj["Body"].read().decode("utf-8"))

            retreived_documents.append(
                Document(
                    page_content=data.get('raw_text', ""),
                    metadata={
                        "table_as_html": data.get('table_as_html', []),
                        "image_base64": data.get('image_base64', [])
                    }
                )
            )

        return retreived_documents

    except Exception as e:
        print("failed due to ",e)
        raise

In [None]:
# Replace the search hits with the full payloads stored in S3.
ret_docs=retreive_document(docs)

In [None]:
# Display the documents rebuilt from S3.
ret_docs

In [None]:
import base64

def gen_ans(docs, query):
    """Answer ``query`` from the given documents using ``ApnaChatModel``.

    Builds one user message: a text block containing the question, each
    document's text and tables, and the answering instructions, followed by
    one image block per image found in the documents' metadata.

    Args:
        docs: Iterable of objects with ``page_content`` and, optionally, a
            ``metadata`` mapping with ``table_as_html`` (list or dict of
            tables) and ``image_base64`` (a base64 string or list of them).
            Objects without ``page_content`` are ignored.
        query: The question to answer.

    Returns:
        The text returned by ``ApnaChatModel.invoke``.

    Raises:
        RuntimeError: If anything fails; the original error is chained.
    """
    try:
        llm = ApnaChatModel()

        prompt_text = f"""
Based on the following document context, answer the question.

QUESTION:
{query}

CONTENT:
"""

        all_images = []
        fragment_no = 0

        for chunk in docs:
            # Check before emitting the header so skipped items leave no
            # empty "Document Fragment" section in the prompt.
            if not hasattr(chunk, "page_content"):
                continue

            fragment_no += 1
            prompt_text += f"\n--- Document Fragment {fragment_no} ---\n"
            prompt_text += f"TEXT:\n{chunk.page_content}\n"

            metadata = getattr(chunk, "metadata", None) or {}

            tables = metadata.get("table_as_html") or []
            if isinstance(tables, dict):
                tables = list(tables.values())

            if tables:
                prompt_text += "\nTABLES:\n"
                for j, table in enumerate(tables):
                    prompt_text += f"Table {j+1}:\n{table}\n\n"

            imgs = metadata.get("image_base64") or []
            if isinstance(imgs, str):
                imgs = [imgs]

            for img in imgs:
                # Decode only to validate the payload. The request body is
                # serialised with json.dumps inside ApnaChatModel.invoke, so
                # the image must stay a base64 *string*: raw bytes are not
                # JSON serialisable and made every image request fail.
                base64.b64decode(img)
                all_images.append(img.strip())

        prompt_text += """
INSTRUCTIONS:
- Use ONLY the provided content
- If insufficient info, say: "I don't have enough information to answer the question."
- Be concise and factual

ANSWER:
"""

        content_block = [{
            "text": prompt_text
        }]

        for img_b64 in all_images:
            content_block.append({
                "image": {
                    "format": "jpeg",
                    "source": {
                        "bytes": img_b64
                    }
                }
            })

        message = [{
            "role": "user",
            "content": content_block
        }]

        return llm.invoke(message)

    except Exception as e:
        print(f"Answer gen failed: {e}")
        raise RuntimeError("Sorry, I encountered an error generating the answer.") from e


In [None]:
# Generate an answer from the search hits. NOTE(review): `docs` (the hits)
# is passed here, not `ret_docs` (the payloads fetched from S3).
gen_ans(docs,query)

In [None]:
# NOTE(review): this redefines create_ai_enhanced_summary, which is already
# defined earlier in the notebook with the same behaviour.
def create_ai_enhanced_summary(
    text: str,
    tables: list,
    images: list,
    model: ApnaChatModel
) -> str:
    """Request a searchable description of one chunk from ``model``.

    The user message holds one ``image`` block per entry of ``images``
    (stripped base64, labelled ``jpeg``) followed by a single text block with
    the chunk text, the numbered tables and the task instructions.

    Returns:
        The value returned by ``model.invoke``.
    """
    prompt_parts = [f"""
You are an AI assistant creating a searchable description for document retrieval.

--- TEXT CONTENT ---
{text}
"""]

    if tables:
        prompt_parts.append("\n--- TABLES ---\n")
        for number, table in enumerate(tables, 1):
            prompt_parts.append(f"Table {number}:\n{table}\n\n")

    prompt_parts.append("""
--- YOUR TASK ---
Generate a comprehensive, searchable description.

Cover:
1. Key facts, exact numbers, and metrics
2. Main topics and concepts
3. Questions this content can answer
4. Visual insights from images (charts, diagrams, patterns)
5. Alternative keywords and synonyms

Prioritize searchability over brevity.
""")

    def _image_block(encoded):
        return {"image": {"format": "jpeg", "source": {"bytes": encoded.strip()}}}

    content_blocks = list(map(_image_block, images))
    content_blocks.append({"text": "".join(prompt_parts)})

    user_message = {"role": "user", "content": content_blocks}
    return model.invoke([user_message])

In [None]:
def gen_final_ans(chunks, query):
    """Answer ``query`` from summarised chunks using their original content.

    For each chunk the JSON stored in ``metadata["original_content"]`` is
    preferred (raw text, tables, images); when it is missing or unparsable the
    chunk's ``page_content`` is used instead.

    Fixes relative to the previous implementation:
      * it built a ``HumanMessage`` (never imported in this notebook) and read
        ``.content`` from the result, although ``ApnaChatModel.invoke`` takes
        plain message dicts and returns a string;
      * it looked up ``tables_html`` / ``images_base64``, while the producer
        (``summarize_chunks``) writes ``table_html`` / ``image_base64``; both
        spellings are accepted now.

    Args:
        chunks: Iterable of objects with ``page_content`` and ``metadata``.
        query: The question to answer.

    Returns:
        The model's answer, or a fixed apology string if anything fails.
    """
    try:
        llm = ApnaChatModel()

        prompt_text = f"""
        Based on the following document context, please answer this question: {query}

        CONTENT_TO_ANALYZE:
        """
        all_images_base64 = []
        for i, chunk in enumerate(chunks):
            prompt_text += f"\n--- Document Fragment {i+1} ---\n"

            metadata = getattr(chunk, "metadata", None) or {}
            original_data = None
            if "original_content" in metadata:
                try:
                    original_data = json.loads(metadata['original_content'])
                except (json.JSONDecodeError, TypeError):
                    original_data = None

            if not isinstance(original_data, dict):
                # No usable original payload: fall back to the summary text.
                prompt_text += f"Text:\n{chunk.page_content}\n\n"
                continue

            raw_text = original_data.get("raw_text", "")
            if raw_text:
                prompt_text += f"Text:\n{raw_text}\n\n"

            tables_html = (
                original_data.get("table_html")
                or original_data.get("tables_html")
                or []
            )
            if tables_html:
                prompt_text += 'TABLES:\n'
                for j, table in enumerate(tables_html):
                    prompt_text += f"Table {j+1}:\n{table}\n\n"

            imgs = (
                original_data.get("image_base64")
                or original_data.get("images_base64")
                or []
            )
            if isinstance(imgs, str):
                imgs = [imgs]
            all_images_base64.extend(img.strip() for img in imgs if img)

        prompt_text += """ 
        INSTRUCTIONS:
        Provide a clear, comprehensive answer using the text, tables, and images provided above. 
        If the documents don't contain sufficient information to answer the question, state: "I don't have enough information to answer the question."
        
        ANSWER:"""

        # Same content-block layout that gen_ans / create_ai_enhanced_summary
        # send to ApnaChatModel: a text block plus one block per image.
        content_blocks = [{"text": prompt_text}]
        for img_b64 in all_images_base64:
            content_blocks.append({
                "image": {
                    "format": "jpeg",
                    "source": {"bytes": img_b64}
                }
            })

        return llm.invoke([{"role": "user", "content": content_blocks}])

    except Exception as e:
        print(f"Answer gen failed: {e}")
        return 'Sorry, I encountered an error generating the answer.'