In [1]:
# Import necessary libraries for various functionalities
import os
# Set your OpenAI API key here.
# setdefault keeps a key that is already exported in the environment instead of
# overwriting it with the placeholder string below.
os.environ.setdefault("OPENAI_API_KEY", "Add your API KEY for OPENAI")

In [None]:
# Importing required libraries for subprocess, concurrent tasks, and PDF processing
import subprocess
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from tqdm import tqdm  # For progress bars
from PyPDF2 import PdfReader  # For handling PDF files

# Importing LangChain specific components for document processing and retrieval
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.retrievers.multi_vector import MultiVectorRetriever, SearchType
from langchain.storage import InMemoryStore
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain.document_loaders import PDFPlumberLoader
from langchain.text_splitter import NLTKTextSplitter
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from BCEmbedding import RerankerModel
from langchain_core.runnables import RunnableLambda, RunnablePassthrough



In [None]:
def june_run_nougat(file_path, output_dir):
    """
    Run Nougat tool on the given file.

    Args:
        file_path (str): Path to the input PDF file.
        output_dir (str): Directory to store the output.

    Returns:
        int: 0 if operation is successful, 1 if failed (non-zero exit code of
        the tool, or the tool could not be started at all).
    """
    cmd = ["nougat.exe", file_path, "-o", output_dir, "-m", "0.1.0-base", "--no-skipping"]

    try:
        res = subprocess.run(cmd)
    except OSError as exc:
        # Raised when the executable is missing or cannot be launched; report it
        # as an ordinary failure so worker threads do not crash.
        print(f"Error when running Nougat on {file_path}: {exc}")
        return 1

    if res.returncode != 0:
        # Normalise every non-zero exit code to 1, as documented above.
        print(f"Error when running Nougat on {file_path}.")
        return 1

    print(f"Operation completed for {file_path}!")
    return 0

def june_get_tables_from_mmd(mmd_path):
    """
    Extract tables from an MMD file generated by Nougat.

    A table is everything from a ``\\begin{table}`` line through the matching
    ``\\end{table}`` line, plus the single line that follows it (taken to be
    the caption). Marker lines are compared after stripping surrounding
    whitespace, so a final line without a trailing newline still matches.

    Args:
        mmd_path (str): Path to the MMD file.

    Returns:
        list: List of tables (str) extracted from the MMD file, in file order.
    """
    begin_marker = "\\begin{table}"
    end_marker = "\\end{table}"

    tables = []
    current = []
    # None: outside a table, "TABLE": inside one, "CAPTION": the table has
    # ended and the next line is expected to be its caption.
    state = None

    with open(mmd_path, encoding='utf-8') as f:
        for line in f:
            marker = line.strip()
            if marker == begin_marker:
                if state == "CAPTION":
                    # The previous table had no caption line; emit it on its
                    # own instead of merging it with the table starting here.
                    tables.append(''.join(current))
                    current = []
                state = "TABLE"
                current.append(line)
            elif marker == end_marker:
                current.append(line)
                state = "CAPTION"
            elif state == "TABLE":
                current.append(line)
            elif state == "CAPTION":
                current.append(line)
                tables.append(''.join(current))
                current = []
                state = None

    # The file ended right after \end{table}: keep that table without a caption.
    if state == "CAPTION":
        tables.append(''.join(current))

    return tables

