In [None]:
greeting = "Hi"
print(greeting)

: 

In [5]:
import os
from dotenv import load_dotenv
load_dotenv()

from langchain_openai import ChatOpenAI

# load_dotenv() has already put any .env values into os.environ, so copying
# the key back is unnecessary. The previous
# `os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")` also raised an
# opaque TypeError (None is not a str) whenever the key was missing; fail with
# a clear message instead.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your environment or .env file.")

# temperature=0 for (near-)deterministic replies in this smoke test.
llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo")

response = llm.invoke("Hi how are you")
print(response.content)

Hello! I'm just a computer program, so I don't have feelings, but I'm here to help you. How can I assist you today?


In [None]:
# rag.py

import os
import io
import base64
import numpy as np
from PIL import Image
import torch
from dotenv import load_dotenv

from transformers import CLIPProcessor, CLIPModel
from pydantic import BaseModel
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.chat_models import init_chat_model
from langchain.schema.messages import HumanMessage

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver


# -----------------------------
# Environment Setup
# -----------------------------
load_dotenv()
# load_dotenv() has already copied any .env values into os.environ, so nothing
# needs to be written back. The previous
# `os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")` was a no-op when
# the key was set and raised TypeError at import time (None is not a str) when
# it was not. Warn instead and let the OpenAI client report the missing key.
if not os.getenv("OPENAI_API_KEY"):
    import warnings

    warnings.warn(
        "OPENAI_API_KEY is not set; OpenAI calls will fail until it is configured.",
        RuntimeWarning,
    )


# -----------------------------
# CLIP Model Initialization
# -----------------------------
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model.eval()  # inference only

# NOTE(review): this replaces the config of pydantic's BaseModel itself, so it
# presumably leaks into every model class (in any library) defined after this
# line -- confirm whether it is still needed, and if so scope it to the
# specific model that requires arbitrary types.
BaseModel.model_config = {"arbitrary_types_allowed": True}


def embed_image(image_data):
    """Embed an image using CLIP.

    Args:
        image_data: a filesystem path (``str`` or ``os.PathLike``) to an image
            file, or an already-loaded PIL image.

    Returns:
        The L2-normalized CLIP image embedding as a numpy array (1-D for a
        single image, because the batch axis is squeezed away).
    """
    if isinstance(image_data, (str, os.PathLike)):
        # Open via a context manager so the file handle is released as soon as
        # the RGB copy has been made.
        with Image.open(image_data) as img:
            image = img.convert("RGB")
    else:
        image = image_data
    inputs = clip_processor(images=image, return_tensors="pt")
    with torch.no_grad():
        features = clip_model.get_image_features(**inputs)
        features = features / features.norm(dim=-1, keepdim=True)
    return features.squeeze().numpy()


def embed_text(text):
    """Return the L2-normalized CLIP text embedding of *text* as a numpy array.

    Inputs are padded and truncated to at most 77 tokens before encoding.
    """
    tokenized = clip_processor(
        text=text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=77,
    )
    with torch.no_grad():
        raw = clip_model.get_text_features(**tokenized)
        norms = raw.norm(dim=-1, keepdim=True)
        return (raw / norms).squeeze().numpy()


