In [1]:
from langchain_community.document_loaders import PyPDFLoader, UnstructuredPDFLoader
from unstructured.partition.pdf import partition_pdf
import fitz 
import os, json, gzip, sys, traceback
from dotenv import load_dotenv
from pathlib import Path
import base64
import os
from langchain_core.messages import HumanMessage
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import time
from openai import RateLimitError
import asyncio
from concurrent.futures import ThreadPoolExecutor
from tqdm.auto import tqdm
import nest_asyncio

In [33]:
# Paths and switches for the PDF -> JSONL cache pipeline.
# Paths are Path objects because helpers such as extract_images() call Path
# methods (e.g. .mkdir) on their directory defaults.
# NOTE(review): absolute Windows paths tie the notebook to one machine;
# consider deriving them from a single project-root setting.
FOLDER_PATH = Path(r"C:\Users\DELL\Desktop\AI Projects\langchain_proj\Logistic_Agent_Planner\data\road_knowledge")
CACHE_DIR   = Path(r"C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache_json")
FIG_DIR     = Path(r"C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\figures")
# NOTE(review): COMPRESS, OVERWRITE and LOG_EVERY are not read anywhere in this
# notebook yet (presumably: gzip cache files / rebuild existing caches / log cadence).
COMPRESS    = True
OVERWRITE   = False
LOG_EVERY   = 1
PERSIST_DIR = "./chroma_text"  # Chroma data goes to f"{PERSIST_DIR}_vectorstore"

In [6]:
try:
    import camelot
    _CAMELOT_AVAILABLE = True
except Exception:
    camelot = None
    _CAMELOT_AVAILABLE = False

In [19]:
def save_jsonl(records, out_path, compress: bool = True):
    """Write ``records`` as JSON Lines (one object per line), gzip by default.

    The data is first written to a sibling ``<name>.tmp`` file and then moved
    into place with ``os.replace``. An interrupted run therefore never leaves a
    truncated file at ``out_path``, which matters because callers treat an
    existing cache file as complete.

    Args:
        records: iterable of JSON-serialisable objects.
        out_path: destination (str or Path); parent directories are created.
            When ``compress`` is true and the name does not end in ``.gz``,
            ``.gz`` is appended.
        compress: gzip the output.

    Returns:
        Path of the written file (with ``.gz`` appended when applicable).
    """
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if compress and not out_path.name.endswith(".gz"):
        out_path = out_path.with_suffix(out_path.suffix + ".gz")
    opener = gzip.open if out_path.name.endswith(".gz") else open

    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        with opener(tmp_path, "wt", encoding="utf-8") as f:
            for r in records:
                json.dump(r, f, ensure_ascii=False)
                f.write("\n")
        os.replace(tmp_path, out_path)  # atomic on the same filesystem
    except BaseException:
        # Don't leave a half-written temp file behind on any failure.
        tmp_path.unlink(missing_ok=True)
        raise
    return out_path

def load_jsonl(path: Path):
    """Lazily yield the JSON objects stored one per line in ``path``.

    Files whose name ends in ``.gz`` are read through gzip; blank or
    whitespace-only lines are skipped.
    """
    open_fn = gzip.open if str(path).endswith(".gz") else open
    with open_fn(path, "rt", encoding="utf-8") as handle:
        yield from (json.loads(row) for row in handle if row.strip())

def is_text_pdf(pdf_path: str, sample_pages: int = 2) -> bool:
    """Cheap probe: does the PDF have an extractable text layer?

    Reads at most ``sample_pages`` pages through ``PyPDFLoader.lazy_load``
    rather than ``load()``, so the probe can stop after the sampled pages
    (how lazily pages are parsed depends on the installed langchain_community
    version). Returns True when the sampled pages contain more than 50
    non-whitespace-trimmed characters of text; any error counts as
    "not text-native".
    """
    from itertools import islice

    try:
        pages = islice(PyPDFLoader(pdf_path).lazy_load(), max(sample_pages, 0))
        sample = "".join(d.page_content for d in pages)
        return len(sample.strip()) > 50
    except Exception:
        return False

