In [1]:
import os
import io
import numpy as np
import faiss
import torch

from PIL import Image
from pdf2image import convert_from_path
from PyPDF2 import PdfReader

import openai
from transformers import CLIPProcessor, CLIPModel

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# --- Runtime configuration ---
# The OpenAI key is read from the environment; it is None when the variable is unset.
openai.api_key = os.getenv("OPENAI_API_KEY")

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Checkpoint shared by the processor and the model; any other CLIP model id can be used.
model_id = "openai/clip-vit-base-patch32"
clip_processor = CLIPProcessor.from_pretrained(model_id)
clip_model = CLIPModel.from_pretrained(model_id).to(device)

# In-memory cache of GPT answers (lives only as long as the kernel).
response_cache = dict()

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


In [3]:
def embed_texts(texts, processor, model):
    """
    Given a list of text strings, return their CLIP embeddings as a NumPy array.

    Args:
        texts: Strings to embed; forwarded to the processor as ``text=``.
        processor: Callable (e.g. ``CLIPProcessor``) whose result supports
            ``.to(device)`` and ``**`` unpacking.
        model: Model exposing ``get_text_features`` and a ``device`` attribute
            (e.g. ``CLIPModel``).

    Returns:
        NumPy array with one L2-normalized embedding per input string.
    """
    # Send the inputs to wherever the model's weights live. Relying on a
    # notebook-level `device` variable breaks as soon as the model passed in
    # sits on a different device than that variable says.
    inputs = processor(
        text=texts,
        padding=True,
        truncation=True,  # over-long inputs are cut to the tokenizer's limit
        return_tensors="pt"
    ).to(model.device)

    # Inference only: no autograd bookkeeping needed.
    with torch.no_grad():
        text_embeddings = model.get_text_features(**inputs)

    # Normalize embeddings so that an inner product equals cosine similarity
    text_embeddings = text_embeddings / text_embeddings.norm(dim=-1, keepdim=True)
    return text_embeddings.cpu().numpy()

def embed_images(images, processor, model):
    """
    Given a list of PIL images, return their CLIP embeddings as a NumPy array.

    Args:
        images: Images to embed; forwarded to the processor as ``images=``.
        processor: Callable (e.g. ``CLIPProcessor``) whose result supports
            ``.to(device)`` and ``**`` unpacking.
        model: Model exposing ``get_image_features`` and a ``device`` attribute
            (e.g. ``CLIPModel``).

    Returns:
        NumPy array with one L2-normalized embedding per input image.
    """
    # Follow the model's own device instead of a notebook-level `device`
    # variable, so the function works for any model passed in.
    inputs = processor(
        images=images,
        return_tensors="pt"
    ).to(model.device)

    # Inference only: no autograd bookkeeping needed.
    with torch.no_grad():
        image_embeddings = model.get_image_features(**inputs)

    # Normalize embeddings so that an inner product equals cosine similarity
    image_embeddings = image_embeddings / image_embeddings.norm(dim=-1, keepdim=True)
    return image_embeddings.cpu().numpy()

def get_faiss_index(dimension):
    """
    Build an empty flat Faiss index for vectors of size `dimension`.

    The index ranks by inner product, which is cosine similarity when the
    stored vectors and the queries are unit-normalized.
    """
    return faiss.IndexFlatIP(dimension)

def add_to_index(index, embeddings):
    """
    Add embeddings to a Faiss index.

    Args:
        index: Faiss index (any object with an ``add`` method).
        embeddings: Array-like of shape (n, d), or a single vector of shape (d,).

    Faiss operates on 2-D, C-contiguous float32 matrices. The input is coerced
    to that layout first, so float64 arrays, strided views, plain lists and
    single vectors are accepted. An array already in that layout is passed
    through without copying.
    """
    vectors = np.atleast_2d(np.ascontiguousarray(embeddings, dtype=np.float32))
    index.add(vectors)