def process_pdf(file_path, output_dir):
    """
    Process a PDF file by extracting text and tables, and run Nougat tool.

    Args:
        file_path (str): Path to the input PDF file.
        output_dir (str): Directory to store the output.

    Returns:
        tuple or None: None when the Nougat run fails, otherwise
        (texts_with_metadata, tables_with_metadata, pdf_id, elapsed_time)
            - texts_with_metadata: List of documents containing the extracted text.
            - tables_with_metadata: List of documents containing the extracted tables.
            - pdf_id: Unique identifier for the PDF file.
            - elapsed_time: Time taken to process the PDF.
    """
    start_time = time.time()
    pdf_id = str(uuid.uuid4())

    # Load PDF and keep only the pages that yield some text
    reader = PdfReader(file_path)
    pages = []
    for page in reader.pages:
        page_text = page.extract_text()
        if page_text:
            pages.append(Document(page_content=page_text, metadata={"pdf_id": pdf_id}))

    # Split text using NLTKTextSplitter and tag every chunk with the PDF ID
    text_splitter = NLTKTextSplitter()
    texts_with_metadata = [
        Document(page_content=chunk.page_content, metadata={"pdf_id": pdf_id})
        for chunk in text_splitter.split_documents(pages)
    ]

    # Run Nougat tool; any non-zero status is a failure, not only the value 1
    if june_run_nougat(file_path, output_dir) != 0:
        print(f"Failed to process {file_path}")
        return None

    # Extract tables from MMD file generated by Nougat
    mmd_name = os.path.splitext(os.path.basename(file_path))[0] + ".mmd"
    mmd_path = os.path.join(output_dir, mmd_name)
    if os.path.exists(mmd_path):
        tables = june_get_tables_from_mmd(mmd_path)
    else:
        # Keep the extracted text even if the expected table file is absent
        print(f"No MMD output found for {file_path}; continuing without tables")
        tables = []
    tables_with_metadata = [
        Document(page_content=table, metadata={"pdf_id": pdf_id}) for table in tables
    ]

    elapsed_time = time.time() - start_time
    print(f"Processed {file_path} in {elapsed_time:.2f} seconds")

    return texts_with_metadata, tables_with_metadata, pdf_id, elapsed_time

def process_pdfs_in_batches(input_dir, output_dir, batch_size=10, max_workers=4):
    """
    Process multiple PDFs in batches with parallel execution.

    Files are handled in sorted name order and results are collected in that
    same order, so the returned lists are deterministic. A PDF whose processing
    fails (returns None or raises) is reported and skipped; it contributes no
    entry to any of the returned lists.

    Args:
        input_dir (str): Directory containing PDF files.
        output_dir (str): Directory to store the output.
        batch_size (int): Number of PDFs to process in each batch.
        max_workers (int): Maximum number of concurrent workers.

    Returns:
        tuple: (all_texts, all_tables, pdf_ids, processing_times)
            - all_texts: List of all extracted texts from PDFs.
            - all_tables: List of all extracted tables from PDFs.
            - pdf_ids: List of unique PDF IDs.
            - processing_times: List of processing times for each PDF.
    """
    # Case-insensitive match so that e.g. "REPORT.PDF" is picked up as well
    pdf_files = sorted(
        os.path.join(input_dir, name)
        for name in os.listdir(input_dir)
        if name.lower().endswith('.pdf')
    )

    all_texts = []
    all_tables = []
    pdf_ids = []
    processing_times = []

    for i in range(0, len(pdf_files), batch_size):
        batch_files = pdf_files[i:i + batch_size]
        batch_results = {}

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {
                executor.submit(process_pdf, pdf_file, output_dir): pdf_file
                for pdf_file in batch_files
            }

            # Show progress using tqdm
            for future in tqdm(as_completed(future_to_file), total=len(batch_files), desc=f"Processing batch {i // batch_size + 1}"):
                pdf_file = future_to_file[future]
                try:
                    batch_results[pdf_file] = future.result()
                except Exception as exc:
                    # One broken PDF must not abort the remaining files
                    print(f"Failed to process {pdf_file}: {exc}")
                    batch_results[pdf_file] = None

        # Collect in submission order rather than completion order
        for pdf_file in batch_files:
            result = batch_results.get(pdf_file)
            if result:
                texts, tables, pdf_id, elapsed_time = result
                all_texts.extend(texts)
                all_tables.extend(tables)
                pdf_ids.append(pdf_id)
                processing_times.append(elapsed_time)

    return all_texts, all_tables, pdf_ids, processing_times

# Example usage
input_dir = "/path/to/pdf/directory"
output_dir = "/path/to/output/directory"

# Process PDFs in batches
texts, tables, pdf_ids, processing_times = process_pdfs_in_batches(input_dir, output_dir)

# Print processing times for each processed PDF.
# pdf_ids and processing_times are appended together for every successfully
# processed file, so they are index-aligned. os.listdir(input_dir) is not: it
# also lists non-PDF and failed files, so pairing it with the times would
# mislabel them.
for pdf_id, processing_time in zip(pdf_ids, processing_times):
    print(f"PDF {pdf_id} processed in {processing_time:.2f} seconds")


In [None]:
# Chat model shared by both summarization chains
model = ChatOpenAI(temperature=0, model="gpt-4o-mini")