In [8]:
def load_text_fast(pdf_path: str):
    """Load a text-native PDF with PyPDFLoader; returns list[Document]."""
    loader = PyPDFLoader(pdf_path)
    return loader.load()

def load_text_ocr(pdf_path: str):
    """Fallback text loader built on unstructured's UnstructuredPDFLoader.

    Runs the "fast" strategy in element mode with image extraction and table
    structure inference turned off (much cheaper than "hi_res").
    Returns list[Document].
    """
    options = dict(
        strategy="fast",
        mode="elements",
        extract_images_in_pdf=False,
        infer_table_structure=False,
    )
    loader = UnstructuredPDFLoader(file_path=pdf_path, **options)
    return loader.load()

def sniff_pages_with_layout(pdf_path: str, strategy: str = "fast"):
    """Find 1-based page numbers that likely contain tables or figures.

    Two independent, best-effort probes are combined:

    * unstructured ``partition_pdf``: a page goes into ``tables`` when one of
      its elements has a category mentioning "table", and into ``figures``
      for "image"/"figure". NOTE: the default ``"fast"`` strategy runs no
      layout model, so such elements are likely rare or absent; pass
      ``strategy="hi_res"`` when that model is installed.
    * PyMuPDF: every page that embeds at least one image is added to
      ``figures``. This probe is cheap and does not depend on the strategy.

    A failing probe prints a warning and contributes nothing, so downstream
    steps just skip the heavy work.

    Returns:
        ``{"tables": set[int], "figures": set[int]}``
    """
    tables, figures = set(), set()

    # Probe 1: unstructured element categories.
    try:
        elements = partition_pdf(pdf_path, strategy=strategy, include_page_breaks=True)
        for el in elements:
            # unstructured elements have .category and .metadata.page_number
            cat = getattr(el, "category", "") or el.__class__.__name__
            pg = getattr(getattr(el, "metadata", None), "page_number", None)
            if not pg:
                continue
            cat_l = str(cat).lower()
            if "table" in cat_l:
                tables.add(pg)
            if "image" in cat_l or "figure" in cat_l:
                figures.add(pg)
    except Exception as exc:
        print(f"[LAYOUT WARN] {pdf_path}: unstructured sniff failed: {exc}")

    # Probe 2: pages with embedded raster images, via PyMuPDF.
    try:
        with fitz.open(pdf_path) as doc:
            for pno, page in enumerate(doc, start=1):
                if page.get_images():
                    figures.add(pno)
    except Exception as exc:
        print(f"[LAYOUT WARN] {pdf_path}: PyMuPDF image scan failed: {exc}")

    return {"tables": tables, "figures": figures}

def extract_tables(pdf_path: str, pages: set[int]):
    """Extract tables from ``pages`` (1-based) of ``pdf_path`` with Camelot.

    The ``lattice`` flavor (ruled tables) is tried first. The ``stream``
    flavor (whitespace-separated) is used when lattice finds nothing or
    raises, e.g. because of a missing Ghostscript install, which only lattice
    uses.

    Returns:
        list of ``{"type": "table", "page", "shape", "data"}`` dicts. It is
        empty when ``pages`` is empty, Camelot is unavailable, or both flavors
        fail. Problems are printed as warnings, never raised.
    """
    if not pages or not _CAMELOT_AVAILABLE:
        return []
    page_str = ",".join(map(str, sorted(pages)))

    tables = []
    try:
        tables = camelot.read_pdf(pdf_path, pages=page_str, flavor="lattice")
    except Exception as exc:
        print(f"[TABLE WARN] {pdf_path}: lattice failed ({exc}); trying stream")
    if len(tables) == 0:
        try:
            tables = camelot.read_pdf(pdf_path, pages=page_str, flavor="stream")
        except Exception as exc:
            print(f"[TABLE WARN] {pdf_path}: stream failed: {exc}")
            return []

    results = []
    for t in tables:
        # One malformed table should not discard the others.
        try:
            results.append(_table_to_record(t))
        except Exception as exc:
            print(f"[TABLE WARN] {pdf_path}: skipped unreadable table: {exc}")
    return results


