In [2]:
pip install dspy

Note: you may need to restart the kernel to use updated packages.


In [9]:
import dspy
from typing import Dict, List

In [10]:

class ContentAnalyzer(dspy.Signature):
    """
    Specialized analyzer for regulatory and compliance documents.

    The analyzer is designed to process documents page by page. Each invocation handles a dictionary of pages,

    What type of information does this contain that can help identify the most relevant page for a given query.
    These tags help identify the most relevant pages for a given query.
    """

    # Input Field
    content: Dict[int, str] = dspy.InputField(
        desc="A dictionary where key is the page number and value is the regulatory/compliance document content in markdown format."
    )

    # Output Field
    content_tags: Dict[int, Dict[str, List[str]]] = dspy.OutputField(
        desc="""
            Semantic and structural tags extracted from each page, organized into high-level categories
            to support intelligent search and navigation.
            """
    )

class TagAnalysisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = dspy.Predict(ContentAnalyzer)

    def analyze(self, content: Dict[int,str]) -> Dict:
        """
        Analyze content and extract organized tags.
        """
        results = self.analyzer(content=content)
        return results.content_tags

def extract_document_tags(content: Dict[int,str]) -> Dict:
    """
    Extract and organize tags from document content.

    Args:
        content (str): Document content to analyze

    Returns:
        Dict: Organized tags by category
    """
    pipeline = TagAnalysisPipeline()
    tags = pipeline.analyze(content)
    return tags


## Search Tags (Implement AI Agents here)
class SearchTag(dspy.Signature):
    """
    You are an intelligent document analyzer. The system processes documents page by page, where each page has associated metadata in the form of tags. Each invocation receives a dictionary where keys are page numbers and values are lists of tags for that page. Your task is to identify the most relevant page(s) for a given user query based on these tags.
    Use semantic understanding of the query to match it with the most relevant tags.

    Be concise and confident in your answers.
    """

    # Input Field
    query: str = dspy.InputField(
        desc="The query for which tags need to find out"
    )

    tags: Dict[int, Dict[str,str]] = dspy.InputField(
        desc="""A list of tags associated with each page,
            where keys are page numbers and dict are lists of tags."""
    )

    # Output Field
    relevant_tags: Dict[int,str] = dspy.OutputField(
        desc="""
            Relavant tags for a query, think of all possoblities

            return the page as a key, and matched part as value
            """
    )


# %%
predictor = dspy.Predict(SearchTag)
query = "Explain the different categories of Stop lamp with respect to vehicle?"


## Query Answer (INluce Section number and relevant section and documents)

class QueryAnswer(dspy.Signature):
    """
    Given a query and a document context, perform a multi-step analysis:
    1. Extract relevant content.
    2. Identify numerical values.
    3. Summarize the extracted information.
    4. Include section references.
    5. Handle missing information gracefully.
    """

    # Input Fields
    query: str = dspy.InputField(
        desc="The user query for which information needs to be retrieved and analyzed from the context."
    )

    context: str = dspy.InputField(
        desc="The source document or text from which to extract information related to the query."
    )

    # Output Field
    output_answer: str = dspy.OutputField(
        desc="""
        A comprehensive answer to the query that includes:
        - A list of relevant sections at the top
        - A detailed response using bullet points, subheadings, and structured explanation
        - All important aspects of the query answered in a logical and readable manner
        """
    )

rag = dspy.ChainOfThought(QueryAnswer)

In [11]:
pip install pdfplumber

Note: you may need to restart the kernel to use updated packages.


In [17]:
pip install langchain langchain-community pdfplumber numpy scikit-learn faiss-cpu requests langchain-groq googlesearch-python beautifulsoup4 langchain-experimental sentence_transformers

Note: you may need to restart the kernel to use updated packages.