# Prompt for summarizing a table chunk
table_prompt_text = """You are an assistant tasked with summarizing tables. \
Give a concise summary of the table by forming logical and corresponding relationships rather than broad summaries. Table chunk: {element}"""
table_prompt = ChatPromptTemplate.from_template(table_prompt_text)

# Prompt for summarizing a text chunk
text_prompt_text = """You are an assistant tasked with summarizing text. \
Give a concise summary of the text chunk. Text chunk: {element}"""
text_prompt = ChatPromptTemplate.from_template(text_prompt_text)


def _as_element(chunk):
    """Pass the chain input through unchanged as the prompt's `element`."""
    return chunk


# Table summarization chain: input -> prompt -> model -> plain string
table_summarize_chain = (
    {"element": _as_element}
    | table_prompt
    | model
    | StrOutputParser()
)

# Text summarization chain: input -> prompt -> model -> plain string
text_summarize_chain = (
    {"element": _as_element}
    | text_prompt
    | model
    | StrOutputParser()
)

def _chunk_content(chunk):
    """
    Return the raw string of a chunk.

    Accepts a plain string, an object exposing ``page_content`` (such as the
    Document objects built by process_pdf), or a dict with a 'content' key.
    """
    if isinstance(chunk, str):
        return chunk
    page_content = getattr(chunk, "page_content", None)
    if page_content is not None:
        return page_content
    return chunk["content"]


def summarize_tables(tables, max_concurrency=5):
    """
    Process the tables and generate summaries.
    
    Args:
        tables (list): A list of tables; each item is a string, a Document-like
            object with ``page_content``, or a dict with a 'content' key.
        max_concurrency (int): The maximum number of concurrent requests.
    
    Returns:
        list: A list of table summaries, or an empty list if there is nothing
        to summarize or summarization fails (the error is printed).
    """
    if not tables:
        return []
    try:
        return table_summarize_chain.batch(
            [_chunk_content(table) for table in tables],
            {"max_concurrency": max_concurrency},
        )
    except Exception as e:
        print(f"Error summarizing tables: {e}")
        return []


def summarize_texts(texts, max_concurrency=5):
    """
    Process the texts and generate summaries.
    
    Args:
        texts (list): A list of text chunks; each item is a string, a
            Document-like object with ``page_content``, or a dict with a
            'content' key.
        max_concurrency (int): The maximum number of concurrent requests.
    
    Returns:
        list: A list of text summaries, or an empty list if there is nothing
        to summarize or summarization fails (the error is printed).
    """
    if not texts:
        return []
    try:
        return text_summarize_chain.batch(
            [_chunk_content(text) for text in texts],
            {"max_concurrency": max_concurrency},
        )
    except Exception as e:
        print(f"Error summarizing texts: {e}")
        return []

# `tables` and `texts` are expected to exist already (produced by the
# PDF-processing step); summarize both collections.
table_summaries = summarize_tables(tables, max_concurrency=5)
text_summaries = summarize_texts(texts, max_concurrency=5)

# Print the table summaries, numbered from 1
for position, summary in enumerate(table_summaries, start=1):
    print(f"Table {position} Summary: {summary}")

# Print the text summaries, numbered from 1
for position, summary in enumerate(text_summaries, start=1):
    print(f"Text {position} Summary: {summary}")

In [None]:
# Directory handed to Chroma for on-disk persistence
persist_directory = ''  # Specify your persistent directory here

# Metadata key that links a vector-store entry to its entry in the docstore
id_key = "doc_id"

# In-memory document store used as the retriever's docstore
store = InMemoryStore()

# Chroma vector store backed by OpenAI embeddings
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="summaries",
    embedding_function=embeddings,
    persist_directory=persist_directory,
)
vectorstore.persist()

# MultiVectorRetriever combining the vector store with the document store
MultiVector_retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    docstore=store,
    id_key=id_key,
    search_kwargs={"k": 10},  # Number of results requested from the vector search
)