def _table_to_record(t):
    """Convert one Camelot table into a JSON-serialisable ``type="table"`` record."""
    try:
        page = int(t.parsing_report.get("page", -1))
        shape = list(t.shape)
    except Exception:
        # parsing_report missing or malformed: keep the data, drop the page.
        page = None
        shape = list(getattr(t, "shape", (-1, -1)))
    return {"type": "table", "page": page, "shape": shape, "data": t.df.values.tolist()}

def extract_images(pdf_path: str, pages: set[int], out_dir: Path = FIG_DIR):
    """Save the embedded images of the selected pages as PNG files via PyMuPDF.

    Args:
        pdf_path: PDF file.
        pages: 1-based page numbers; numbers outside the document are ignored.
        out_dir: destination directory (str or Path), created if missing.

    Returns:
        list of ``{"type": "figure", "page", "path"}`` dicts, one per saved
        image. An image that cannot be converted or saved is skipped with a
        warning. If the PDF cannot be opened or read, the images saved so far
        are returned and a warning is printed.
    """
    out_dir = Path(out_dir)  # accept plain strings as well as Path objects
    out_dir.mkdir(parents=True, exist_ok=True)
    results = []
    if not pages:
        return results
    stem = Path(pdf_path).stem
    try:
        with fitz.open(pdf_path) as doc:
            for pno in sorted(pages):
                if not 1 <= pno <= doc.page_count:
                    continue
                for img in doc[pno - 1].get_images(full=True):
                    xref = img[0]
                    save_path = out_dir / f"{stem}_p{pno}_{xref}.png"
                    try:
                        pix = fitz.Pixmap(doc, xref)
                        # PNG needs gray or RGB (optionally + alpha); convert
                        # anything with more colour channels (e.g. CMYK) first.
                        if pix.n - pix.alpha > 3:
                            pix = fitz.Pixmap(fitz.csRGB, pix)
                        pix.save(save_path)
                    except Exception as exc:
                        print(f"[IMAGE WARN] {pdf_path} p{pno} xref {xref}: {exc}")
                        continue
                    results.append({"type": "figure", "page": pno, "path": str(save_path)})
    except Exception as exc:
        print(f"[IMAGE WARN] {pdf_path}: {exc}")
    return results


In [10]:
def docs_to_text_records(docs):
    """Turn LangChain Documents into ``type="text"`` JSONL records.

    Each record carries a shallow copy of the document's metadata (``{}`` when
    it is missing) and a ``page`` read from the metadata key ``"page"`` if it
    exists, otherwise from ``"page_number"`` (``None`` when neither exists).
    """
    def _to_record(doc):
        meta = dict(doc.metadata or {})
        # NOTE(review): PyPDFLoader's "page" is typically 0-based while
        # unstructured's "page_number" is 1-based (as are table/figure pages);
        # confirm before comparing page numbers across record types.
        page = meta["page"] if "page" in meta else meta.get("page_number")
        return {"type": "text", "page": page, "content": doc.page_content, "metadata": meta}

    return [_to_record(doc) for doc in docs]

