In [None]:
import fitz
from langchain_experimental.text_splitter import SemanticChunker
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
import json
from docx import Document as Doc
import os
from langchain.prompts import PromptTemplate
from langchain_chroma import Chroma
from langchain_community.llms import Ollama
from langchain.chains import RetrievalQA
from langchain.schema import Document
import io
import zipfile
import base64
from PIL import Image
import numpy as np
import cv2
import pytesseract
import pandas as pd
import openpyxl
import datetime

In [19]:
def extract_docx_images(file):
    """Collect the images embedded in a .docx archive.

    A .docx file is a zip archive; embedded media live under ``word/media``.

    Args:
        file: Path or seekable binary file-like object accepted by
            ``zipfile.ZipFile``.

    Returns:
        list[tuple[str, PIL.Image.Image]]: ``(archive_member_name, image)``
        pairs for every ``word/media`` member that Pillow is able to open.
        Members Pillow cannot decode are skipped.
    """
    images = []
    # The context manager releases the archive handle even if a read fails.
    # When a file-like object is passed in, ZipFile does not close that object.
    with zipfile.ZipFile(file) as doc_zip:
        for name in doc_zip.namelist():
            if not name.startswith("word/media"):
                continue
            # Each image gets its own in-memory buffer, so Pillow's lazy
            # decoding keeps working after the archive has been closed.
            image_bytes = io.BytesIO(doc_zip.read(name))
            try:
                images.append((name, Image.open(image_bytes)))
            except OSError:
                # Pillow's UnidentifiedImageError is an OSError subclass; it is
                # raised for media Pillow cannot decode (vector formats, empty
                # directory entries, ...). Skip those instead of aborting.
                continue
    return images

In [20]:
def process_image_and_ocr(images):
    """Run Tesseract OCR over a list of named images.

    Each image is converted to grayscale and lightly blurred before being
    handed to ``pytesseract.image_to_string``.

    Args:
        images: Iterable of ``(name, PIL.Image.Image)`` pairs, as produced by
            ``extract_docx_images``.

    Returns:
        list[tuple[str, str]]: ``(name, recognised_text)`` pairs in input order.
    """
    final_image_data = []
    for image in images:
        name, pil_image = image[0], image[1]
        # Normalise to 3-channel RGB first: COLOR_RGB2GRAY rejects arrays that
        # are not 3-channel, which is what grayscale ("L"), palette ("P") and
        # RGBA images produce when converted to numpy directly.
        img_array = np.array(pil_image.convert("RGB"))
        img_bw = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        # 3x3 Gaussian blur applied before OCR.
        final_image = cv2.GaussianBlur(img_bw, (3, 3), 0)

        extracted_image_data = pytesseract.image_to_string(final_image)
        final_image_data.append((name, extracted_image_data))

    return final_image_data

In [None]:
def get_and_read_file(file):
    """Extract the textual content of an uploaded .docx or .pdf file.

    Args:
        file: Binary file-like object exposing ``read()``, ``seek()`` and a
            ``name`` attribute whose extension selects the parser.

    Returns:
        str: For .docx, the table rows (tab-separated cells), then the
        paragraphs, then the OCR text of embedded images, joined by newlines.
        For .pdf, the text of all pages joined by newlines. For any other
        extension, an empty string.
    """
    content = file.read()

    if file.name.endswith(".docx"):
        byte_content = io.BytesIO(content)
        doc = Doc(byte_content)
        paras = [para.text for para in doc.paragraphs]

        images = extract_docx_images(byte_content)
        # Default to an empty list so documents without images do not hit an
        # unbound ``image_data`` when the OCR text is assembled below.
        image_data = process_image_and_ocr(images) if images else []
        image_text = "\n".join(f"{image[0]}\n{image[1]}\n\n" for image in image_data)

        tables = []
        for table in doc.tables:
            for row in table.rows:
                tables.append("\t".join(cell.text.strip() for cell in row.cells))

        return "\n".join(tables + paras + [image_text])

    if file.name.endswith(".pdf"):
        # Rewind and re-read so the whole document is parsed even if the
        # handle had been partially consumed before this function was called.
        file.seek(0)
        content = file.read()
        doc = fitz.open(stream=content, filetype="pdf")
        try:
            return "\n".join(page.get_text() for page in doc)
        finally:
            # Release the PyMuPDF document once the text has been collected.
            doc.close()

    # Unsupported extension: return empty text rather than an implicit None.
    return ""

In [None]:
# splitting file into chunks
# Lazily-created splitter shared by the PDF and DOCX chunkers so the embedding
# model is loaded once instead of on every call.
_SEMANTIC_SPLITTER = None


def _get_semantic_splitter():
    """Return the shared SemanticChunker, building it on first use."""
    global _SEMANTIC_SPLITTER
    if _SEMANTIC_SPLITTER is None:
        _SEMANTIC_SPLITTER = SemanticChunker(
            embeddings=HuggingFaceEmbeddings(model_name="BAAI/bge-large-en-v1.5")
        )
    return _SEMANTIC_SPLITTER