# -----------------------------
# Multimodal RAG Pipeline Class
# -----------------------------
class MultimodalRAG:
    """Retrieval-augmented QA over PDF text chunks and images.

    Text chunks and images are embedded with CLIP into one FAISS store. A
    two-node LangGraph workflow (retrieve -> generate) embeds the question,
    fetches the nearest items, and asks the chat model to answer from the
    retrieved text excerpts and images (sent inline as base64 PNG data URLs).
    """

    def __init__(self, top_k: int = 5):
        """Create an empty pipeline.

        Args:
            top_k: number of items retrieved per query (default 5).
        """
        self.docs = []              # indexed Documents, aligned with self.embeddings
        self.embeddings = []        # one CLIP vector per entry in self.docs
        self.image_data_store = {}  # image_id -> base64-encoded PNG
        self.vector_store = None    # FAISS store, rebuilt by ingest_documents()
        self.top_k = top_k
        self.llm = init_chat_model("openai:gpt-4.1")
        self.checkpointer = MemorySaver()
        self.graph = self._create_graph()

    def ingest_documents(self, docs):
        """Ingest a list of Document objects (text or image) into the RAG system.

        Documents with ``metadata["type"] == "image"`` must also carry
        ``metadata["image"]`` (a PIL image) and ``metadata["image_id"]``; the
        image is stored PNG/base64-encoded in ``image_data_store``. Every other
        document is treated as text and split into chunks (chunk_size=500,
        chunk_overlap=100) before embedding. The FAISS store is rebuilt from
        everything ingested so far.

        Args:
            docs: iterable of ``langchain_core.documents.Document``.
        """
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)

        for doc in docs:
            if doc.metadata.get("type") == "image":
                pil_image = doc.metadata["image"]
                image_id = doc.metadata["image_id"]

                buffered = io.BytesIO()
                pil_image.save(buffered, format="PNG")
                self.image_data_store[image_id] = base64.b64encode(buffered.getvalue()).decode()

                self.embeddings.append(embed_image(pil_image))
                # Index a copy without the raw PIL object: the pixels already
                # live in image_data_store, and dropping them here keeps the
                # image out of retrieval results and graph state. The caller's
                # Document is not modified.
                metadata = {k: v for k, v in doc.metadata.items() if k != "image"}
                self.docs.append(Document(page_content=doc.page_content, metadata=metadata))

            else:  # text doc
                for chunk in splitter.split_documents([doc]):
                    self.embeddings.append(embed_text(chunk.page_content))
                    self.docs.append(chunk)

        if not self.docs:
            # Nothing to index yet; FAISS.from_embeddings cannot build from an
            # empty list.
            return

        # Rebuild the whole store so it reflects every batch ingested so far.
        # No embedding function is attached: queries are embedded with CLIP in
        # retrieve() and searched by vector.
        self.vector_store = FAISS.from_embeddings(
            text_embeddings=[(doc.page_content, emb) for doc, emb in zip(self.docs, self.embeddings)],
            embedding=None,
            metadatas=[doc.metadata for doc in self.docs],
        )

    def _create_graph(self):
        """Define a simple retrieval → generation graph."""
        workflow = StateGraph(dict)

        def retrieve(state):
            # Embed the question with CLIP's text encoder; fetch the nearest items.
            query = state["query"]
            query_emb = embed_text(query)
            results = self.vector_store.similarity_search_by_vector(query_emb, k=self.top_k)
            return {"query": query, "context": results}

        def generate(state):
            # Build one multimodal message: question, text excerpts, images.
            query, retrieved_docs = state["query"], state["context"]
            content = [{"type": "text", "text": f"Question: {query}\n\nContext:\n"}]

            # Text docs
            text_docs = [doc for doc in retrieved_docs if doc.metadata.get("type") == "text"]
            if text_docs:
                text_context = "\n\n".join(
                    [f"[Source: {doc.metadata.get('source','')}] {doc.page_content}" for doc in text_docs]
                )
                content.append({"type": "text", "text": f"Text excerpts:\n{text_context}\n"})

            # Image docs: sent inline as base64 data URLs
            for doc in retrieved_docs:
                if doc.metadata.get("type") == "image":
                    image_id = doc.metadata.get("image_id")
                    if image_id and image_id in self.image_data_store:
                        content.append(
                            {"type": "text", "text": f"\n[Image from {doc.metadata.get('source','')}]:\n"}
                        )
                        content.append(
                            {
                                "type": "image_url",
                                "image_url": {"url": f"data:image/png;base64,{self.image_data_store[image_id]}"},
                            }
                        )

            content.append(
                {"type": "text", "text": "\n\nPlease answer the question based on the provided text and images."}
            )

            message = HumanMessage(content=content)
            response = self.llm.invoke([message])
            return {"answer": response.content}

        workflow.add_node("retrieve", retrieve)
        workflow.add_node("generate", generate)
        workflow.set_entry_point("retrieve")
        workflow.add_edge("retrieve", "generate")
        workflow.add_edge("generate", END)

        return workflow.compile(checkpointer=self.checkpointer)

    def query(self, query: str) -> str:
        """Run query through RAG pipeline with thread_id for memory.

        Returns a placeholder message when nothing has been ingested yet. All
        calls share the fixed thread_id ``"streamlit-session"``.
        """
        if self.vector_store is None:
            return "⚠️ No documents ingested yet. Please upload PDFs or images."
        state = {"query": query}
        result = self.graph.invoke(state, config={"configurable": {"thread_id": "streamlit-session"}})
        return result.get("answer", "No answer found.")