In [20]:
def process_pdf(pdf_path, cache_dir=None, overwrite=False, fig_dir=None, compress=True):
    """Extract one PDF's text, tables and figures into a cached JSONL file.

    Steps:
        1. Text: PyPDFLoader when the text probe succeeds, otherwise the
           unstructured fallback loader.
        2. Sniff which pages may hold tables or figures.
        3. Camelot tables.
        4. PyMuPDF images, written as PNGs to ``fig_dir``.
        5. Save all records with ``save_jsonl``.

    Args:
        pdf_path: PDF file (str or Path).
        cache_dir: cache directory; defaults to ``./cache_json``.
        overwrite: rebuild even when a cache file already exists.
        fig_dir: directory for extracted figure PNGs; defaults to ``./figures``.
        compress: gzip the cache file (``.jsonl.gz``).

    Returns:
        str path of the cache file. On a cache hit this is the existing file
        (``.jsonl`` is preferred over ``.jsonl.gz``); otherwise it is the
        newly written one.
    """
    pdf_path = Path(pdf_path)
    cache_dir = Path(cache_dir) if cache_dir else Path("./cache_json")
    fig_dir = Path(fig_dir) if fig_dir else Path("./figures")
    out_path = cache_dir / (pdf_path.stem + ".jsonl")
    gz_path = out_path.with_suffix(out_path.suffix + ".gz")

    if not overwrite:
        if out_path.exists():
            return str(out_path)
        if gz_path.exists():
            return str(gz_path)

    records = []
    src = str(pdf_path)

    # 1) TEXT
    try:
        docs = load_text_fast(src) if is_text_pdf(src) else load_text_ocr(src)
        records.extend(docs_to_text_records(docs))
    except Exception as e:
        print(f"[TEXT ERROR] {pdf_path}: {e}")

    # 2) LAYOUT SNIFF
    pages = sniff_pages_with_layout(src)

    # 3) TABLES & 4) IMAGES
    records.extend(extract_tables(src, pages["tables"]))
    records.extend(extract_images(src, pages["figures"], fig_dir))

    # 5) SAVE
    return str(save_jsonl(records, out_path, compress=compress))

In [21]:
def run_batch(folder, cache_dir=None, overwrite=False):
    """Cache every ``*.pdf`` directly inside ``folder`` via ``process_pdf``.

    Args:
        folder: directory to scan (non-recursive). It is not created: a wrong
            path is reported instead of silently becoming an empty folder.
        cache_dir: where cache files go; defaults to ``./cache_json``.
        overwrite: re-process PDFs that already have a cache file.

    A failure on one PDF is printed and the batch continues with the next.
    """
    folder = Path(folder)
    cache_dir = Path(cache_dir) if cache_dir else Path("./cache_json")
    if folder.is_dir():
        pdfs = sorted(folder.glob("*.pdf"))
    else:
        print(f"[WARN] input folder not found: {folder}")
        pdfs = []
    print(f"Found {len(pdfs)} PDFs.")
    for i, pdf in enumerate(pdfs, 1):
        try:
            target = process_pdf(pdf, cache_dir=cache_dir, overwrite=overwrite)
        except Exception as e:
            print(f"[{i}/{len(pdfs)}] FAILED {pdf.name}: {e}")
            continue
        print(f"[{i}/{len(pdfs)}] cached → {target}")
    print("Done.")

In [None]:
# Build (or reuse) the JSONL cache for every PDF in the knowledge folder.
run_batch(folder=FOLDER_PATH)

In [3]:
# Load every cached record (text, tables, figures) from the gzip JSONL cache.
records = []
cache_dir = Path(CACHE_DIR)
print(f"Cache directory exists: {cache_dir.exists()}")
print(f"Files in cache directory: {list(cache_dir.glob('*.jsonl*'))}")
for p in sorted(cache_dir.glob("*.jsonl.gz")):
    print(f"Processing file: {p}")
    try:
        for r in load_jsonl(p):
            records.append(r)
    except Exception as e:
        print(f"Error processing {p}: {e}")
        # Read only the first 100 bytes for the preview, and close the file.
        with open(p, "rb") as fh:
            print(f"File content preview: {fh.read(100)}")
print(f"Loaded {len(records)} records")

Cache directory exists: True
Processing file: C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache_json\250210-IR-Full-Alignment-Map-Web-1-1.jsonl.gz
Processing file: C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache_json\alternative-heavy-vehicle-rest-area-hume-motorway-minor-works-ref.jsonl.gz
Processing file: C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache_json\Appendix B - Resilience analysis of key freight roads and railways using CSIRO’s TraNSIT.jsonl.gz
Processing file: C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache_json\Chapter-2-Project-Rationale.jsonl.gz
Processing file: C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache_json\Freight-Policy-Reform-Consultation-Paper-April-2024.jsonl.gz
Processing file: C:\Users\DELL\Desktop\AI Projects\Langchain_proj\Logistic_Agent_Planner\notebooks\cache