def _split_file_to_chunks(file):
    """Read ``file`` with ``get_and_read_file`` and split its text semantically.

    Returns:
        list[dict]: One ``{"chunk_number": int, "chunk_content": str}`` entry
        per chunk, numbered from 0. Empty when no text could be extracted.
    """
    text = get_and_read_file(file)
    # Nothing to split: skip loading the model and producing empty chunks.
    if not text or not text.strip():
        return []
    chunks = _get_semantic_splitter().split_text(text)
    return [
        {"chunk_number": i, "chunk_content": chunk}
        for i, chunk in enumerate(chunks)
    ]


def split_pdf_to_chunks(file):
    """Split the text of a PDF file into numbered semantic chunks.

    Args:
        file: Binary file-like object with a ``name`` attribute.

    Returns:
        list[dict]: ``{"chunk_number": int, "chunk_content": str}`` entries.
    """
    return _split_file_to_chunks(file)


def split_docx_to_chunks(file):
    """Split the text of a DOCX file into numbered semantic chunks.

    Args:
        file: Binary file-like object with a ``name`` attribute.

    Returns:
        list[dict]: ``{"chunk_number": int, "chunk_content": str}`` entries.
    """
    return _split_file_to_chunks(file)

def split_excel_to_chunks(file):
    """Turn each row of the first worksheet of an .xlsx file into a chunk.

    Args:
        file: Binary file-like object with ``read()`` and a ``name`` attribute.

    Returns:
        list[dict]: One entry per data row, shaped as
        ``{"chunk_number": row_index, "chunk_content": [{header: value}, ...]}``.
        Missing cells become ``""`` and date/time-like cells are converted to
        strings. An empty list is returned when the file name does not end in
        ``.xlsx``.
    """
    if not file.name.endswith(".xlsx"):
        return []

    content = file.read()
    df = pd.read_excel(io.BytesIO(content), engine="openpyxl")
    headers = df.columns.tolist()
    # Cast to object before extracting values: a frame made up solely of
    # datetime columns would otherwise come back from ``.values.tolist()`` as
    # raw integer nanoseconds instead of timestamp objects.
    rows = df.astype(object).values.tolist()

    # pd.Timestamp / pd.Timedelta subclass the stdlib datetime types listed here.
    temporal_types = (
        datetime.datetime,
        datetime.date,
        datetime.time,
        datetime.timedelta,
    )

    chunks = []
    for i, row in enumerate(rows):
        cells = []
        for header, value in zip(headers, row):
            if pd.isna(value):
                value = ""
            elif isinstance(value, temporal_types):
                # Stringify so the chunk stays JSON-serialisable downstream.
                value = str(value)
            cells.append({header: value})
        chunks.append({"chunk_number": i, "chunk_content": cells})
    return chunks

In [None]:
def get_chunks(file, file_name):
    """Dispatch a file to the chunker matching its extension.

    Args:
        file: Binary file-like object passed through to the chunker.
        file_name: Name used to pick the chunker (``.pdf``, ``.docx``, ``.xlsx``).

    Returns:
        list: For .pdf/.docx, the list of chunk texts. For .xlsx, the raw list
        of row dictionaries returned by ``split_excel_to_chunks``. For anything
        else, an empty list (a notice is printed).
    """
    if file_name.endswith(".xlsx"):
        # Spreadsheet chunks keep their dict structure; callers format them.
        return split_excel_to_chunks(file)

    if file_name.endswith(".pdf"):
        data = split_pdf_to_chunks(file)
    elif file_name.endswith(".docx"):
        data = split_docx_to_chunks(file)
    else:
        # Message lists every extension handled above, including .xlsx.
        print("Not a '.docx', '.pdf' or '.xlsx' file.")
        return []

    return [item["chunk_content"] for item in data]

In [25]:
# optional
def save_chunks_to_file(chunks, file_name):
    """Dump ``chunks`` as indented JSON to ``chunk_files/<file_name>_chunks.txt``.

    Args:
        chunks: JSON-serialisable chunk list. Values JSON cannot encode
            natively are written using their ``str()`` form.
        file_name: Source file name used to build the output file name.
    """
    target = f"chunk_files/{file_name}_chunks.txt"
    # Create the output directory on first use instead of failing with
    # FileNotFoundError when it does not exist yet.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "w") as write_file:
        # default=str keeps the dump from crashing on values such as
        # timestamps or numpy scalars coming from spreadsheet rows.
        json.dump(chunks, write_file, indent=2, default=str)

In [26]:
def init_db():
    """Open the persisted Chroma collection and build a retriever over it.

    Returns:
        tuple: ``(vectordb, retriever)`` where ``vectordb`` is the Chroma store
        for collection ``brd_collection`` persisted under ``./my_db`` and
        ``retriever`` is created from it with ``k=5``.
    """
    store = Chroma(
        embedding_function=HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2"),
        persist_directory="./my_db",
        collection_name="brd_collection",
    )
    return store, store.as_retriever(search_kwargs={"k": 5})

