In [1]:
import pytesseract
from PIL import Image
import fitz  # PyMuPDF for PDF handling
from openpyxl import load_workbook
from docx import Document as DocxDocument
from langchain.embeddings import OllamaEmbeddings
from langchain.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.chat_models import ChatOllama
from langchain.schema.runnable import RunnablePassthrough
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.chains import RetrievalQA
import gradio as gr
import requests
from bs4 import BeautifulSoup
import re
from uuid import uuid4  # Import uuid4 for unique collection names
from email import policy
from email.parser import BytesParser  # For parsing .eml files

# Path to the logo image shown at the top of the Gradio UI.
# NOTE(review): absolute, machine-specific Windows path — presumably the logo
# will not load on any other machine; confirm before deploying elsewhere.
logo_path = r"C:\Users\M\Desktop\RAKBANK.png"

# Module-level handle to the Chroma vector store. It stays None until
# process_documents() builds a store; populate_checklist() and ask_question()
# check it for None before querying.
vector_db = None

# Function to perform OCR on the image and extract text
def extract_text_from_image(image_path):
    """Run OCR on an image file and return the recognised text.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The text produced by pytesseract, or an empty string when the image
        cannot be opened or OCR fails (the error is printed, not raised).
    """
    try:
        # The context manager releases the underlying file handle even when
        # OCR raises; a bare Image.open() leaves the file open.
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img)
    except Exception as e:
        print(f"An error occurred while processing the image: {e}")
        return ""

# Function to extract text from a PDF, including form fields
def extract_text_from_pdf(pdf_path):
    """Extract page text and form-field values from a PDF.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The concatenated page text, a newline, and one ``name: value`` line
        per form widget (``[Empty]`` for unfilled fields). Returns an empty
        string when nothing could be extracted, so callers can rely on a
        plain truthiness check. Errors are printed, not raised; anything
        gathered before an error is still returned.
    """
    page_texts = []
    form_lines = []

    try:
        # The context manager closes the document even if a page fails.
        with fitz.open(pdf_path) as pdf_document:
            for page in pdf_document:
                page_texts.append(page.get_text())

                # Form widgets are read separately from the page text.
                for field in page.widgets():
                    value = field.field_value if field.field_value else "[Empty]"
                    form_lines.append(f"{field.field_name}: {value}\n")
    except Exception as e:
        print(f"An error occurred while processing the PDF: {e}")

    text = "".join(page_texts)
    form_text = "".join(form_lines)
    if not text.strip() and not form_text:
        # A failed or empty extraction used to yield "\n", which is truthy
        # and was therefore embedded as a bogus document by the caller.
        return ""
    return text + "\n" + form_text

# Function to extract text from an EXCEL file
def extract_text_from_excel(excel_path):
    """Flatten every sheet of an Excel workbook into plain text.

    Each row becomes one line: cell values are converted with ``str`` and
    joined by single spaces, with empty cells contributing an empty string.
    The workbook is opened with ``data_only=True``.

    Returns:
        The flattened text, or "" (after printing the error) when the
        workbook cannot be read.
    """
    try:
        workbook = load_workbook(excel_path, data_only=True)
        lines = []
        for worksheet in workbook:
            for values in worksheet.iter_rows(values_only=True):
                cells = ['' if value is None else str(value) for value in values]
                lines.append(' '.join(cells) + '\n')
        return ''.join(lines)
    except Exception as e:
        print(f"An error occurred while processing the EXCEL file: {e}")
        return ""

# Function to extract text from a WORD file
def extract_text_from_word(word_path):
    """Return the paragraph text of a Word document, one paragraph per line.

    Only ``doc.paragraphs`` is read; every paragraph is followed by a
    newline.

    Returns:
        The collected text, or "" (after printing the error) when the file
        cannot be read.
    """
    try:
        paragraphs = DocxDocument(word_path).paragraphs
        return ''.join(paragraph.text + '\n' for paragraph in paragraphs)
    except Exception as e:
        print(f"An error occurred while processing the WORD file: {e}")
        return ""