In [None]:
import pdfplumber
import random
import warnings
warnings.filterwarnings("ignore")
import random
import pdfplumber
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain.docstore.document import Document
from langchain_community.vectorstores import FAISS
def get_header_footer(pdf_path, threshold=0.71):
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
            if total_pages >= 15:
                random_page_nos = random.sample(range(5, total_pages), 10)
            else:
                random_page_nos = list(range(total_pages))

            avg_similarity = 1
            header_lines = -1
            while avg_similarity > threshold and header_lines < 4:
                header_lines += 1
                five_lines = []
                for page_no in random_page_nos:
                    lines = pdf.pages[page_no].extract_text().split('\n')
                    if len(lines) > header_lines:
                        five_lines.append(lines[header_lines])
                similarities = cosine_similarity(embed_texts(five_lines))
                avg_similarity = np.mean(similarities[np.triu_indices(len(similarities), k=1)])

            avg_similarity = 1
            footer_lines = -1
            while avg_similarity > threshold and footer_lines < 4:
                footer_lines += 1
                five_lines = []
                for page_no in random_page_nos:
                    lines = pdf.pages[page_no].extract_text().split('\n')
                    if len(lines) > footer_lines:
                        five_lines.append(lines[-(footer_lines + 1)])
                similarities = cosine_similarity(embed_texts(five_lines))
                avg_similarity = np.mean(similarities[np.triu_indices(len(similarities), k=1)])
            return header_lines, footer_lines


def extract_text(pdf_path):
    header_lines, footer_lines = get_header_footer(pdf_path)
    with pdfplumber.open(pdf_path) as pdf:
        text = ''
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                lines = page_text.split('\n')
                if lines:
                    page_text = '\n'.join(lines[header_lines:-(footer_lines + 1)])
                    text += page_text + '\n'
        return text

class ContentAnalyzer(dspy.Signature):
    """
    Specialized analyzer for regulatory and compliance documents.

    The analyzer is designed to process documents page by page. Each invocation handles a dictionary of pages,

    What type of information does this contain that can help identify the most relevant page for a given query.
    These tags help identify the most relevant pages for a given query.
    """

    # Input Field
    content: Dict[int, str] = dspy.InputField(
        desc="A dictionary where key is the page number and value is the regulatory/compliance document content in markdown format."
    )

    # Output Field
    content_tags: Dict[int, Dict[str, List[str]]] = dspy.OutputField(
        desc="""
            Semantic and structural tags extracted from each page, organized into high-level categories
            to support intelligent search and navigation.
            """
    )

class TagAnalysisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = dspy.Predict(ContentAnalyzer)

    def analyze(self, content: Dict[int,str]) -> Dict:
        """
        Analyze content and extract organized tags.
        """
        results = self.analyzer(content=content)
        return results.content_tags

def extract_document_tags(content: Dict[int,str]) -> Dict:
    """
    Extract and organize tags from document content.

    Args:
        content (str): Document content to analyze

    Returns:
        Dict: Organized tags by category
    """
    pipeline = TagAnalysisPipeline()
    tags = pipeline.analyze(content)
    return tags


## Search Tags (Implement AI Agents here)
class SearchTag(dspy.Signature):
    """
    You are an intelligent document analyzer. The system processes documents page by page, where each page has associated metadata in the form of tags. Each invocation receives a dictionary where keys are page numbers and values are lists of tags for that page. Your task is to identify the most relevant page(s) for a given user query based on these tags.
    Use semantic understanding of the query to match it with the most relevant tags.

    Be concise and confident in your answers.
    """

    # Input Field
    query: str = dspy.InputField(
        desc="The query for which tags need to find out"
    )

    tags: Dict[int, Dict[str,str]] = dspy.InputField(
        desc="""A list of tags associated with each page,
            where keys are page numbers and dict are lists of tags."""
    )

    # Output Field
    relevant_tags: Dict[int,str] = dspy.OutputField(
        desc="""
            Relavant tags for a query, think of all possoblities

            return the page as a key, and matched part as value
            """
    )


# %%
predictor = dspy.Predict(SearchTag)
query = "Explain the different categories of Stop lamp with respect to vehicle?"