In [None]:
# app.py
import streamlit as st
from PyPDF2 import PdfReader
from PIL import Image
from langchain_core.documents import Document
from rag import MultimodalRAG   # corrected import

st.set_page_config(page_title="📚 Multimodal RAG", layout="wide")

# Initialize the RAG system once per browser session. Streamlit re-executes
# this whole script on every widget interaction, so state lives in session_state.
if "rag" not in st.session_state:
    st.session_state["rag"] = MultimodalRAG()
rag = st.session_state["rag"]

# (name, size) of every file already ingested this session. Uploaded files
# stay in their uploader across reruns, so without this every rerun --
# including each query -- would re-ingest them and duplicate index entries.
if "ingested_files" not in st.session_state:
    st.session_state["ingested_files"] = set()
ingested_files = st.session_state["ingested_files"]


def _file_key(uploaded_file):
    """Return a (name, size) key identifying an uploaded file for de-duplication."""
    return (uploaded_file.name, uploaded_file.size)


# -------------------------------
# Sidebar for ingestion
# -------------------------------
st.sidebar.title("📂 Data Ingestion")

st.sidebar.subheader("Upload PDFs")
uploaded_pdfs = st.sidebar.file_uploader(
    "Choose PDF files", type=["pdf"], accept_multiple_files=True
)

st.sidebar.subheader("Upload Images")
uploaded_images = st.sidebar.file_uploader(
    "Choose image files", type=["png", "jpg", "jpeg"], accept_multiple_files=True
)

# Ingest PDFs that have not been ingested yet
new_pdfs = [f for f in (uploaded_pdfs or []) if _file_key(f) not in ingested_files]
if new_pdfs:
    docs = []
    read_pdfs = []
    with st.spinner("📖 Processing PDFs..."):
        for uploaded_file in new_pdfs:
            # Collect pages per file so a PDF that fails half-way contributes
            # nothing (and is retried on a later rerun).
            try:
                file_docs = []
                pdf_reader = PdfReader(uploaded_file)
                for page_num, page in enumerate(pdf_reader.pages):
                    text = page.extract_text()
                    if text:
                        file_docs.append(
                            Document(
                                page_content=text,
                                metadata={
                                    "source": uploaded_file.name,
                                    "page": page_num,
                                    "type": "text"
                                },
                            )
                        )
            except Exception as e:
                st.sidebar.error(f"Error loading {uploaded_file.name}: {e}")
                continue
            docs.extend(file_docs)
            read_pdfs.append(uploaded_file)
        if docs:
            rag.ingest_documents(docs)
    ingested_files.update(_file_key(f) for f in read_pdfs)
    if docs:
        # One Document per page; chunking happens inside the RAG system.
        st.sidebar.success(f"✅ Ingested {len(docs)} pages from {len(read_pdfs)} PDFs.")