In [4]:
# Rebuild LangChain Documents from the cached text records.
# A fresh metadata dict is built per record, so `records` itself is never
# mutated and re-running this cell gives the same result.
docs = [
    Document(
        page_content=r["content"],
        metadata={**(r.get("metadata") or {}), "page": r.get("page"), "type": "text"},
    )
    for r in records
    if r.get("type") == "text" and r.get("content")
]

In [5]:
# Load variables from a local .env file (if present) into the environment.
load_dotenv()

OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
# The key is not passed explicitly to the OpenAI clients below, so they must
# find it in the environment. Warn now rather than have every summarisation
# batch fail later.
if not OPENAI_API_KEY:
    print("WARNING: OPENAI_API_KEY is not set - OpenAI calls below will fail.")

In [6]:
# Summarisation setup.
# Why async batching: many text chunks are summarised concurrently, with
# MAX_CONCURRENCY capping simultaneous OpenAI requests, BATCH_SIZE grouping
# work, and tqdm reporting progress.

chatgpt = ChatOpenAI(
    model_name='gpt-3.5-turbo-16k',  # 16k-token context window
    temperature=0,
    max_retries=3,
    request_timeout=60,
    max_tokens=500,  # caps summary length
)

prompt_text = """
You are an assistant tasked with summarizing tables and text particularly for semantic retrieval.
These summaries will be embedded and used to retrieve the raw text or table elements
Give a detailed summary of the table or text below that is well optimized for retrieval.
For any tables also add in a one line description of what the table is about besides the summary.
Do not add redundant words like Summary.
Just output the actual summary content.

Table or text chunk:
{element}
"""
prompt = ChatPromptTemplate.from_template(prompt_text)

# raw text -> prompt -> chat model -> plain string
summarize_chain = {"element": RunnablePassthrough()} | prompt | chatgpt | StrOutputParser()

BATCH_SIZE = 20
MAX_CONCURRENCY = 15
text_summaries = []
text_docs = [d.page_content for d in docs]

# Retry policy for OpenAI rate limits: up to 3 attempts, exponential back-off
# clamped between 2 and 6 seconds. Other exception types are not retried.
_rate_limit_retry = retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=2, max=6),
    stop=stop_after_attempt(3),
)


@_rate_limit_retry
async def process_batch_async(batch):
    """Summarise one batch of texts, running at most MAX_CONCURRENCY at once."""
    run_config = {"max_concurrency": MAX_CONCURRENCY}
    return await summarize_chain.abatch(batch, run_config)

async def process_all_batches():
    """Summarise every entry of ``text_docs`` in batches of ``BATCH_SIZE``.

    Returns a list with exactly one entry per ``text_docs`` item, in order.
    When a whole batch fails (after ``process_batch_async``'s retries), the
    raw texts of that batch stand in for its summaries. This keeps the result
    index-aligned with ``docs``: the storage step pairs the two with ``zip``,
    so a skipped batch would shift every later summary onto the wrong
    document.
    """
    text_summaries = []
    total_batches = (len(text_docs) + BATCH_SIZE - 1) // BATCH_SIZE
    failed_batches = 0

    with tqdm(total=len(text_docs), desc="Processing documents") as pbar:
        for batch_no, start in enumerate(range(0, len(text_docs), BATCH_SIZE), 1):
            batch = text_docs[start:start + BATCH_SIZE]
            try:
                batch_summaries = await process_batch_async(batch)
            except Exception as e:
                failed_batches += 1
                print(f"\nError in batch {batch_no}/{total_batches}: {e} "
                      f"- using raw text for its {len(batch)} documents")
                text_summaries.extend(batch)
                pbar.update(len(batch))
                await asyncio.sleep(2)  # longer pause after a failure
                continue

            text_summaries.extend(batch_summaries)
            pbar.update(len(batch))

            # Fixed throttle: pause 1s every third batch (including the first),
            # whether or not a rate limit was hit.
            if start % (BATCH_SIZE * 3) == 0:
                await asyncio.sleep(1)

    if failed_batches:
        print(f"{failed_batches}/{total_batches} batches fell back to raw text.")
    return text_summaries