## Query Answer (INluce Section number and relevant section and documents)

class QueryAnswer(dspy.Signature):
    """
    Given a query and a document context, perform a multi-step analysis:
    1. Extract relevant content.
    2. Identify numerical values.
    3. Summarize the extracted information.
    4. Include section references.
    5. Handle missing information gracefully.
    """

    # Input Fields
    query: str = dspy.InputField(
        desc="The user query for which information needs to be retrieved and analyzed from the context."
    )

    context: str = dspy.InputField(
        desc="The source document or text from which to extract information related to the query."
    )

    # Output Field
    output_answer: str = dspy.OutputField(
        desc="""
        A comprehensive answer to the query that includes:
        - A list of relevant sections at the top
        - A detailed response using bullet points, subheadings, and structured explanation
        - All important aspects of the query answered in a logical and readable manner
        """
    )

rag = dspy.ChainOfThought(QueryAnswer)

In [20]:
groq_api_key = "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"

In [27]:
import os
import pdfplumber
import numpy as np
import random
import warnings
warnings.filterwarnings("ignore")
from typing import Dict, List
from sklearn.metrics.pairwise import cosine_similarity
import langchain
import dspy
from dspy import Predict, ChainOfThought, Signature, InputField, OutputField

from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain_groq import ChatGroq
from dspy.teleprompt import LangChain
from langchain_groq import ChatGroq

# ========== LLM SETUP ==========
# ✅ Using LLaMA3-70B from Groq
from langchain_groq import ChatGroq
import dspy

# Initialize your LangChain LLM (Groq's LLaMA3)
lc_llm = ChatGroq(temperature=0.2, model_name="llama3-70b-8192")

# Wrap it in a DSPy-compatible LLM module
class LangChainLLM(dspy.LM):
    def __init__(self, lc_llm):
        super().__init__()
        self.lc_llm = lc_llm

    def __call__(self, prompt, **kwargs):
        return self.lc_llm.invoke(prompt).content

# Register it as DSPy's default LLM
dspy.settings.configure(lm=LangChainLLM(lc_llm))

# ========== Embedding Model ==========
embed_model = HuggingFaceBgeEmbeddings(model_name="BAAI/bge-base-en-v1.5")

def embed_texts(texts):
    return embed_model.embed_documents(texts)

# ========== PDF CLEANING ==========
def get_header_footer(pdf_path, threshold=0.71):
    with pdfplumber.open(pdf_path) as pdf:
        total_pages = len(pdf.pages)
        random_page_nos = random.sample(range(5, total_pages), 10) if total_pages >= 15 else list(range(total_pages))

        avg_similarity = 1
        header_lines = -1
        while avg_similarity > threshold and header_lines < 4:
            header_lines += 1
            lines_to_compare = [
                pdf.pages[i].extract_text().split('\n')[header_lines]
                for i in random_page_nos if len(pdf.pages[i].extract_text().split('\n')) > header_lines
            ]
            similarities = cosine_similarity(embed_texts(lines_to_compare))
            avg_similarity = np.mean(similarities[np.triu_indices(len(similarities), k=1)])

        avg_similarity = 1
        footer_lines = -1
        while avg_similarity > threshold and footer_lines < 4:
            footer_lines += 1
            lines_to_compare = [
                pdf.pages[i].extract_text().split('\n')[-(footer_lines + 1)]
                for i in random_page_nos if len(pdf.pages[i].extract_text().split('\n')) > footer_lines
            ]
            similarities = cosine_similarity(embed_texts(lines_to_compare))
            avg_similarity = np.mean(similarities[np.triu_indices(len(similarities), k=1)])

        return header_lines, footer_lines