# Ingest images that have not been ingested yet
new_images = [f for f in (uploaded_images or []) if _file_key(f) not in ingested_files]
if new_images:
    image_docs = []
    read_images = []
    with st.spinner("🖼️ Processing images..."):
        for uploaded_file in new_images:
            try:
                pil_image = Image.open(uploaded_file).convert("RGB")
            except Exception as e:
                st.sidebar.error(f"Error loading {uploaded_file.name}: {e}")
                continue
            name, size = _file_key(uploaded_file)
            image_docs.append(
                Document(
                    page_content=f"[Image: {uploaded_file.name}]",
                    metadata={
                        "source": uploaded_file.name,
                        "type": "image",
                        # Must be unique across upload batches: a per-batch
                        # index (user_image_0, ...) collides between batches
                        # and overwrites the stored data of earlier images.
                        "image_id": f"user_image_{name}_{size}",
                        "image": pil_image,
                    },
                )
            )
            read_images.append(uploaded_file)

        if image_docs:
            rag.ingest_documents(image_docs)
    ingested_files.update(_file_key(f) for f in read_images)
    if image_docs:
        st.sidebar.success(f"✅ Ingested {len(image_docs)} images.")

# -------------------------------
# Main Column for Query
# -------------------------------
st.title("🤖 Multimodal RAG Assistant")

query = st.text_input("Ask a question about the uploaded documents & images:")

if query:
    with st.spinner("💡 Thinking..."):
        answer = rag.query(query)
    st.markdown("### 📌 Answer")
    st.write(answer)


In [None]:
# rag.py
import os
import io
import numpy as np
import torch
from PIL import Image
from dotenv import load_dotenv

from transformers import CLIPProcessor, CLIPModel
from pydantic import BaseModel
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chat_models import init_chat_model
from langchain.schema.messages import HumanMessage

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# Try to import faiss for fast similarity search; fallback to numpy.
# Catching Exception (not only ImportError) keeps the numpy fallback working
# even if a broken faiss install fails at import time with some other error.
try:
    import faiss  # type: ignore
    _FAISS_AVAILABLE = True
except Exception:
    _FAISS_AVAILABLE = False

# -----------------------------
# Environment Setup
# -----------------------------
load_dotenv()
# load_dotenv() has already put any .env values into os.environ, so nothing
# needs copying back. The previous `os.environ[...] = os.getenv(..., "")` was a
# no-op when the key existed and otherwise planted an empty-string key, which
# disguises "key missing" as "key present but blank" for downstream clients.
# Leave the variable unset and warn instead.
if not os.getenv("OPENAI_API_KEY"):
    import warnings

    warnings.warn(
        "OPENAI_API_KEY is not set; OpenAI calls will fail until it is configured.",
        RuntimeWarning,
    )

# -----------------------------
# CLIP Model Initialization
# -----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model.eval()  # inference only

# allow arbitrary types in pydantic models used by some libs
# NOTE(review): this replaces the config of pydantic's BaseModel itself, so it
# presumably leaks into every model class defined after this line in any
# library -- confirm it is still needed and, if so, scope it to the model
# that requires it.
BaseModel.model_config = {"arbitrary_types_allowed": True}

# -----------------------------
# Embedding Functions
# -----------------------------
def embed_text(text: str) -> np.ndarray:
    """Encode *text* with CLIP's text encoder.

    ``None`` is treated as the empty string and input is truncated to 77
    tokens. Returns a unit-length float32 numpy vector.
    """
    prompt = "" if text is None else text
    with torch.no_grad():
        encoded = clip_processor(text=[prompt], return_tensors="pt", padding=True, truncation=True, max_length=77)
        batch = {name: tensor.to(device) for name, tensor in encoded.items()}
        raw = clip_model.get_text_features(**batch)  # one row per input string
        unit = raw / raw.norm(dim=-1, keepdim=True)
        return unit.cpu().numpy()[0].astype(np.float32)


def embed_image(image: Image.Image) -> np.ndarray:
    """Encode a PIL image with CLIP's vision encoder.

    Returns the embedding as a unit-length float32 numpy vector.
    """
    with torch.no_grad():
        pixel_batch = clip_processor(images=[image], return_tensors="pt")
        on_device = {name: tensor.to(device) for name, tensor in pixel_batch.items()}
        raw = clip_model.get_image_features(**on_device)  # one row per input image
        unit = raw / raw.norm(dim=-1, keepdim=True)
        return unit.cpu().numpy()[0].astype(np.float32)