# Function to add documents (text or tables) with summaries to the retriever
def add_documents_to_retriever(contents, summaries, documents_type='text'):
    """
    Adds text or table documents along with summaries to the vector store and document store.

    Each item of `contents` may be a Document-like object (with `page_content`
    and `metadata`, as produced by process_pdf) or a dict of the form
    {"content": ..., "metadata": {"pdf_id": ...}}.

    Args:
        contents (list): A list of documents (text or table).
        summaries (list): A list of summaries corresponding to the documents.
        documents_type (str): The type of document being added ('text' or 'table');
            recorded in each document's metadata under "type".

    Raises:
        ValueError: If `contents` and `summaries` differ in length.
    """
    if len(contents) != len(summaries):
        raise ValueError(
            f"Got {len(contents)} {documents_type} documents but {len(summaries)} summaries; "
            "each document needs exactly one summary."
        )
    if not contents:
        return

    def _content_and_pdf_id(item):
        # Support both Document-like objects and plain dicts
        if hasattr(item, "page_content"):
            return item.page_content, item.metadata["pdf_id"]
        return item["content"], item["metadata"]["pdf_id"]

    doc_ids = [str(uuid.uuid4()) for _ in contents]

    # Create the documents with content and metadata
    documents = []
    for doc_id, item, summary in zip(doc_ids, contents, summaries):
        text, pdf_id = _content_and_pdf_id(item)
        documents.append(
            Document(page_content=text, metadata={
                id_key: doc_id,
                "pdf_id": pdf_id,
                "summary": summary,
                "type": documents_type,
            })
        )

    # Add documents to the vector store and docstore. The docstore holds
    # Document objects (not bare strings) because downstream code reads
    # `.page_content` from whatever the retriever returns.
    MultiVector_retriever.vectorstore.add_documents(documents)
    MultiVector_retriever.docstore.mset(list(zip(doc_ids, documents)))

# Add text and table documents to the retriever
add_documents_to_retriever(texts, text_summaries, documents_type='text')
add_documents_to_retriever(tables, table_summaries, documents_type='table')

# Set the search type to MMR (Maximal Marginal Relevance)
MultiVector_retriever.search_type = SearchType.mmr

In [None]:
def _raw_content(item):
    """Return the text of a chunk given as a string, a Document-like object, or a dict with 'content'."""
    if isinstance(item, str):
        return item
    page_content = getattr(item, "page_content", None)
    if page_content is not None:
        return page_content
    return item['content']


# Create unique IDs for each text and table
text_ids = [str(uuid.uuid4()) for _ in texts]
table_ids = [str(uuid.uuid4()) for _ in tables]

# Initialize mappings for content and summaries
id_to_content = {}
id_to_summary = {}

# Build the mapping for texts
for i, text in enumerate(texts):
    text_id = text_ids[i]
    id_to_content[text_id] = _raw_content(text)
    id_to_summary[text_id] = text_summaries[i]

# Build the mapping for tables
for i, table in enumerate(tables):
    table_id = table_ids[i]
    id_to_content[table_id] = _raw_content(table)
    id_to_summary[table_id] = table_summaries[i]

# Combine original content and summaries for BM25 input: every chunk
# contributes two entries, its original content and its summary.
combined_texts_with_ids = []
for content_id in id_to_content:
    combined_texts_with_ids.append((content_id, id_to_content[content_id]))  # Original content
    combined_texts_with_ids.append((content_id, id_to_summary[content_id]))  # Corresponding summary

# Initialize BM25Retriever
bm25_retriever = BM25Retriever.from_texts(
    [text for _, text in combined_texts_with_ids], k=10  # Top 10 results
)

# Initialize the EnsembleRetriever with BM25 and MultiVector retrievers
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, MultiVector_retriever], weights=[0.3, 1]
)

# Function to handle retrieval and ranking using EnsembleRetriever
def retrieve_with_ensemble(query, k=10):
    """
    Retrieves documents using the ensemble of retrievers (BM25 and MultiVector).

    Args:
        query (str): The search query to use for retrieval.
        k (int): The maximum number of results to return.

    Returns:
        List of documents retrieved based on the ensemble search, truncated to
        at most `k` entries.
    """
    # LangChain retrievers are invoked through the Runnable interface; the
    # ensemble takes no per-call `k`, so the fused result list is truncated here.
    return ensemble_retriever.invoke(query)[:k]