def search_index(index, query_embedding, top_k=5):
    """
    Search the Faiss index for the top_k nearest neighbors to query_embedding.

    Args:
        index: Faiss index (any object with a ``search(queries, k)`` method).
        query_embedding: Array-like of shape (n_queries, d), or a single
            vector of shape (d,).
        top_k: Number of neighbors requested per query.

    Returns:
        (distances, indices) exactly as returned by ``index.search``. When the
        index holds fewer than ``top_k`` vectors, Faiss fills the missing
        slots with index -1, so callers must ignore negative indices.
    """
    # Faiss expects a 2-D, C-contiguous float32 query matrix; a bare 1-D
    # vector or a float64 array is coerced instead of failing inside Faiss.
    query = np.atleast_2d(np.ascontiguousarray(query_embedding, dtype=np.float32))
    distances, indices = index.search(query, top_k)
    return distances, indices

def retrieve_context(indices, metadata):
    """
    Given the index matrix returned by a Faiss search, return the matching
    metadata entries (text snippets, image descriptions, etc.).

    Args:
        indices: 2-D sequence of neighbor ids; only the first row (the first
            query) is used.
        metadata: Sequence whose position i describes the vector stored at
            Faiss id i.

    Returns:
        List of metadata entries in the order of the first row of `indices`.

    Faiss marks "no neighbor found" (e.g. top_k larger than the number of
    stored vectors) with id -1. Those slots are skipped: indexing a Python
    list with -1 would silently return the *last* entry as a bogus hit.
    """
    return [metadata[idx] for idx in indices[0] if idx >= 0]

def call_gpt_4(prompt):
    """
    Calls GPT-4 with a prompt and returns the response text.

    Caching is used so repeated prompts are not re-sent to the API: answers
    are stored in the module-level `response_cache` dict, keyed by the exact
    prompt string.

    Works with both generations of the `openai` package. Releases >= 1.0
    removed `openai.ChatCompletion` in favour of `openai.chat.completions`,
    so the new entry point is used when it exists, with the legacy call as
    the fallback.
    """
    if prompt in response_cache:
        return response_cache[prompt]

    request = dict(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200,
        temperature=0.7
    )

    if hasattr(openai, "chat"):
        # openai >= 1.0: responses are typed objects with attribute access.
        response = openai.chat.completions.create(**request)
        answer = response.choices[0].message.content
    else:
        # openai < 1.0: legacy endpoint, dict-style response.
        response = openai.ChatCompletion.create(**request)
        answer = response['choices'][0]['message']['content']

    # Store in cache
    response_cache[prompt] = answer
    return answer

In [None]:
PDF_FILE = "knowledge/subset_monetary_policy_report.pdf"

# Directory holding the Poppler binaries used by pdf2image. Set the
# POPPLER_PATH environment variable when Poppler is not on PATH (typical on
# Windows); when unset, None makes pdf2image look the binaries up on PATH.
# A machine-specific hard-coded folder raises PDFInfoNotInstalledError
# wherever that exact folder does not exist.
POPPLER_PATH = os.environ.get("POPPLER_PATH") or None

# Resolution used to rasterize each page; adjust as needed.
PAGE_RENDER_DPI = 200

text_data = []
image_data = []

# Extract text
reader = PdfReader(PDF_FILE)
num_pages = len(reader.pages)

for page_i, page in enumerate(reader.pages):
    # extract_text() may yield nothing for pages without a text layer.
    page_text = (page.extract_text() or "").strip()

    # You might want to chunk the text if it's very long. Here we store the entire page text as one chunk.
    if page_text:
        text_data.append({
            "text": page_text,
            "page_number": page_i + 1  # 1-based page numbers
        })

# Extract images
# pdf2image.convert_from_path converts each PDF page into a PIL image
pages_as_images = convert_from_path(PDF_FILE, dpi=PAGE_RENDER_DPI, poppler_path=POPPLER_PATH)
for i, pil_img in enumerate(pages_as_images):
    image_data.append({
        "image": pil_img,
        "page_number": i + 1  # 1-based page numbers
    })

PDFInfoNotInstalledError: Unable to get page count. Is poppler installed and in PATH?

In [None]:
# Parallel collections: all_metadata[k] describes row k of all_embeddings,
# which is also the id Faiss assigns to that vector once it is indexed.
all_metadata = []
all_embeddings = []