# -----------------------------
# Simple Vector Store (supports faiss if available)
# -----------------------------
class VectorStore:
    """Small in-memory vector store for L2-normalized embeddings.

    Searches with a faiss ``IndexFlatIP`` when faiss is importable and falls
    back to a brute-force numpy dot product otherwise. Both rank by inner
    product, which equals cosine similarity for unit-length vectors.
    """

    def __init__(self):
        self.embeddings = None  # numpy array shape (N, D); None until the first add()
        self.metadatas = []     # list of dicts (length N), row-aligned with embeddings
        self.documents = []     # list of Document (length N), row-aligned with embeddings
        self._faiss_index = None
        self.dimension = None   # D, fixed by the first embedding added

    def add(self, emb: np.ndarray, doc: Document):
        """Add a single embedding + Document.

        Args:
            emb: one embedding, shape (D,) or (1, D).
            doc: the Document this embedding represents.

        Raises:
            ValueError: if *emb* holds more than one vector (rows and documents
                would fall out of alignment) or its dimension differs from the
                embeddings already stored.
        """
        emb = np.asarray(emb, dtype=np.float32)
        if emb.ndim == 1:
            emb = emb.reshape(1, -1)
        if emb.ndim != 2 or emb.shape[0] != 1:
            raise ValueError("add() expects a single embedding of shape (D,) or (1, D).")
        if self.embeddings is None:
            self.embeddings = emb
            self.dimension = emb.shape[1]
        else:
            if emb.shape[1] != self.dimension:
                raise ValueError("Embedding dimension mismatch.")
            # NOTE: vstack copies the whole matrix, so N single adds cost
            # O(N^2) copying overall -- acceptable for small collections.
            self.embeddings = np.vstack([self.embeddings, emb])
        self.documents.append(doc)
        self.metadatas.append(doc.metadata or {})

    def build_index(self):
        """(Re)build the faiss index from all stored embeddings.

        Without faiss the index is cleared and searches use brute-force numpy.
        Does nothing while the store is empty.
        """
        if self.embeddings is None:
            return
        if _FAISS_AVAILABLE:
            # Inner product on normalized vectors == cosine similarity.
            self._faiss_index = faiss.IndexFlatIP(self.dimension)
            self._faiss_index.add(self.embeddings)
        else:
            self._faiss_index = None

    def similarity_search_by_vector(self, query_emb: np.ndarray, k: int = 5):
        """Return up to *k* Documents most similar to *query_emb*, best first.

        Returns all documents if fewer than *k* exist, and ``[]`` when the store
        is empty or ``k <= 0``.

        Raises:
            ValueError: if the query's dimension differs from the stored one.
        """
        if self.embeddings is None or k <= 0:
            return []

        q = np.asarray(query_emb, dtype=np.float32)
        if q.ndim == 1:
            q = q.reshape(1, -1)
        if q.shape[1] != self.dimension:
            raise ValueError("Query embedding dimension mismatch.")

        n_items = self.embeddings.shape[0]
        k = min(k, n_items)

        # Rows added since the last build_index() are missing from the faiss
        # index; rebuild so they are searchable, as they are on the numpy path.
        if self._faiss_index is not None and self._faiss_index.ntotal != n_items:
            self.build_index()

        if self._faiss_index is not None:
            # faiss returns (scores, indices); -1 marks an unfilled slot.
            _, idx = self._faiss_index.search(q, k)
            return [self.documents[i] for i in idx[0].tolist() if i != -1]

        # Brute force: dot product == cosine for normalized vectors.
        sims = (self.embeddings @ q.T).squeeze(axis=1)  # (N,)
        topk_idx = np.argsort(-sims)[:k]
        return [self.documents[i] for i in topk_idx]