In [None]:
def create_documents(all_queries):
    """
    Retrieve documents for every query and rerank them against the initial query.

    Args:
        all_queries (list of str or str): Queries to retrieve documents for.
            The first entry is treated as the initial query that the
            retrieved documents are scored against. A single string is
            accepted and treated as a one-element list.

    Returns:
        list of dict: Up to 20 entries of the form
        {"score": ..., "document": <page content>}, sorted by score in
        descending order. Empty when there are no queries or no results.
    """
    if isinstance(all_queries, str):
        # Iterating a bare string would issue one retrieval per character
        all_queries = [all_queries]
    all_queries = list(all_queries)
    if not all_queries:
        return []

    # Assuming ensemble_retriever is defined and accessible
    retrieved_documents = []
    for query in all_queries:
        results = ensemble_retriever.get_relevant_documents(query)
        retrieved_documents.extend(doc.page_content for doc in results)

    # Deduplicate while keeping first-seen order so results are deterministic
    unique_documents = list(dict.fromkeys(retrieved_documents))
    if not unique_documents:
        return []

    # Score every unique document against the initial query (named explicitly
    # rather than relying on the loop variable left over from the loop above)
    initial_query = all_queries[0]
    pairs = [[initial_query, doc] for doc in unique_documents]

    reranker = RerankerModel(model_name_or_path="maidalun1020/bce-reranker-base_v1")
    scores = reranker.compute_score(pairs)
    # NOTE(review): compute_score appears to return a bare number rather than
    # a list when given a single pair; normalise so zip() below works. Confirm.
    if not isinstance(scores, (list, tuple)):
        scores = [scores]

    # Combine scores with documents
    scored_documents = [{"score": score, "document": doc} for score, doc in zip(scores, unique_documents)]

    # Sort by score in descending order and select top 20
    sorted_docs = sorted(scored_documents, key=lambda x: x['score'], reverse=True)[:20]

    return sorted_docs

In [None]:
def create_documents(all_queries, preserve_metadata=False):
    """
    Retrieves relevant documents for a list of queries using the ensemble retriever,
    and returns a list of unique document contents. Optionally preserves metadata.

    Documents are deduplicated by content and returned in first-seen order.
    When several retrieved documents share the same content, the metadata of
    the first one is kept. A query that fails or yields nothing is reported
    and skipped.

    Args:
        all_queries (list of str): The list of queries to retrieve documents for.
        preserve_metadata (bool): Whether to keep metadata associated with the documents.

    Returns:
        list of str or list of dict: If preserve_metadata is False, returns a list of unique document contents.
                                     If True, returns a list of dictionaries with 'content' and 'metadata'.
    """
    # content -> metadata of the first document seen with that content;
    # dict insertion order gives a stable, deduplicated result.
    unique_by_content = {}

    for query in all_queries:
        try:
            # Retrieve relevant documents using the ensemble retriever
            results = ensemble_retriever.get_relevant_documents(query)
            entries = [(doc.page_content, doc.metadata) for doc in results] if results else []
        except Exception as e:
            print(f"Error retrieving documents for query '{query}': {e}")
            continue

        if not entries:
            print(f"No results found for query: {query}")
            continue

        for content, metadata in entries:
            unique_by_content.setdefault(content, metadata)

    if preserve_metadata:
        return [
            {'content': content, 'metadata': metadata}
            for content, metadata in unique_by_content.items()
        ]
    return list(unique_by_content)


In [None]:
# LLM that writes the final answer
model = ChatOpenAI(temperature=0, model="gpt-4")