# 5A. Embed all text chunks
# NOTE: embed_texts tokenizes with truncation=True, so a long page is
# represented only by the part that fits the tokenizer's length limit.
texts_list = [td["text"] for td in text_data]
if texts_list:
    text_embeddings = embed_texts(texts_list, clip_processor, clip_model)
    for td, emb in zip(text_data, text_embeddings):
        all_metadata.append({
            "type": "text",
            "content": td["text"],
            "page_number": td["page_number"]
        })
        all_embeddings.append(emb)

# 5B. Embed all images
pil_images_list = [id_["image"] for id_ in image_data]
if pil_images_list:
    image_embeddings = embed_images(pil_images_list, clip_processor, clip_model)
    for id_, emb in zip(image_data, image_embeddings):
        all_metadata.append({
            "type": "image",
            "content": f"Image from page {id_['page_number']}",  # or store the actual PIL object if needed
            "page_number": id_["page_number"]
        })
        all_embeddings.append(emb)

# Fail with a clear message instead of the opaque "tuple index out of range"
# that `.shape[1]` raises on an empty array.
if not all_embeddings:
    raise ValueError(
        "Nothing to index: no text and no page images were extracted from the PDF."
    )

# Convert to NumPy array
all_embeddings = np.array(all_embeddings).astype('float32')
embedding_dimension = all_embeddings.shape[1]

# Faiss ids are used to look up all_metadata by position, so the two must align.
assert len(all_metadata) == all_embeddings.shape[0], "metadata/embedding count mismatch"

In [None]:
# Create a fresh index sized to the embedding width, then load every vector into it.
index = get_faiss_index(dimension=embedding_dimension)
add_to_index(index=index, embeddings=all_embeddings)

In [None]:
def answer_query(user_query, top_k=3, max_context_chars=500):
    """
    Answer a question about the PDF with retrieval-augmented GPT-4.

    1. Embed the user query (assuming it's text).
    2. Retrieve top_k similar items from the PDF (text or image).
    3. Create a prompt for GPT-4 with the retrieved context.
    4. Return GPT-4's answer.

    Args:
        user_query: The question, as a string.
        top_k: Number of nearest items to retrieve from the index.
        max_context_chars: Maximum number of characters of each retrieved
            item's content placed into the prompt.

    Returns:
        The answer string produced by `call_gpt_4`.

    Uses the notebook-level `index`, `all_metadata`, `clip_processor` and
    `clip_model`.
    """

    # Step 1: Embed user query as text
    query_emb = embed_texts([user_query], clip_processor, clip_model)  # shape: (1, D)

    # Step 2: Retrieve from Faiss (the similarity scores are not needed here)
    _, indices = search_index(index, query_emb, top_k=top_k)
    retrieved_items = retrieve_context(indices, all_metadata)

    # Build a context string. You may want more sophisticated formatting.
    context_str_list = []
    for item in retrieved_items:
        content = item['content']
        snippet = content[:max_context_chars]
        # Mark truncation only when something was actually cut; a trailing
        # "..." on a complete snippet wrongly signals missing text.
        if len(content) > max_context_chars:
            snippet += "..."
        context_str_list.append(
            f"({item['type']}, page {item['page_number']}): {snippet}"
        )
    context_str = "\n".join(context_str_list)

    # Step 3: Create a prompt for GPT-4
    prompt = f"""
The user asked: "{user_query}"

I have the following context from the PDF:
{context_str}

Based on the context above (and only this context if possible), answer the query:
"""

    # Step 4: Call GPT-4
    answer = call_gpt_4(prompt)
    return answer

In [None]:
# Demo questions for the retrieval + GPT-4 pipeline.
user_query_1 = "Give me a summary of the content in the PDF."
user_query_2 = "Describe the images found on page 2."

# First question, asked for the first time.
response_1 = answer_query(user_query_1, 3)
print("Q:", user_query_1)
print("A:", response_1)

# Ask the identical question again to see whether caching returns the same answer instantly.
response_2 = answer_query(user_query_1, 3)
print("Q:", user_query_1, "(second time)")
print("A:", response_2)

# A second, different question.
response_3 = answer_query(user_query_2, 3)
print("\nQ:", user_query_2)
print("A:", response_3)