async def main():
    """Async entry point: summarise all text documents and return the summaries."""
    return await process_all_batches()

# Jupyter already runs an event loop; nest_asyncio lets asyncio.run() work inside it.
nest_asyncio.apply()

text_summaries = asyncio.run(main())

# Summaries are paired with `docs` by position when stored, so the counts
# must match exactly; stop here rather than store misaligned pairs.
assert len(text_summaries) == len(text_docs), (
    f"got {len(text_summaries)} summaries for {len(text_docs)} documents"
)

Processing documents:   0%|          | 0/5597 [00:00<?, ?it/s]

In [32]:
# This import rebinds the name OpenAIEmbeddings, which the first cell
# imported from langchain.embeddings, to the langchain_openai class.
from langchain_openai import OpenAIEmbeddings

openai_embed_model = OpenAIEmbeddings(model="text-embedding-3-small")

In [None]:
from langchain.retrievers import EnsembleRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers import ContextualCompressionRetriever
import uuid

# Persistent Chroma collection that holds both raw chunks and their summaries,
# compared with cosine distance.
vectorstore = Chroma(
    collection_name="combined_store",
    embedding_function=openai_embed_model,
    persist_directory=f"{PERSIST_DIR}_vectorstore",
    collection_metadata={"hnsw:space": "cosine"},
)

# ~1000-character chunks with 200 characters of overlap, split preferably on
# paragraph breaks, then line breaks, then spaces.
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
)


def store_documents_with_summaries(summaries, original_docs, batch_size=500):
    """Chunk each document, pair it with its summary and add both to ``vectorstore``.

    Each (summary, document) pair shares a random ``pair_id``. The document
    text is split with ``text_splitter`` into ``type="content"`` chunks. The
    summary is stored whole as a ``type="summary"`` document; empty or
    ``None`` summaries are skipped.

    Args:
        summaries: one summary per entry of ``original_docs``, same order.
        original_docs: LangChain ``Document`` objects to chunk.
        batch_size: documents per ``vectorstore.add_documents`` call.

    Returns:
        The list of all Documents that were added.

    Raises:
        ValueError: if the inputs differ in length. Pairing them anyway would
            attach summaries to the wrong documents.
    """
    summaries = list(summaries)
    original_docs = list(original_docs)
    if len(summaries) != len(original_docs):
        raise ValueError(
            f"{len(summaries)} summaries for {len(original_docs)} documents; "
            "refusing to store misaligned pairs"
        )

    documents = []
    for summary, doc in zip(summaries, original_docs):
        pair_id = str(uuid.uuid4())
        page = doc.metadata.get("page", None)

        # Raw content, split into overlapping chunks that share the pair_id.
        for i, chunk in enumerate(text_splitter.split_text(doc.page_content)):
            documents.append(Document(
                page_content=chunk,
                metadata={
                    "pair_id": pair_id,
                    "chunk_id": i,
                    "type": "content",
                    "page": page,
                    "is_summary": False,
                },
            ))

        # The summary is kept as a single piece; an empty one has nothing to embed.
        if summary:
            documents.append(Document(
                page_content=summary,
                metadata={
                    "pair_id": pair_id,
                    "type": "summary",
                    "page": page,
                    "is_summary": True,
                },
            ))

    total_pairs = 0
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        vectorstore.add_documents(batch)
        total_pairs += sum(1 for d in batch if d.metadata["is_summary"])
        print(f"Added batch of {len(batch)} documents ({total_pairs} pairs)")

    print(f"Total: Added {len(documents)} documents ({total_pairs} pairs)")
    return documents

# Chroma persists to disk, so re-running this cell would add every document
# again under new random pair_ids. Only ingest into an empty collection.
if vectorstore.get(limit=1)["ids"]:
    print("Vector store already populated; skipping ingestion "
          "(delete the persist directory to rebuild).")
    stored_docs = []