# Prompt template: the answer must rely only on the supplied context
template = """Answer the question based only on the following context, which can include text and tables:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# Context branch: the chain input goes through create_original_query, whose
# output is passed to create_documents
retrieve_context = RunnableLambda(create_original_query) | RunnableLambda(create_documents)

# Full chain: the input is used for the context branch and also forwarded
# unchanged as the question
chain = (
    {"context": retrieve_context, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)


In [None]:
# pandas is needed for read_excel below and is not imported by the earlier cells
import pandas as pd

# Load the evaluation spreadsheet (an Excel file, read with read_excel)
file_path = ''
df = pd.read_excel(file_path)

# Extract questions and reference answers
questions = df['user_input'].tolist()
ground_truth = df['reference'].tolist()

# Initialize empty lists for answers and contexts
answers = []
contexts = []

# Define batch size
batch_size = 5

# Retry helper with exponential backoff (retries on any exception, e.g. transient SSL errors)
def retry_request(func, retries=3, delay=5):
    """
    Call `func` until it succeeds or the attempts are used up.

    Args:
        func (callable): Zero-argument callable to execute.
        retries (int): Total number of attempts.
        delay (int or float): Seconds to wait after the first failure; the
            wait doubles after every further failure.

    Returns:
        The value returned by `func`, or None when every attempt raised
        (the last error is printed) or when `retries` is not positive.
    """
    wait = delay
    final_attempt = retries - 1
    attempt = 0

    while attempt < retries:
        try:
            outcome = func()
        except Exception as e:
            if attempt == final_attempt:
                print(f"Max retries reached: {e}")
                return None
            time.sleep(wait)
            wait *= 2  # Exponential backoff
        else:
            return outcome
        attempt += 1

    return None

# Answer-generation chain that takes an explicit {"context", "question"} dict.
# The retrieval `chain` defined earlier expects a bare question and performs
# its own retrieval, so handing it a dict would not use the context gathered
# here; this chain guarantees the recorded contexts are the ones the model saw.
answer_chain = prompt | model | StrOutputParser()

# Generate answers and contexts in batches to reduce load and avoid SSL errors
for i in range(0, len(questions), batch_size):
    batch_questions = questions[i:i + batch_size]
    batch_answers = []
    batch_contexts = []

    for query in batch_questions:
        # Adding delay to avoid overwhelming the server
        time.sleep(2)  # Shorter delay since we have retry logic

        # Using ensemble_retriever to get context (Replace with your retriever logic)
        relevant_documents = retry_request(lambda: ensemble_retriever.get_relevant_documents(query))
        if relevant_documents is None:
            batch_contexts.append([])
            batch_answers.append("No relevant context found due to an error.")
            print(f"Error fetching documents for query: {query}")
            continue

        document_contents = [doc.page_content for doc in relevant_documents]
        batch_contexts.append(document_contents)

        # Using context to generate an answer (Replace with your model invocation logic)
        if document_contents:  # Check if there is context content
            context_string = "\n".join(document_contents)
            answer = retry_request(lambda: answer_chain.invoke({"context": context_string, "question": query}))
            if answer is None:
                answer = "Error during model invocation."
        else:
            answer = "No relevant context found."
        batch_answers.append(answer)

    # Append current batch results to the main lists
    answers.extend(batch_answers)
    contexts.extend(batch_contexts)

# Construct the dataset; every question contributed exactly one answer and one
# context list above, so all four columns have the same length.
data = {
    "question": questions,
    "answer": answers,
    "contexts": contexts,
    "ground_truth": ground_truth,
}

# Create the dataset (Dataset comes from the Hugging Face `datasets` package,
# which is not imported by the earlier cells)
from datasets import Dataset

dataset = Dataset.from_dict(data)

# Optionally, save dataset to disk for later use
dataset.save_to_disk("processed_dataset")

In [None]:
from ragas.llms.base import LangchainLLMWrapper
from langchain_community.chat_models import ChatOpenAI  # Modify import
from ragas import evaluate
from ragas.metrics import (
    RubricsScoreWithReference,
    answer_relevancy,
    faithfulness,
    context_recall,
    context_precision,
    answer_correctness,
    answer_similarity
)

# Evaluation LLM: a LangChain ChatOpenAI instance wrapped for ragas
langchain_llm = ChatOpenAI(model_name="gpt-4", temperature=0)
wrapped_llm = LangchainLLMWrapper(langchain_llm)

# Custom 1-5 scoring rubric comparing a response with the ground truth
my_custom_rubrics = {
    "score1_description": "The response is incorrect, irrelevant, or does not align with the ground truth.",
    "score2_description": "The response partially matches the ground truth but includes significant errors, omissions, or irrelevant information.",
    "score3_description": "The response generally aligns with the ground truth but may lack detail, clarity, or have minor inaccuracies.",
    "score4_description": "The response is mostly accurate and aligns well with the ground truth, with only minor issues or missing details.",
    "score5_description": "The response is fully accurate, aligns completely with the ground truth, and is clear and detailed.",
}

# Rubric-based metric built from the custom rubric above
metric_with_ref = RubricsScoreWithReference(rubrics=my_custom_rubrics)

# All metrics to compute, in the order they are passed to evaluate()
evaluation_metrics = [
    metric_with_ref,
    context_precision,
    faithfulness,
    answer_relevancy,
    context_recall,
    answer_correctness,
    answer_similarity,
]

# Evaluate the `dataset` constructed earlier (questions, answers, contexts and
# ground truth); any failure is reported instead of raised.
try:
    result = evaluate(dataset=dataset, metrics=evaluation_metrics, llm=wrapped_llm)
    print(result)
except Exception as e:
    print(f"Error during evaluation: {e}")