# -----------------------------
# Multimodal RAG Pipeline
# -----------------------------
class MultimodalRAG:
    """Product FAQ + image assistant: CLIP retrieval with OpenAI chat generation.

    PDF text chunks and product images are embedded with CLIP into a shared
    VectorStore. A LangGraph workflow (retrieve -> generate) embeds the
    question, retrieves the nearest items, asks the LLM to answer from the
    text excerpts, and returns the retrieved images alongside the answer.
    """

    def __init__(self, top_k: int = 8):
        """Create an empty pipeline.

        Args:
            top_k: number of items (text chunks and images) retrieved per query.
        """
        self.top_k = top_k
        # keep text docs and image docs as Documents
        self.vector_store = VectorStore()
        # actual PIL images keyed by source name for display; re-ingesting a
        # name replaces the stored image
        self.image_store = {}  # {source_name: PIL.Image}
        # LLM
        self.llm = init_chat_model("openai:gpt-4.1")
        # NOTE(review): every query is checkpointed under the single thread_id
        # used in query(), and generate() never reads earlier turns, so this
        # only accumulates state over a session -- consider compiling without it.
        self.checkpointer = MemorySaver()
        # compiled graph
        self.graph = self._create_graph()
        # text splitter for PDFs/long text
        self.splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)

    # -------------------------
    # Ingest Text Documents (PDF FAQ pages -> Document objects)
    # -------------------------
    def ingest_texts(self, docs):
        """Split text Documents into chunks, embed each chunk and index it.

        Args:
            docs: list of langchain_core.documents.Document. Each chunk gets a
                copy of its source metadata, with ``"type"`` defaulting to
                ``"text"``.
        """
        for doc in docs:
            for chunk in self.splitter.split_documents([doc]):
                text = chunk.page_content or ""
                emb = embed_text(text)
                md = dict(chunk.metadata or {})  # copy; never mutate the source dict
                md.setdefault("type", "text")
                self.vector_store.add(emb, Document(page_content=text, metadata=md))

        # rebuild index after ingestion
        self.vector_store.build_index()

    # -------------------------
    # Ingest Images
    # -------------------------
    def ingest_images(self, images: list):
        """Embed images with CLIP and index them under their file names.

        Args:
            images: list of dicts like {"image": PIL.Image, "name": "file.jpg"};
                ``name`` defaults to ``"image"``. A copy of each image is kept
                in image_store for later display.
        """
        for img_data in images:
            img = img_data["image"]
            name = img_data.get("name", "image")
            emb = embed_image(img)
            self.image_store[name] = img.copy()
            # placeholder document; the pixels live in image_store
            doc = Document(page_content=f"Image: {name}", metadata={"source": name, "type": "image"})
            self.vector_store.add(emb, doc)

        # rebuild index after ingestion
        self.vector_store.build_index()

    # -------------------------
    # Graph: Retrieval → Generation
    # -------------------------
    def _create_graph(self):
        """Build and compile the retrieve -> generate LangGraph workflow."""
        workflow = StateGraph(dict)

        def retrieve(state):
            query_text = state.get("query_text", "")
            if not query_text:
                return {"query_text": query_text, "context": []}

            # embed the query with the CLIP text encoder
            query_emb = embed_text(query_text)
            retrieved_docs = self.vector_store.similarity_search_by_vector(query_emb, k=self.top_k)
            return {"query_text": query_text, "context": retrieved_docs}

        def generate(state):
            query_text = state.get("query_text", "")
            retrieved_docs = state.get("context", []) or []

            # separate text and image docs
            text_docs = [d for d in retrieved_docs if (d.metadata or {}).get("type") == "text"]
            image_docs = [d for d in retrieved_docs if (d.metadata or {}).get("type") == "image"]

            # Retrieved image file names, de-duplicated in rank order (the same
            # file may be indexed more than once if it was ingested repeatedly).
            image_names = list(dict.fromkeys(
                src for src in ((d.metadata or {}).get("source") for d in image_docs) if src
            ))

            context_blocks = [
                f"[Source: {(d.metadata or {}).get('source', '')}] {d.page_content}" for d in text_docs
            ]

            # build message content as the LLM wrapper expects
            prompt_parts = [
                {"type": "text", "text": f"Question: {query_text}\n\n"},
            ]
            if context_blocks:
                prompt_parts.append({"type": "text", "text": "Context from FAQs:\n" + "\n\n".join(context_blocks) + "\n\n"})
            if image_names:
                # The instruction below asks the model to name relevant images,
                # so it must be told which image files were retrieved.
                prompt_parts.append({"type": "text", "text": "Retrieved product images: " + ", ".join(image_names) + "\n\n"})
            prompt_parts.append({"type": "text", "text": "Please answer concisely and reference the sources (if any). If images are relevant, mention which image filenames are relevant."})

            message = HumanMessage(content=prompt_parts)

            # invoke LLM
            try:
                response = self.llm.invoke([message])
            except Exception as e:
                # graceful failure: surface the error as the answer text
                return {"answer": {"text": f"LLM invocation failed: {e}", "images": []}}

            # robustly extract text from response
            if isinstance(response, str):
                answer_text = response
            else:
                # chat model messages carry their text in `.content`
                answer_text = getattr(response, "content", None)
                if answer_text is None:
                    answer_text = str(response)

            # actual PIL images for display, one per retrieved file name
            image_list = [
                {"name": name, "image": self.image_store[name]}
                for name in image_names
                if name in self.image_store
            ]

            return {"answer": {"text": answer_text, "images": image_list}}

        workflow.add_node("retrieve", retrieve)
        workflow.add_node("generate", generate)
        workflow.set_entry_point("retrieve")
        workflow.add_edge("retrieve", "generate")
        workflow.add_edge("generate", END)

        return workflow.compile(checkpointer=self.checkpointer)

    # -------------------------
    # Public Query
    # -------------------------
    def query(self, query_text: str):
        """Answer *query_text*.

        Returns:
            dict with ``"text"`` (the answer) and ``"images"`` (list of
            ``{"name", "image"}`` dicts for retrieved product images).
        """
        if not query_text or not query_text.strip():
            return {"text": "Please provide a query.", "images": []}

        state = {"query_text": query_text}
        result = self.graph.invoke(state, config={"configurable": {"thread_id": "streamlit-session"}})
        # result is expected to be the dict produced by the generate node
        ans = result.get("answer") if isinstance(result, dict) else result
        if not ans:
            return {"text": "No answer found.", "images": []}
        if not isinstance(ans, dict):
            # defensive: treat a bare value as the answer text
            return {"text": str(ans), "images": []}
        return {"text": ans.get("text", ""), "images": ans.get("images", [])}