def extract_text_by_page(pdf_path):
    header_lines, footer_lines = get_header_footer(pdf_path)
    content_by_page = {}
    with pdfplumber.open(pdf_path) as pdf:
        for i, page in enumerate(pdf.pages):
            page_text = page.extract_text()
            if page_text:
                lines = page_text.split('\n')
                clean_text = '\n'.join(lines[header_lines:-(footer_lines + 1)]) if footer_lines >= 0 else '\n'.join(lines[header_lines:])
                content_by_page[i] = clean_text
    return content_by_page

# ========== DSPy Signatures ==========
class ContentAnalyzer(Signature):
    content: Dict[int, str] = InputField(desc="Page-wise cleaned text of a regulatory document")
    content_tags: Dict[int, Dict[str, List[str]]] = OutputField(desc="Tags for each page categorized")

class TagAnalysisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = Predict(ContentAnalyzer)

    def analyze(self, content: Dict[int, str]) -> Dict:
        return self.analyzer(content=content).content_tags

class SearchTag(Signature):
    query: str = InputField(desc="User query")
    tags: Dict[int, Dict[str, str]] = InputField(desc="Tagged metadata for each page")
    relevant_tags: Dict[int, str] = OutputField(desc="Page numbers and matching tags")

class QueryAnswer(Signature):
    query: str = InputField(desc="User's natural language question")
    context: str = InputField(desc="Content pulled from the document")
    output_answer: str = OutputField(desc="Detailed and structured answer with references")

# ========== MAIN ==========
pdf_dir = "./pdfs"
documents = {}

for filename in os.listdir(pdf_dir):
    if filename.endswith(".pdf"):
        path = os.path.join(pdf_dir, filename)
        print(f"🔍 Processing: {filename}")
        content_by_page = extract_text_by_page(path)
        tagger = TagAnalysisPipeline()
        tags = tagger.analyze(content_by_page)
        documents[filename] = {
            "pages": content_by_page,
            "tags": tags
        }

# ========== USER QUERY ==========
query = "Explain the different categories of Stop lamp with respect to vehicle?"

# Step 1: Tag search
predictor = Predict(SearchTag)
doc_name = list(documents.keys())[0]
doc = documents[doc_name]

search_result = predictor(query=query, tags=doc["tags"])
print("\n🔍 Relevant Tags Found:")
print(search_result.relevant_tags)

# Step 2: Answer with DSPy CoT
top_page = list(search_result.relevant_tags.keys())[0]
context = doc["pages"][top_page]

rag = ChainOfThought(QueryAnswer)
final = rag(query=query, context=context)

print("\n🧠 Final Answer:")
print(final.output_answer)


ImportError: cannot import name 'LangChain' from 'dspy.teleprompt' (c:\Users\Admin\AppData\Local\Programs\Python\Python312\Lib\site-packages\dspy\teleprompt\__init__.py)

In [28]:
from langchain_groq import ChatGroq
import dspy
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Better integration with streaming support
lc_llm = ChatGroq(
    temperature=0.2, 
    model_name="llama3-70b-8192",
    streaming=True,
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])
)

# Use DSPy's built-in LangChain integration
llm = dspy.LangChain(lc_llm)
dspy.settings.configure(lm=llm)

AttributeError: module 'dspy' has no attribute 'LangChain'

In [None]:

class ContentAnalyzer(dspy.Signature):
    """
    Specialized analyzer for regulatory and compliance documents.

    The analyzer is designed to process documents page by page. Each invocation handles a dictionary of pages,

    What type of information does this contain that can help identify the most relevant page for a given query.
    These tags help identify the most relevant pages for a given query.
    """

    # Input Field
    content: Dict[int, str] = dspy.InputField(
        desc="A dictionary where key is the page number and value is the regulatory/compliance document content in markdown format."
    )

    # Output Field
    content_tags: Dict[int, Dict[str, List[str]]] = dspy.OutputField(
        desc="""
            Semantic and structural tags extracted from each page, organized into high-level categories
            to support intelligent search and navigation.
            """
    )

class TagAnalysisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = dspy.Predict(ContentAnalyzer)

    def analyze(self, content: Dict[int,str]) -> Dict:
        """
        Analyze content and extract organized tags.
        """
        results = self.analyzer(content=content)
        return results.content_tags

