In [None]:
import pdfplumber
import os
from PIL import Image
import shutil
import fitz
from sentence_transformers import SentenceTransformer, models, util
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from glob import glob
from llama_index.core.node_parser import SentenceSplitter
import torch
from torchvision import transforms
import chromadb
import uuid
from chromadb.config import Settings
import chromadb
from sentence_transformers import SentenceTransformer
from langchain_chroma import Chroma

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain import PromptTemplate
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
from IPython.display import display

In [None]:
# Credentials must come from the environment, never from the notebook source.
# (The previous `os.environ["OPENAI_API_KEY"] = None` raised
# "TypeError: str expected, not NoneType": environment values must be strings.)
if not os.environ.get("OPENAI_API_KEY"):
    raise RuntimeError(
        "OPENAI_API_KEY is not set. Export it in the environment that launches "
        "the kernel before running this notebook."
    )

In [None]:
def clear_output_folder(folder_path):
    """Reset ``folder_path`` to an empty directory.

    An existing tree at that path is deleted first; the directory is then
    (re)created. Both steps are reported on stdout.
    """
    folder_exists = os.path.exists(folder_path)
    if folder_exists:
        # Remove the previous contents so nothing stale survives into this run.
        shutil.rmtree(folder_path)
        print(f'Existing folder "{folder_path}" and its contents have been removed.')

    os.makedirs(folder_path, exist_ok=True)
    print(f'Folder "{folder_path}" is ready for new content.')

In [None]:
def extract_images_and_text(pdf_path, images_output_folder, text_output_file):
    """Dump every page's text and embedded images from a PDF to disk.

    The text of all pages is written to ``text_output_file`` (UTF-8), each page
    preceded by a ``--- Page N ---`` header line. Every image listed for a page
    is saved into ``images_output_folder`` as ``page{N}_img{K}.{ext}``.

    Args:
        pdf_path: Path of the PDF to read.
        images_output_folder: Directory for the extracted images; created if
            it does not exist.
        text_output_file: Path of the text file to (over)write; its parent
            directory is created if it does not exist.

    Returns:
        None. Results are written to disk and progress is printed.
    """
    os.makedirs(images_output_folder, exist_ok=True)
    text_dir = os.path.dirname(text_output_file)
    if text_dir:
        # The text file may live outside the images folder.
        os.makedirs(text_dir, exist_ok=True)

    doc = fitz.open(pdf_path)
    try:
        with open(text_output_file, "w", encoding="utf-8") as txt_file:
            for page_num in range(len(doc)):
                page_label = page_num + 1  # 1-based, as used in headers/filenames
                page = doc.load_page(page_num)

                txt_file.write(f"--- Page {page_label} ---\n")
                txt_file.write(page.get_text())
                txt_file.write("\n\n")

                image_list = page.get_images(full=True)
                if not image_list:
                    print(f"No images found on page {page_label}.")
                    continue

                for img_index, img in enumerate(image_list, start=1):
                    xref = img[0]  # first field of each entry is the image xref
                    base_image = doc.extract_image(xref)
                    if not base_image:
                        # Nothing extractable behind this xref; skip rather
                        # than fail on the key lookups below.
                        continue
                    image_filename = f"page{page_label}_img{img_index}.{base_image['ext']}"
                    image_path = os.path.join(images_output_folder, image_filename)
                    with open(image_path, "wb") as img_file:
                        img_file.write(base_image["image"])
    finally:
        # Release the PDF handle even if extraction fails part-way.
        doc.close()

    print(f"\nText extraction complete. Saved to '{text_output_file}'.")
    print(f"Image extraction complete. Images saved in '{images_output_folder}'.")

In [None]:
# Input PDF and output locations for this run.
pdf_path = 'files/demo4.pdf'
images_output_folder = 'extracted_content'
# The page text is written into the same folder as the extracted images.
text_output_file = f'{images_output_folder}/pages.txt'
# Start from an empty output folder, then extract text and images into it.
clear_output_folder(images_output_folder)
extract_images_and_text(pdf_path, images_output_folder, text_output_file)