In [None]:
# app.py
import streamlit as st
from PyPDF2 import PdfReader
from PIL import Image
from io import BytesIO
from langchain_core.documents import Document
from rag import MultimodalRAG

st.set_page_config(page_title="E-commerce Product Assistant", layout="wide")

st.title("🛒 E-commerce Product FAQ & Image Assistant")

# Initialize or reuse rag in session state so we don't lose data across the
# reruns Streamlit performs on every interaction.
if "rag" not in st.session_state:
    st.session_state["rag"] = MultimodalRAG()
rag: MultimodalRAG = st.session_state["rag"]

# (name, size) of every file already added to the index this session. Files
# stay in the uploaders after processing, so without this each click of the
# process button would ingest them again and duplicate their index entries.
if "ingested_files" not in st.session_state:
    st.session_state["ingested_files"] = set()
ingested_files = st.session_state["ingested_files"]


def _file_key(uploaded_file):
    """Return a (name, size) key identifying an uploaded file for de-duplication."""
    return (uploaded_file.name, uploaded_file.size)


# -----------------------------
# Sidebar: Document & Image Ingestion
# -----------------------------
st.sidebar.header("📂 Upload FAQs and Product Images")

pdf_files = st.sidebar.file_uploader("Upload FAQ PDFs", type=["pdf"], accept_multiple_files=True)
image_files = st.sidebar.file_uploader("Upload Product Images", type=["png", "jpg", "jpeg"], accept_multiple_files=True)