def extract_document_tags(content: Dict[int,str]) -> Dict:
    """
    Extract and organize tags from document content.

    Args:
        content (str): Document content to analyze

    Returns:
        Dict: Organized tags by category
    """
    pipeline = TagAnalysisPipeline()
    tags = pipeline.analyze(content)
    return tags


## Search Tags (Implement AI Agents here)
class SearchTag(dspy.Signature):
    """
    You are an intelligent document analyzer. The system processes documents page by page, where each page has associated metadata in the form of tags. Each invocation receives a dictionary where keys are page numbers and values are lists of tags for that page. Your task is to identify the most relevant page(s) for a given user query based on these tags.
    Use semantic understanding of the query to match it with the most relevant tags.

    Be concise and confident in your answers.
    """

    # Input Field
    query: str = dspy.InputField(
        desc="The query for which tags need to find out"
    )

    tags: Dict[int, Dict[str,str]] = dspy.InputField(
        desc="""A list of tags associated with each page,
            where keys are page numbers and dict are lists of tags."""
    )

    # Output Field
    relevant_tags: Dict[int,str] = dspy.OutputField(
        desc="""
            Relavant tags for a query, think of all possoblities

            return the page as a key, and matched part as value
            """
    )


# %%
predictor = dspy.Predict(SearchTag)
query = "Explain the different categories of Stop lamp with respect to vehicle?"


## Query Answer (INluce Section number and relevant section and documents)

class QueryAnswer(dspy.Signature):
    """
    Given a query and a document context, perform a multi-step analysis:
    1. Extract relevant content.
    2. Identify numerical values.
    3. Summarize the extracted information.
    4. Include section references.
    5. Handle missing information gracefully.
    """

    # Input Fields
    query: str = dspy.InputField(
        desc="The user query for which information needs to be retrieved and analyzed from the context."
    )

    context: str = dspy.InputField(
        desc="The source document or text from which to extract information related to the query."
    )

    # Output Field
    output_answer: str = dspy.OutputField(
        desc="""
        A comprehensive answer to the query that includes:
        - A list of relevant sections at the top
        - A detailed response using bullet points, subheadings, and structured explanation
        - All important aspects of the query answered in a logical and readable manner
        """
    )

rag = dspy.ChainOfThought(QueryAnswer)

In [29]:
pip install git+https://github.com/stanfordnlp/dspy.git

Collecting git+https://github.com/stanfordnlp/dspy.git
  Cloning https://github.com/stanfordnlp/dspy.git to c:\users\admin\appdata\local\temp\pip-req-build-o32yksw9
  Resolved https://github.com/stanfordnlp/dspy.git to commit 5cd355b6fb4625b912fe7072f31ad5b01f73a988
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Note: you may need to restart the kernel to use updated packages.


  Running command git clone --filter=blob:none --quiet https://github.com/stanfordnlp/dspy.git 'C:\Users\Admin\AppData\Local\Temp\pip-req-build-o32yksw9'


In [30]:
import os
import pdfplumber
import numpy as np
import random
import warnings
warnings.filterwarnings("ignore")
from typing import Dict, List
from sklearn.metrics.pairwise import cosine_similarity
import dspy

from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain_groq import ChatGroq
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# ========== LLM SETUP ==========
# Set up environment variable for API key
# os.environ["GROQ_API_KEY"] = "your-api-key-here"  # Uncomment and set your API key if needed

# Initialize your LangChain LLM (Groq's LLaMA3)
lc_llm = ChatGroq(
    temperature=0.2, 
    model_name="llama3-70b-8192",
    streaming=True,
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])
)

# Custom DSPy-compatible LLM class
class LangChainLLM(dspy.LM):
    def __init__(self, lc_llm):
        super().__init__()
        self.lc_llm = lc_llm

    def __call__(self, prompt, **kwargs):
        return self.lc_llm.invoke(prompt).content