In [None]:
def read_text_file(file_path):
    """Return the entire contents of a UTF-8 text file as a single string."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return handle.read()

# Load the extracted page text (page headers included) back into memory.
text = read_text_file(text_output_file)

In [None]:
def chunk_text(text, chunk_size=77, chunk_overlap=0):
    """Split ``text`` into chunks with llama-index's ``SentenceSplitter``.

    ``chunk_size`` and ``chunk_overlap`` are forwarded unchanged to the
    splitter. The number of chunks produced is printed.

    Returns:
        Whatever ``SentenceSplitter.split_text`` returns for ``text``.
    """
    splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    pieces = splitter.split_text(text)
    print(f"Total text chunks created: {len(pieces)}")
    return pieces

# Chunk the page text with the default chunk_size / chunk_overlap.
chunks = chunk_text(text)

In [None]:
def load_images_from_dir(directory, extensions=(".jpg", ".jpeg", ".png", ".bmp")):
    """Recursively load every matching image under ``directory`` as RGB.

    Args:
        directory: Root folder to walk. A missing folder yields empty results.
        extensions: File suffixes to accept, matched case-insensitively. May be
            a single string or any iterable of strings (tuple, list, set, ...).

    Returns:
        ``(images, image_paths)``: two parallel lists holding the converted
        PIL images and the paths they were read from, ordered by path so the
        result does not depend on filesystem enumeration order.
    """
    # str.endswith only accepts a str or a tuple, and file names are lowered
    # before matching, so normalise the suffixes the same way.
    if isinstance(extensions, str):
        suffixes = (extensions.lower(),)
    else:
        suffixes = tuple(ext.lower() for ext in extensions)

    image_paths = sorted(
        os.path.join(root, name)
        for root, _, names in os.walk(directory)
        for name in names
        if name.lower().endswith(suffixes)
    )

    images = []
    for img_path in image_paths:
        # convert() returns a separate in-memory image, so the source file can
        # be closed as soon as the conversion is done.
        with Image.open(img_path) as source:
            images.append(source.convert("RGB"))

    return images, image_paths

# Load the images that the extraction step saved; `images` and `image_paths`
# are parallel lists.
images, image_paths = load_images_from_dir(images_output_folder)

In [None]:
def generate_embeddings(model, text, images):
    """Encode text and images with the same model.

    Args:
        model: Object exposing ``encode``; called once per modality.
        text: Text input passed to ``model.encode``.
        images: Image input passed to ``model.encode``.

    Returns:
        Dict with the text encoding under ``'text'`` and the image encoding
        under ``'images'``.
    """
    text_vectors = model.encode(text)
    image_vectors = model.encode(images)
    return {'text': text_vectors, 'images': image_vectors}

# Load the 'clip-ViT-B-32' checkpoint once and encode both the text chunks and
# the images with it, so both sets of vectors come from the same model.
model = SentenceTransformer('clip-ViT-B-32')
embeddings = generate_embeddings(model, chunks, images)

In [None]:
def add_text_embeddings(collection, chunks, embeddings):
    """Store text chunks and their vectors in a Chroma collection.

    Chunk ``i`` is stored with id ``text_{i}_{uuid4}``, the chunk itself as the
    document, and metadata ``{"type": "text", "chunk_index": i}``.

    Args:
        collection: Object exposing Chroma's ``add(documents=, embeddings=,
            ids=, metadatas=)``.
        chunks: Text chunks.
        embeddings: One vector per chunk, in the same order. Items may be
            arrays (anything with ``tolist``) or plain sequences.

    Returns:
        None. Nothing is written when there are no chunks, because Chroma's
        ``add`` rejects an empty batch.
    """
    ids, documents, vectors, metadatas = [], [], [], []
    for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
        ids.append(f"text_{i}_{uuid.uuid4()}")
        documents.append(chunk)
        vectors.append(emb.tolist() if hasattr(emb, "tolist") else list(emb))
        metadatas.append({"type": "text", "chunk_index": i})

    if not ids:
        return

    collection.add(
        documents=documents,
        embeddings=vectors,
        ids=ids,
        metadatas=metadatas,
    )

def add_image_embeddings(collection, image_paths, embeddings):
    """Store image vectors in a Chroma collection, keyed by image path.

    Each image is stored with id ``image_{path}_{uuid4}``, its path as the
    document, and metadata ``{"type": "image", "image_name": path}``.

    Args:
        collection: Object exposing Chroma's ``add(documents=, embeddings=,
            ids=, metadatas=)``.
        image_paths: Paths of the images.
        embeddings: One vector per image, in the same order. Items may be
            arrays (anything with ``tolist``) or plain sequences.

    Returns:
        None. Nothing is written when there are no images (e.g. a PDF without
        embedded images), because Chroma's ``add`` rejects an empty batch.
    """
    ids, documents, vectors, metadatas = [], [], [], []
    for emb, img_path in zip(embeddings, image_paths):
        ids.append(f"image_{img_path}_{uuid.uuid4()}")
        # The path is the "document", so retrieval hands back something the
        # caller can open.
        documents.append(img_path)
        vectors.append(emb.tolist() if hasattr(emb, "tolist") else list(emb))
        metadatas.append({"type": "image", "image_name": img_path})

    if not ids:
        return

    collection.add(
        documents=documents,
        embeddings=vectors,
        ids=ids,
        metadatas=metadatas,
    )

# On-disk Chroma store, kept next to the notebook.
client = chromadb.PersistentClient(
    path='./chromadb'
)
collection = client.get_or_create_collection(name="combined_embeddings")

# The store is persistent and every entry is added under a fresh uuid4-based
# id, so re-running this cell used to append another full copy of all chunks
# and images. Empty the collection first so the index always mirrors the
# current extraction and the cell is safe to re-run.
stale_ids = collection.get(include=[])["ids"]
if stale_ids:
    collection.delete(ids=stale_ids)

add_text_embeddings(collection, chunks, embeddings['text'])
add_image_embeddings(collection, image_paths, embeddings['images'])

In [None]:
class CustomizedEmbeddings:
    """Adapter exposing ``embed_documents`` / ``embed_query`` over a
    SentenceTransformer-style model, for use as a vector store's
    ``embedding_function``.
    """

    def __init__(self, model):
        """
        Args:
            model: Either a model name/path, which is loaded with
                ``SentenceTransformer``, or an already-constructed model
                object exposing ``encode``, which is reused as-is so the
                weights are not loaded a second time.
        """
        if isinstance(model, (str, os.PathLike)):
            # NOTE(review): trust_remote_code=True allows a model repository to
            # run its own Python code at load time. Kept to preserve existing
            # behaviour; confirm it is actually required for the models used.
            self.model = SentenceTransformer(model, trust_remote_code=True)
        else:
            self.model = model

    def embed_documents(self, texts):
        """Return one embedding (list of floats) per text, in input order."""
        texts = list(texts)
        if not texts:
            return []
        # One batched encode call instead of one model call per text.
        return self.model.encode(texts).tolist()

    def embed_query(self, query: str):
        """Return the embedding of a single query string as a list of floats."""
        return self.model.encode([query])[0].tolist()


# Bound to its own name on purpose: rebinding `embeddings` here would clobber
# the dict of precomputed vectors that the indexing cell reads
# (embeddings['text'] / embeddings['images']) and break that cell on re-run.
query_embedder = CustomizedEmbeddings('clip-ViT-B-32')

# Wrap the existing Chroma collection so LangChain can query it; queries are
# embedded with the same checkpoint that produced the stored vectors.
vector_store = Chroma(
    client=client,
    collection_name="combined_embeddings",
    embedding_function=query_embedder
)

In [None]:
# Question to answer from the indexed PDF.
query = "What is oral hair?"
# Retrieve 3 entries with MMR search; results may be text chunks or images.
retriever = vector_store.as_retriever(search_type="mmr", search_kwargs={"k": 3})
retrieved_documents = retriever.invoke(query)
# Split the hits by the "type" metadata written at indexing time:
# text chunks become LLM context, everything else is treated as an image.
context = []
# NOTE(review): this rebinds `images`, which earlier held the PIL images that
# were embedded. Re-running the embedding cell after this one would pass these
# path strings to generate_embeddings instead. Consider a distinct name here
# (and in the display cell that iterates `images`).
images = []
# `i` is not used inside the loop.
for i, doc in enumerate(retrieved_documents):
    if doc.metadata['type'] == 'text':
        context.append(doc.page_content)
        print('--------------')
        print(doc.page_content)
    else:
        print('Image retrieved')
        # For image entries the stored document is the image file path.
        images.append(doc.page_content)

In [None]:
# Chat model used to phrase the final answer from the retrieved text chunks.
llm = ChatOpenAI(
    model_name="gpt-3.5-turbo",
    temperature=0.7,
    max_tokens=500
)

template = """Relevant information:
{context}

Provide a concise answer to the following question based on the relevant information provided above:
{question}"""

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# Join the chunks into plain text. Formatting the list directly would put its
# Python repr (brackets, quotes, escaped newlines) into the prompt.
formatted_prompt = prompt.format(
    context="\n\n".join(context),
    question=query
)

# Use `invoke`, as the retriever call does: it accepts a plain prompt string,
# whereas calling the chat model object directly is the deprecated `__call__`
# path that expects a list of messages.
response = llm.invoke(formatted_prompt)
print(response.content)

In [None]:
# Show every image returned by retrieval; at this point `images` holds the
# file paths collected in the retrieval cell, not PIL images.
for path in images:
    img = Image.open(path)
    display(img)