process_button = st.sidebar.button("Process / Add to Index")

if process_button:
    new_pdfs = [f for f in (pdf_files or []) if _file_key(f) not in ingested_files]
    new_images = [f for f in (image_files or []) if _file_key(f) not in ingested_files]
    if (pdf_files or image_files) and not (new_pdfs or new_images):
        st.sidebar.info("All uploaded files are already indexed.")

    # Process PDFs (text)
    if new_pdfs:
        docs = []
        read_pdfs = []
        with st.spinner("📖 Processing PDFs..."):
            for uploaded_file in new_pdfs:
                # Collect pages per file so a PDF that fails half-way
                # contributes nothing (and can be retried later).
                try:
                    file_docs = []
                    pdf_reader = PdfReader(uploaded_file)
                    for page_num, page in enumerate(pdf_reader.pages):
                        text = page.extract_text()
                        if text and text.strip():
                            file_docs.append(
                                Document(
                                    page_content=text,
                                    metadata={"source": uploaded_file.name, "page": page_num, "type": "text"},
                                )
                            )
                except Exception as e:
                    st.sidebar.error(f"Failed to read {uploaded_file.name}: {e}")
                    continue
                docs.extend(file_docs)
                read_pdfs.append(uploaded_file)

            # Embedding is the slow step, so keep it inside the spinner.
            if docs:
                rag.ingest_texts(docs)
        ingested_files.update(_file_key(f) for f in read_pdfs)

        if docs:
            # One Document per page; chunking happens inside ingest_texts.
            st.sidebar.success(f"✅ Ingested {len(docs)} pages from {len(read_pdfs)} PDFs.")
        else:
            st.sidebar.info("No text chunks found in uploaded PDFs.")

    # Process Images
    if new_images:
        images = []
        read_images = []
        with st.spinner("🖼️ Processing Images..."):
            for uploaded_file in new_images:
                try:
                    img = Image.open(uploaded_file).convert("RGB")
                except Exception as e:
                    st.sidebar.error(f"Failed to read image {uploaded_file.name}: {e}")
                    continue
                images.append({"image": img, "name": uploaded_file.name})
                read_images.append(uploaded_file)

            if images:
                rag.ingest_images(images)
        ingested_files.update(_file_key(f) for f in read_images)

        if images:
            st.sidebar.success(f"✅ Ingested {len(images)} images.")
        else:
            st.sidebar.info("No images ingested.")

# -----------------------------
# Index Info (optional)
# -----------------------------
st.sidebar.markdown("---")
st.sidebar.write("Index info:")
num_items = 0
if rag.vector_store.embeddings is not None:
    num_items = rag.vector_store.embeddings.shape[0]
st.sidebar.write(f"Items indexed: **{num_items}**")
st.sidebar.write(f"Images stored: **{len(rag.image_store)}**")

# -----------------------------
# Main: Query
# -----------------------------
st.subheader("💬 Ask a Question about Products")

query_text = st.text_area("Enter your question:", height=150)

if st.button("🔎 Search"):
    if rag.vector_store.embeddings is None:
        st.error("⚠️ Please process FAQs and images first from the sidebar (click 'Process / Add to Index').")
    elif not query_text or not query_text.strip():
        st.warning("Please enter a query.")
    else:
        with st.spinner("💡 Thinking..."):
            answer = rag.query(query_text)

        st.markdown("### 📑 Answer")
        st.write(answer["text"])

        if answer.get("images"):
            st.markdown("### 🖼️ Relevant Images")
            cols = st.columns(3)
            for i, img_obj in enumerate(answer["images"]):
                # img_obj is {"name": name, "image": PIL.Image}
                name = img_obj.get("name", f"image_{i}")
                img = img_obj.get("image")
                if img is not None:
                    col = cols[i % 3]
                    # NOTE(review): newer Streamlit releases deprecate
                    # use_column_width in favour of use_container_width;
                    # switch once the minimum supported version allows it.
                    col.image(img, caption=name, use_column_width=True)