# Register it as DSPy's default LLM
llm = LangChainLLM(lc_llm)
dspy.settings.configure(lm=llm)

# ========== Embedding Model ==========
embed_model = HuggingFaceBgeEmbeddings(
    model_name="BAAI/bge-base-en-v1.5",
    model_kwargs={'device': 'cpu'},  # Use 'cuda' if you have GPU
    encode_kwargs={'normalize_embeddings': True}
)

def embed_texts(texts):
    return embed_model.embed_documents(texts)

# ========== PDF CLEANING ==========
def get_header_footer(pdf_path, threshold=0.71):
    with pdfplumber.open(pdf_path) as pdf:
        total_pages = len(pdf.pages)
        random_page_nos = random.sample(range(5, total_pages), 10) if total_pages >= 15 else list(range(total_pages))

        avg_similarity = 1
        header_lines = -1
        while avg_similarity > threshold and header_lines < 4:
            header_lines += 1
            lines_to_compare = [
                pdf.pages[i].extract_text().split('\n')[header_lines]
                for i in random_page_nos if len(pdf.pages[i].extract_text().split('\n')) > header_lines
            ]
            similarities = cosine_similarity(embed_texts(lines_to_compare))
            avg_similarity = np.mean(similarities[np.triu_indices(len(similarities), k=1)])

        avg_similarity = 1
        footer_lines = -1
        while avg_similarity > threshold and footer_lines < 4:
            footer_lines += 1
            lines_to_compare = [
                pdf.pages[i].extract_text().split('\n')[-(footer_lines + 1)]
                for i in random_page_nos if len(pdf.pages[i].extract_text().split('\n')) > footer_lines
            ]
            similarities = cosine_similarity(embed_texts(lines_to_compare))
            avg_similarity = np.mean(similarities[np.triu_indices(len(similarities), k=1)])

        return header_lines, footer_lines

def extract_text_by_page(pdf_path):
    header_lines, footer_lines = get_header_footer(pdf_path)
    content_by_page = {}
    with pdfplumber.open(pdf_path) as pdf:
        for i, page in enumerate(pdf.pages):
            page_text = page.extract_text()
            if page_text:
                lines = page_text.split('\n')
                clean_text = '\n'.join(lines[header_lines:-(footer_lines + 1)]) if footer_lines >= 0 else '\n'.join(lines[header_lines:])
                content_by_page[i] = clean_text
    return content_by_page

# ========== DSPy Signatures ==========
class ContentAnalyzer(dspy.Signature):
    """
    Specialized analyzer for regulatory and compliance documents.

    The analyzer is designed to process documents page by page. Each invocation handles a dictionary of pages,
    extracting semantic tags to help identify the most relevant page for a given query.
    """
    content: Dict[int, str] = dspy.InputField(
        desc="A dictionary where key is the page number and value is the regulatory/compliance document content in markdown format."
    )
    content_tags: Dict[int, Dict[str, List[str]]] = dspy.OutputField(
        desc="""
            Semantic and structural tags extracted from each page, organized into high-level categories
            to support intelligent search and navigation.
            """
    )

class TagAnalysisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = dspy.Predict(ContentAnalyzer)

    def analyze(self, content: Dict[int, str]) -> Dict:
        """
        Analyze content and extract organized tags.
        """
        results = self.analyzer(content=content)
        return results.content_tags

class SearchTag(dspy.Signature):
    """
    You are an intelligent document analyzer. The system processes documents page by page, where each page has associated metadata in the form of tags. 
    Each invocation receives a dictionary where keys are page numbers and values are lists of tags for that page. 
    Your task is to identify the most relevant page(s) for a given user query based on these tags.
    
    Use semantic understanding of the query to match it with the most relevant tags.
    Be concise and confident in your answers.
    """
    query: str = dspy.InputField(
        desc="The query for which tags need to find out"
    )
    tags: Dict[int, Dict[str, str]] = dspy.InputField(
        desc="""A list of tags associated with each page,
            where keys are page numbers and dict are lists of tags."""
    )
    relevant_tags: Dict[int, str] = dspy.OutputField(
        desc="""
            Relevant tags for a query, think of all possibilities.
            Return the page as a key, and matched part as value.
            """
    )