In [None]:
def store_to_vectordb(vectordb, chunks, file_name):
    """Wrap chunks in ``Document`` objects and add them to the vector store.

    Args:
        vectordb: Store exposing ``add_documents``.
        chunks: Chunk list. For ``.xlsx`` sources each chunk is a dict whose
            ``"chunk_content"`` is a list of single-entry dicts; otherwise each
            chunk is used directly as the page content.
        file_name: Source name, recorded in each document's metadata.

    Returns:
        None. Nothing is stored when ``chunks`` is empty.
    """
    if not chunks:
        return

    is_spreadsheet = file_name.endswith(".xlsx")
    docs = []
    for index, chunk in enumerate(chunks):
        if is_spreadsheet:
            # Flatten the row into "header: value" pairs separated by commas.
            pairs = []
            for item in chunk["chunk_content"]:
                pairs.append(f"{list(item.keys())[0]}: {list(item.values())[0]}")
            body = ", ".join(pairs)
        else:
            body = chunk
        docs.append(
            Document(
                page_content=body,
                metadata={"source": file_name, "chunk_number": index},
            )
        )
    vectordb.add_documents(docs)

In [28]:
# creating RAG chain 
def create_qa_chain(retriever):
    """Build a "stuff" RetrievalQA chain over ``retriever``.

    The chain uses the Ollama ``mistral`` model and a prompt that tells the
    model to answer from the supplied context only.

    Args:
        retriever: Retriever supplying context documents to the chain.

    Returns:
        The chain created by ``RetrievalQA.from_chain_type``.
    """
    template_text = (
        "Based only on the following context, answer the user's question. "
        "If the answer is not present in the context, say 'Not found in the provided documents.'\n\n"
        "Context:\n{context}\n\nQuestion: {question}\nAnswer:"
    )
    custom_prompt = PromptTemplate(
        input_variables=["context", "question"],
        template=template_text,
    )

    return RetrievalQA.from_chain_type(
        llm=Ollama(model="mistral"),
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": custom_prompt},
    )

In [29]:
def handle_query(query, qa_chain, file_name):
    """Run ``query`` through the QA chain, optionally restricted to one file.

    Args:
        query: The user's question.
        qa_chain: Chain exposing ``invoke``. To restrict retrieval it must also
            expose ``retriever.search_kwargs`` as a dict.
        file_name: When truthy, retrieval is limited to documents whose
            ``source`` metadata equals this value.

    Returns:
        Whatever ``qa_chain.invoke`` returns, or ``None`` if an exception was
        raised (the error is printed).
    """
    try:
        if not file_name:
            return qa_chain.invoke(query)

        source_filter = {"source": file_name}
        payload = {"query": query, "filter": source_filter}

        retriever = getattr(qa_chain, "retriever", None)
        search_kwargs = getattr(retriever, "search_kwargs", None)
        if not isinstance(search_kwargs, dict):
            # No retriever settings to adjust; fall back to a plain invoke.
            return qa_chain.invoke(payload)

        # An extra "filter" key in the chain input is not forwarded to the
        # retriever, so the filter is applied through the retriever's own
        # search_kwargs for the duration of this call and then restored.
        had_filter = "filter" in search_kwargs
        previous_filter = search_kwargs.get("filter")
        search_kwargs["filter"] = source_filter
        try:
            return qa_chain.invoke(payload)
        finally:
            if had_filter:
                search_kwargs["filter"] = previous_filter
            else:
                search_kwargs.pop("filter", None)
    except Exception as e:
        print("Error:", e)

In [None]:
def pipeline(vectordb, file):
    """Chunk ``file``, index the chunks in ``vectordb`` and save them to disk.

    Args:
        vectordb: Vector store handed to ``store_to_vectordb``.
        file: File-like object with a ``name`` attribute.

    Returns:
        The chunks produced by ``get_chunks``.
    """
    source_name = file.name
    extracted = get_chunks(file, source_name)
    store_to_vectordb(vectordb, extracted, source_name)
    save_chunks_to_file(extracted, source_name)
    return extracted

In [31]:
def flush_db(persist_directory="./my_db"):
    """Delete every collection in the persisted Chroma database.

    Args:
        persist_directory: Directory of the persisted store. Defaults to the
            directory used by ``init_db``.
    """
    # The langchain Chroma wrapper has no ``Client`` attribute, so open the
    # persisted store through the wrapper and reuse its underlying client.
    # NOTE(review): ``_client`` is a private attribute of the wrapper - confirm
    # it is still present in the installed langchain_chroma version.
    store = Chroma(
        collection_name="brd_collection",
        persist_directory=persist_directory,
    )
    client = store._client

    for col in client.list_collections():
        # Depending on the chromadb version this yields collection objects or
        # plain collection names; handle both.
        client.delete_collection(name=getattr(col, "name", col))