# Function to extract text from an EML file
def _decode_eml_part(part):
    """Decode one MIME part to ``str`` using the charset the part declares."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    # Honour the declared charset; assuming UTF-8 mangles e.g. ISO-8859-1 or
    # Windows-1252 mail. UTF-8 is only the fallback when none is declared.
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown or misspelled charset label in the message.
        return payload.decode("utf-8", errors="replace")


def extract_text_from_eml(eml_path):
    """Extract the textual content of an .eml message.

    Every ``text/plain`` part is taken as-is; every ``text/html`` part is
    reduced to its visible text with whitespace collapsed. Parts are joined
    with newlines in the order ``msg.walk()`` yields them.

    Args:
        eml_path: Path of the .eml file to read.

    Returns:
        The combined text, or "" (after printing the error) when the file
        cannot be read or parsed.
    """
    try:
        with open(eml_path, 'rb') as fp:
            msg = BytesParser(policy=policy.default).parse(fp)

        text_parts = []
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == 'text/plain':
                text_parts.append(_decode_eml_part(part))
            elif content_type == 'text/html':
                soup = BeautifulSoup(_decode_eml_part(part), "html.parser")
                html_text = soup.get_text(separator=' ')
                text_parts.append(re.sub(r'\s+', ' ', html_text).strip())
        return '\n'.join(text_parts)
    except Exception as e:
        print(f"An error occurred while processing the EML file: {e}")
        return ""

# Function to extract text from a website URL
def extract_text_from_website(url, timeout=30):
    """Download a web page and return its visible text.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive host would block the caller forever.

    Returns:
        The page text with ``<script>``/``<style>`` content removed and all
        whitespace runs collapsed to single spaces. Returns "" (after
        printing the reason) on a non-200 response or any error.
    """
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to retrieve the website. Status code: {response.status_code}")
            return ""
        soup = BeautifulSoup(response.content, "html.parser")

        # Script and style bodies are code, not readable content.
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()

        text = soup.get_text(separator=' ')

        # Collapse runs of spaces and newlines into single spaces.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
    except Exception as e:
        print(f"An error occurred while processing the website: {e}")
        return ""

# Define a simple Document class
class Document:
    """Minimal text container exposing ``page_content`` and ``metadata``."""

    def __init__(self, text, metadata=None):
        self.page_content = text
        # Any falsy value (None, empty dict) is replaced by a fresh dict.
        self.metadata = metadata if metadata else {}

# Prompt asking the model for five rephrasings of a user question, one per
# line. Not referenced by the other code visible in this file.
_QUERY_PROMPT_TEMPLATE = """You are an AI language model assistant. Your task is to generate five
different versions of the given user question to retrieve relevant documents from
a vector database. By generating multiple perspectives on the user question, your
goal is to help the user overcome some of the limitations of the distance-based
similarity search. Provide these alternative questions separated by newlines.
Original question: {question}"""

QUERY_PROMPT = PromptTemplate(
    template=_QUERY_PROMPT_TEMPLATE,
    input_variables=["question"],
)

# Function to process multiple documents, including websites
def _extractor_for(path):
    """Return ``(label, extractor)`` for *path*, or None when unsupported.

    Matching is case-insensitive so "SCAN.PDF" is handled like "scan.pdf".
    """
    lowered = path.lower()
    if lowered.endswith((".jpg", ".jpeg", ".png")):
        return "image", extract_text_from_image
    if lowered.endswith(".pdf"):
        return "PDF", extract_text_from_pdf
    # NOTE(review): legacy .xls is routed to the openpyxl-based extractor,
    # which presumably cannot read it; such files then report "No text".
    if lowered.endswith((".xlsx", ".xls")):
        return "EXCEL file", extract_text_from_excel
    if lowered.endswith(".docx"):
        return "WORD file", extract_text_from_word
    if lowered.endswith(".eml"):
        return "EML file", extract_text_from_eml
    return None


def _extract_document(label, source, extractor):
    """Run *extractor* on *source*; return a Document, or None if no text."""
    print(f"Processing {label}: {source}")
    text = extractor(source)
    if not text:
        print(f"No text extracted from the {label}: {source}")
        return None
    print(f"Extracted text from {label}: {text[:100]}...")  # Debug log (first 100 chars)
    return Document(text, metadata={"source": source})


def process_documents(files, urls, checklist_path):
    """Extract text from uploads and URLs and embed it into a Chroma store.

    On success the module-level ``vector_db`` is rebound to a new in-memory
    collection with a unique name.

    Args:
        files: Uploaded files, each either an object with a ``.name`` path
            attribute or a plain path string. None is treated as "no files".
        urls: Web page addresses to fetch. None is treated as "no URLs".
        checklist_path: Unused here; accepted so existing callers keep
            working.

    Returns:
        A human-readable status message describing the outcome.
    """
    global vector_db  # Rebound below so the query functions can reach the store

    print("Initializing document processing...")
    documents = []

    # The UI may hand over None when nothing was uploaded.
    for file in files or []:
        # Uploads may arrive as temp-file wrappers or as plain path strings.
        path = file if isinstance(file, str) else file.name
        print(f"File uploaded: {path}")
        match = _extractor_for(path)
        if match is None:
            print(f"Unsupported file format: {path}")
            # Keep going: one unsupported file must not abort the batch.
            continue
        label, extractor = match
        document = _extract_document(label, path, extractor)
        if document is not None:
            documents.append(document)

    for url in urls or []:
        document = _extract_document("website", url, extract_text_from_website)
        if document is not None:
            documents.append(document)

    if not documents:
        print("No valid content extracted from the document(s).")
        return "No valid content extracted from the document(s)."

    print(f"Document list contains {len(documents)} item(s).")

    try:
        print(f"Embedding documents into Chroma: {len(documents)} document(s)")

        # Generate a unique collection name to prevent reuse
        collection_name = f"collection_{uuid4()}"

        vector_db = Chroma.from_documents(
            documents=documents,
            embedding=OllamaEmbeddings(model="nomic-embed-text", show_progress=True),
            collection_name=collection_name,
            persist_directory=None  # Use in-memory storage to avoid persistence
        )
    except Exception as e:
        print(f"An error occurred while creating the vector database: {e}")
        return "Error during vector embedding process."

    return "Documents and websites successfully processed and embedded."

# Function to query details and populate the checklist
def populate_checklist(checklist_path):
    """Answer every checklist question from the embedded documents.

    Opens the workbook at *checklist_path*, reads the active sheet from row 2
    downwards, sends each non-empty column-A value to a RetrievalQA chain
    built on the module-level ``vector_db``, writes the answer into column B
    of the same row, and saves the workbook back to the same path.

    Args:
        checklist_path: Path of the checklist Excel workbook.

    Returns:
        A human-readable status message. Errors are reported in the message
        rather than raised.
    """
    global vector_db  # Access the global vector database

    if vector_db is None:
        return "Please process the documents first before populating the checklist."

    # Reject a blank path up front instead of surfacing a loader error.
    if not checklist_path or not str(checklist_path).strip():
        return "Please provide the path of the checklist Excel file."
    checklist_path = str(checklist_path).strip()

    try:
        # Model and chain setup sit inside the try so their failures are
        # reported like every other failure of this function.
        llm = ChatOllama(model="llama3.1")
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_db.as_retriever(),
            return_source_documents=False
        )

        wb = load_workbook(checklist_path)
        ws = wb.active

        # Column A holds the queries; column B receives the answers.
        for row in ws.iter_rows(min_row=2, max_col=2, values_only=False):
            query_cell = row[0]
            if not query_cell.value:
                continue
            # Cells may hold numbers or dates; the chain is given text.
            query_text = str(query_cell.value).strip()
            if not query_text:
                continue
            query_result = qa_chain.run(query_text)
            ws.cell(row=query_cell.row, column=2).value = query_result

        wb.save(checklist_path)
        print(f"Checklist updated and saved to {checklist_path}")
        return f"Checklist successfully populated and saved to {checklist_path}"

    except Exception as e:
        return f"An error occurred while populating the checklist: {e}"

# Function to handle asking a question about the embedded documents
def ask_question(question):
    """Answer a free-form question from the embedded documents.

    Builds a RetrievalQA chain on the module-level ``vector_db`` and runs the
    question through it.

    Args:
        question: The user's question text.

    Returns:
        The chain's answer, or a human-readable message when no documents
        have been processed, the question is blank, or the chain fails.
    """
    global vector_db  # Access the global vector database

    if vector_db is None:
        return "Please process the documents first before asking a question."

    # A blank question has nothing to retrieve against.
    if not question or not question.strip():
        return "Please enter a question."

    try:
        llm = ChatOllama(model="llama3.1")
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_db.as_retriever(),
            return_source_documents=False
        )
        return qa_chain.run(question)
    except Exception as e:
        # Report failures as a message, matching populate_checklist, instead
        # of letting the exception escape to the caller.
        return f"An error occurred while answering the question: {e}"

# Custom CSS for larger label fonts. Every selector below matches an elem_id
# assigned to a component in the layout; a selector without a matching
# elem_id silently does nothing.
CUSTOM_CSS = """
#file_input label, #url_input label, #checklist_input label, #result_output label, #question_input label, #answer_output label, #populate_output label {
    font-size: 18px;
}
"""

# Gradio layout. The stylesheet is passed to the constructor rather than
# being assigned to the Blocks object after it has been built.
with gr.Blocks(css=CUSTOM_CSS) as iface:
    # Display the logo
    gr.Image(logo_path, label="", type="filepath", interactive=False)

    # Add header text below the logo
    gr.Markdown("<h2 style='text-align: center; color: red;'> AI Powered Customer Fulfilment Intelligent Document Processor & Chat Bot </h2>")

    # Input section for files, website URLs, and checklist path
    with gr.Row():
        file_input = gr.Files(
            label="Upload your documents (PDF, Image, EXCEL, WORD, or EML)",
            file_types=['.jpg', '.jpeg', '.png', '.pdf', '.xlsx', '.xls', '.docx', '.eml'],
            elem_id="file_input"
        )
        url_input = gr.Textbox(label="Enter website URLs (separated by commas)", elem_id="url_input")
        checklist_input = gr.Textbox(label="Checklist Excel File Path", elem_id="checklist_input")

    # Button to trigger document processing
    process_button = gr.Button("Process Documents and Websites")

    # Output section for the document processing result
    result_output = gr.Textbox(label="Output", elem_id="result_output")

    # Input for asking questions about the documents
    question_input = gr.Textbox(
        label="Ask a Question About the Documents",
        placeholder="Type your question here...",
        elem_id="question_input"
    )

    # Button to ask a question and display the answer
    ask_button = gr.Button("Ask Question")
    answer_output = gr.Textbox(label="Answer", interactive=False, elem_id="answer_output")

    # Button to process and populate the checklist
    populate_button = gr.Button("Populate Checklist")
    populate_output = gr.Textbox(label="Checklist Status", interactive=False, elem_id="populate_output")

    def process_inputs(files, url_text, checklist_path):
        """Split the comma-separated URL box and forward to process_documents."""
        # Guard against a None textbox value before splitting.
        urls = [url.strip() for url in (url_text or "").split(",") if url.strip()]
        return process_documents(files, urls, checklist_path)

    process_button.click(process_inputs, inputs=[file_input, url_input, checklist_input], outputs=result_output)

    # Trigger the ask_question function to retrieve answers based on user's query
    ask_button.click(ask_question, inputs=question_input, outputs=answer_output)

    # Trigger the populate_checklist function to populate the checklist
    populate_button.click(populate_checklist, inputs=checklist_input, outputs=populate_output)

# Launch the Gradio interface.
# NOTE(review): share=True requests a public share link, while this app reads
# and writes a workbook at a user-supplied path — confirm public exposure is
# intended.
iface.launch(share=True)


  from .autonotebook import tqdm as notebook_tqdm


* Running on local URL:  http://127.0.0.1:7863

Could not create share link. Please check your internet connection or our status page: https://status.gradio.app.