class QueryAnswer(dspy.Signature):
    """
    Given a query and a document context, perform a multi-step analysis:
    1. Extract relevant content.
    2. Identify numerical values.
    3. Summarize the extracted information.
    4. Include section references.
    5. Handle missing information gracefully.
    """
    query: str = dspy.InputField(
        desc="The user query for which information needs to be retrieved and analyzed from the context."
    )
    context: str = dspy.InputField(
        desc="The source document or text from which to extract information related to the query."
    )
    output_answer: str = dspy.OutputField(
        desc="""
        A comprehensive answer to the query that includes:
        - A list of relevant sections at the top
        - A detailed response using bullet points, subheadings, and structured explanation
        - All important aspects of the query answered in a logical and readable manner
        """
    )

# ========== MAIN ==========
def main():
    # Create PDF directory if it doesn't exist
    pdf_dir = r"C:\Users\Admin\OneDrive\Desktop(1)\cmi\4\IP\projectfile\RAG-KnowledgeBase-System\Files"
    if not os.path.exists(pdf_dir):
        os.makedirs(pdf_dir)
        print(f"Created directory: {pdf_dir}")
        print("Please add PDF files to this directory and run the script again.")
        return
    
    documents = {}
    
    # Check if there are PDF files in the directory
    pdf_files = [f for f in os.listdir(pdf_dir) if f.endswith(".pdf")]
    if not pdf_files:
        print(f"No PDF files found in {pdf_dir}. Please add PDF files and run the script again.")
        return
    
    for filename in pdf_files:
        path = os.path.join(pdf_dir, filename)
        print(f"🔍 Processing: {filename}")
        content_by_page = extract_text_by_page(path)
        tagger = TagAnalysisPipeline()
        tags = tagger.analyze(content_by_page)
        documents[filename] = {
            "pages": content_by_page,
            "tags": tags
        }
    
  

TypeError: LM.__init__() missing 1 required positional argument: 'model'

In [None]:
import os
import pdfplumber
import numpy as np
import random
import warnings
warnings.filterwarnings("ignore")

from typing import Dict, List
from sklearn.metrics.pairwise import cosine_similarity

import dspy
from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain_groq import ChatGroq
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# ========== CONFIG ========== #
PDF_DIR = r"C:\Users\Admin\OneDrive\Desktop(1)\cmi\4\IP\projectfile\RAG-KnowledgeBase-System\Files"
SIMILARITY_THRESHOLD = 0.71

# ========== LLM SETUP ========== #
class GroqLM(dspy.LM):
    def __init__(self, model="llama3-70b-8192", temperature=0.2):
        super().__init__(model=model)
        self.lc_llm = ChatGroq(
            temperature=temperature,
            model_name=model,
            streaming=True,
            callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])
        )
    
    def basic_request(self, prompt, **kwargs):
        response = self.lc_llm.invoke(prompt)
        return response.content
    
    def __call__(self, prompt, **kwargs):
        return self.basic_request(prompt, **kwargs)

# Configure DSPy with Groq
llm = GroqLM()
dspy.settings.configure(lm=llm)

# ========== EMBEDDING MODEL ========== #
embed_model = HuggingFaceBgeEmbeddings(
    model_name="BAAI/bge-base-en-v1.5",
    model_kwargs={"device": "cpu"},  # Change to 'cuda' if GPU is available
    encode_kwargs={"normalize_embeddings": True}
)

def embed_texts(texts: List[str]):
    return embed_model.embed_documents([t if t else " " for t in texts])  # Avoid empty strings