else:
    try:
        stored_docs = store_documents_with_summaries(
            summaries=text_summaries,
            original_docs=docs,
            batch_size=500,  # documents per add_documents call
        )
    except Exception as e:
        print(f"Error storing documents: {str(e)}")


# MMR retriever: fetch 30 candidates, return 6, lambda_mult 0.7 leans toward
# relevance over diversity. rag_chain further below uses this retriever.
_mmr_kwargs = {"k": 6, "fetch_k": 30, "lambda_mult": 0.7}
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs=_mmr_kwargs)

# Compression retriever: gpt-4o-mini extracts the relevant parts of each hit.
# NOTE(review): final_retriever is not referenced anywhere later in this notebook.
llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
compressor = LLMChainExtractor.from_llm(llm)
final_retriever = ContextualCompressionRetriever(
    base_retriever=retriever,
    base_compressor=compressor,
)

# Flush to disk. NOTE: recent chromadb releases persist automatically; the
# call is kept for older installs.
vectorstore.persist()

Added batch of 500 documents (237 pairs)
Added batch of 500 documents (487 pairs)
Added batch of 500 documents (728 pairs)
Added batch of 500 documents (969 pairs)
Added batch of 500 documents (1219 pairs)
Added batch of 500 documents (1469 pairs)
Added batch of 500 documents (1719 pairs)
Added batch of 500 documents (1969 pairs)
Added batch of 500 documents (2219 pairs)
Added batch of 500 documents (2469 pairs)
Added batch of 500 documents (2719 pairs)
Added batch of 500 documents (2969 pairs)
Added batch of 500 documents (3219 pairs)
Added batch of 500 documents (3469 pairs)
Added batch of 500 documents (3719 pairs)
Added batch of 500 documents (3969 pairs)
Added batch of 500 documents (4219 pairs)
Added batch of 500 documents (4469 pairs)
Added batch of 500 documents (4719 pairs)
Added batch of 500 documents (4969 pairs)
Added batch of 500 documents (5219 pairs)
Added batch of 500 documents (5469 pairs)
Added batch of 268 documents (5597 pairs)
Total: Added 11268 documents (5597 pai

In [62]:
# Answering model and a prompt that restricts answers to the retrieved context.
gpt = ChatOpenAI(model='gpt-4o-mini', temperature=0.4)
qa_prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant that provides accurate answers based on the given context.
Answer the question using only the provided context. If you cannot find the answer in the context, say "I cannot answer this based on the provided context."
Be concise and clear in your response. Rephrase the response in a more human-readable way.

Context: {context}

Question: {question}

Answer: """)

# Flatten retrieved documents into a single context string for the prompt.
def format_docs(docs):
    """Join the ``page_content`` of ``docs`` with blank lines between them."""
    parts = []
    for document in docs:
        parts.append(document.page_content)
    return "\n\n".join(parts)

# RAG chain: question -> {retrieved context, question} -> prompt -> chat model.
# The result is a chat message; get_answer() reads its .content.
_rag_inputs = {"context": retriever | format_docs, "question": RunnablePassthrough()}
rag_chain = _rag_inputs | qa_prompt | gpt

# Convenience wrapper around rag_chain.
def get_answer(question: str) -> str:
    """Run ``rag_chain`` on ``question`` and return the model reply's text."""
    return rag_chain.invoke(question).content

# Smoke test: print the answer for each sample question.
test_questions = [
    "Give me all information you have about hume highway",
]

separator = "-" * 50
print("Testing QA System:")
print(separator)
for question in test_questions:
    print(f"\nQuestion: {question}")
    print(f"Answer: {get_answer(question)}")
    print(separator)
    print("-" * 50)

Testing QA System:
--------------------------------------------------

Question: Give me all information you have about hume highway
Answer: The provided context repeatedly mentions the Hume Highway (HW2) and indicates that it is located on page 33.
--------------------------------------------------