# ========== PDF PROCESSING UTILS ========== #
def get_header_footer(pdf_path, threshold=SIMILARITY_THRESHOLD):
    with pdfplumber.open(pdf_path) as pdf:
        total_pages = len(pdf.pages)
        sample_pages = random.sample(range(5, total_pages), 10) if total_pages >= 15 else list(range(total_pages))

        def compute_similarity(position: int, is_footer=False):
            lines = []
            for i in sample_pages:
                text = pdf.pages[i].extract_text()
                if text:
                    lines_list = text.split('\n')
                    if len(lines_list) > position:
                        line = lines_list[-(position + 1)] if is_footer else lines_list[position]
                        lines.append(line)
            if len(lines) < 2:
                return 0
            similarity = cosine_similarity(embed_texts(lines))
            return np.mean(similarity[np.triu_indices(len(similarity), k=1)])

        header_lines, footer_lines = -1, -1
        avg_similarity = 1
        while avg_similarity > threshold and header_lines < 4:
            header_lines += 1
            avg_similarity = compute_similarity(header_lines)

        avg_similarity = 1
        while avg_similarity > threshold and footer_lines < 4:
            footer_lines += 1
            avg_similarity = compute_similarity(footer_lines, is_footer=True)

        return header_lines, footer_lines

def extract_text_by_page(pdf_path):
    header_lines, footer_lines = get_header_footer(pdf_path)
    content_by_page = {}
    with pdfplumber.open(pdf_path) as pdf:
        for i, page in enumerate(pdf.pages):
            text = page.extract_text()
            if text:
                lines = text.split('\n')
                cleaned = '\n'.join(lines[header_lines:-(footer_lines + 1)]) if footer_lines >= 0 else '\n'.join(lines[header_lines:])
                content_by_page[i] = cleaned
    return content_by_page

# ========== DSPy SIGNATURES ========== #
class ContentAnalyzer(dspy.Signature):
    content: Dict[int, str] = dspy.InputField(desc="Page number to content mapping.")
    content_tags: Dict[int, Dict[str, List[str]]] = dspy.OutputField(desc="Extracted tags per page.")

class TagAnalysisPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = dspy.Predict(ContentAnalyzer)

    def analyze(self, content: Dict[int, str]) -> Dict:
        results = self.analyzer(content=content)
        return results.content_tags

class SearchTag(dspy.Signature):
    query: str = dspy.InputField(desc="User query.")
    tags: Dict[int, Dict[str, str]] = dspy.InputField(desc="Tags per page.")
    relevant_tags: Dict[int, str] = dspy.OutputField(desc="Relevant tags matched to query.")

class QueryAnswer(dspy.Signature):
    query: str = dspy.InputField(desc="User query.")
    context: str = dspy.InputField(desc="Document context.")
    output_answer: str = dspy.OutputField(desc="Structured, comprehensive answer.")

# ========== MAIN LOGIC ========== #
def main():
    if not os.path.exists(PDF_DIR):
        os.makedirs(PDF_DIR)
        print(f"📁 Created directory: {PDF_DIR}")
        print("⚠️  Please add PDF files to this directory and run the script again.")
        return

    pdf_files = [f for f in os.listdir(PDF_DIR) if f.endswith(".pdf")]
    if not pdf_files:
        print(f"📂 No PDF files found in {PDF_DIR}. Please add PDF files and run again.")
        return

    documents = {}
    for filename in pdf_files:
        pdf_path = os.path.join(PDF_DIR, filename)
        print(f"🔍 Processing: {filename}")
        try:
            content_by_page = extract_text_by_page(pdf_path)
            tagger = TagAnalysisPipeline()
            tags = tagger.analyze(content_by_page)
            documents[filename] = {
                "pages": content_by_page,
                "tags": tags
            }
        except Exception as e:
            print(f"❌ Error processing {filename}: {str(e)}")

    print("✅ Document analysis complete. You can now perform queries on them.")

if __name__ == "__main__":
    main()





modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/94.6k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/777 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

KeyboardInterrupt: 