In [None]:
from dotenv import load_dotenv
import os

# Load environment variables via python-dotenv's load_dotenv().
# Presumably this supplies the OpenAI API key and database settings used by
# later cells — TODO confirm against the project's .env file.
load_dotenv()

### Understanding Store in LangChain

In [None]:
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Build the path with os.path.join so the notebook also runs on POSIX systems;
# the previous raw string hard-coded the Windows "\" separator. On Windows the
# resulting string is unchanged ("data\toronto.pdf").
file_path = os.path.join("data", "toronto.pdf")

loader = PyPDFLoader(file_path=file_path)

# No custom splitter is supplied here. NOTE(review): with text_splitter=None,
# load_and_split falls back to LangChain's default RecursiveCharacterTextSplitter,
# so the result is page-level documents that may be split further when a page
# is very long — confirm against the installed LangChain version.
documents = loader.load_and_split(text_splitter=None)
documents

### Using the PostgresByteStore

In [None]:
from langchain.vectorstores import Chroma
from langchain.storage import InMemoryStore
from langchain_openai import OpenAIEmbeddings
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain_postgres import PGVector
from database import COLLECTION_NAME, CONNECTION_STRING
from utils.store import PostgresByteStore
from langchain_postgres import PostgresSaver, PickleCheckpointSerializer
from utils.custom_sql_record_manager import CustomSQLRecordManager
from utils.index_with_ids import index_with_ids

# Embedding model handed to the vector store below
embeddings = OpenAIEmbeddings()

# PGVector vector store bound to this collection and connection
vectorstore = PGVector(
    connection=CONNECTION_STRING,
    collection_name=COLLECTION_NAME,
    embeddings=embeddings,
    use_jsonb=True,
)

# Project-local Postgres store used as the retriever's docstore
store = PostgresByteStore(CONNECTION_STRING, COLLECTION_NAME)

# Metadata key passed to the retriever as its id_key
id_key = "doc_id"

retriever = MultiVectorRetriever(
    id_key=id_key,
    docstore=store,
    vectorstore=vectorstore,
)

# Record manager, namespaced per collection; its schema is created up front
namespace = f"pgvector/{COLLECTION_NAME}"
record_manager = CustomSQLRecordManager(namespace, db_url=CONNECTION_STRING)
record_manager.create_schema()

retriever

In [None]:
from utils.utils import generate_reproducible_id_by_content

# Add a reproducible unique doc_id to each document's metadata.
# The id is derived from the page content plus the metadata *without* any
# existing "doc_id" entry, so re-running this cell feeds the helper the same
# inputs as the first run (previously the id written by an earlier run was
# part of the metadata passed in). The unused enumerate() index was dropped.
for doc in documents:
    base_metadata = {key: value for key, value in doc.metadata.items() if key != "doc_id"}
    doc.metadata["doc_id"] = generate_reproducible_id_by_content(doc.page_content, base_metadata)

In [None]:
# Inspect the documents; each one now carries a "doc_id" entry in its metadata
documents

In [None]:
# Pair every parent document with its doc_id: [(doc_id, document), ...]
doc_id_document_tuples = [
    (parent.metadata["doc_id"], parent) for parent in documents
]

# Hand the pairs to the docstore's conditional_mset and keep what it returns
parent_docs_operations = retriever.docstore.conditional_mset(doc_id_document_tuples)

In [None]:
# Show the value returned by conditional_mset; later cells unpack each entry
# as a (doc_id, operation) pair and compare operation against 'SKIP'
parent_docs_operations

### Creating Smaller Documents

In [None]:
from sqlalchemy import create_engine, Column, String, LargeBinary, select, delete, Table, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import cast
from langchain.schema.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Assume documents is a list of Document objects with 'doc_id' in their metadata
child_text_splitter = RecursiveCharacterTextSplitter(chunk_size=400)

# List to store all sub-documents
all_sub_docs = []

# Database connection setup
engine = create_engine(CONNECTION_STRING)
Session = sessionmaker(bind=engine)

# Define table structure (used only to build the lookup query below)
metadata = MetaData()
langchain_pg_embedding = Table(
    'langchain_pg_embedding', metadata,
    Column('id', String, primary_key=True),
    Column('collection_id', String),
    Column('embedding', LargeBinary),
    Column('document', String),
    Column('cmetadata', JSONB)
)

session = Session()
try:
    # Iterate through the operations reported for the parent documents
    for doc_id, operation in parent_docs_operations:
        if operation == 'SKIP':
            # Parent was skipped: reload its sub-documents from
            # langchain_pg_embedding instead of re-splitting. Only the two
            # columns that are actually read are selected, so the embedding
            # column is not fetched needlessly.
            query = select(
                langchain_pg_embedding.c.document,
                langchain_pg_embedding.c.cmetadata
            ).where(
                (langchain_pg_embedding.c.cmetadata['doc_id'].astext == doc_id) &
                (langchain_pg_embedding.c.cmetadata['type'].astext == 'smaller chunk')
            )

            # Recreate sub-documents from fetched records
            for row in session.execute(query).fetchall():
                all_sub_docs.append(
                    Document(page_content=row.document, metadata=row.cmetadata)
                )
        else:
            # Retrieve the document from the docstore
            doc = retriever.docstore.get(doc_id)
            if doc:
                source = doc.metadata.get("source")  # Retrieve the source from the document's metadata
                sub_docs = child_text_splitter.split_documents([doc])
                for sub_doc in sub_docs:
                    sub_doc.metadata["doc_id"] = doc_id  # Assign the same doc_id to each sub-document
                    sub_doc.metadata["source"] = f"{source}(smaller chunk)"  # Add the suffix to the source
                    sub_doc.metadata["type"] = "smaller chunk"
                all_sub_docs.extend(sub_docs)
finally:
    # Always release the session and the engine's pooled connections, even if
    # a query or the splitter raises part-way through the loop.
    session.close()
    engine.dispose()

# The resulting sub-documents
all_sub_docs


In [None]:
# Run the project's index_with_ids helper over the sub-documents
idx = index_with_ids(
    all_sub_docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)


In [None]:
# Show what index_with_ids returned for the sub-documents
idx

### Creating Summaries for Each Parent Chunk

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Summarization prompt; {element} is the template slot for the text to summarize
prompt_text = (
    "You are an assistant tasked with summarizing text. "
    "Directly summarize the following text chunk: {element} "
)
prompt = ChatPromptTemplate.from_template(prompt_text)

# Chat model used for summarization (temperature 0)
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Summary chain: input is passed through unchanged as "element", then
# prompt -> model -> string output parser
summarize_chain = (
    {"element": lambda text: text}
    | prompt
    | model
    | StrOutputParser()
)

In [None]:
from sqlalchemy import create_engine, Column, String, LargeBinary, select, Table, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import JSONB
from langchain.schema.document import Document

# List to store all summary documents
summary_docs = []

# Database connection setup
engine = create_engine(CONNECTION_STRING)
Session = sessionmaker(bind=engine)

# Define table structure (used only to build the lookup query below)
metadata = MetaData()
langchain_pg_embedding = Table(
    'langchain_pg_embedding', metadata,
    Column('id', String, primary_key=True),
    Column('collection_id', String),
    Column('embedding', LargeBinary),
    Column('document', String),
    Column('cmetadata', JSONB)
)

# Collect parent chunks and associated document IDs for documents that are not SKIP.
# NOTE(review): pairing by zip() relies on parent_docs_operations being in the
# same order (and of the same length) as documents — confirm against conditional_mset.
non_skip_docs = [(doc, doc_id) for doc, (doc_id, operation) in zip(documents, parent_docs_operations) if operation != 'SKIP']
parent_chunk = [doc.page_content for doc, _ in non_skip_docs]

# Generate summaries for the parent chunks that are not SKIP. This runs before
# the database session is opened so no session is held during the LLM calls.
text_summaries = summarize_chain.batch(parent_chunk, {"max_concurrency": 5})

# Create an iterator for the generated summaries; it is consumed in the same
# order in which the non-SKIP documents are visited below
text_summaries_iter = iter(text_summaries)

session = Session()
try:
    # Iterate through the operations
    for doc, (doc_id, operation) in zip(documents, parent_docs_operations):
        if operation == 'SKIP':
            # Parent was skipped: reload its summary documents from
            # langchain_pg_embedding. Only the two columns that are actually
            # read are selected, so the embedding column is not fetched.
            query = select(
                langchain_pg_embedding.c.document,
                langchain_pg_embedding.c.cmetadata
            ).where(
                (langchain_pg_embedding.c.cmetadata['doc_id'].astext == doc_id) &
                (langchain_pg_embedding.c.cmetadata['type'].astext == 'summary')
            )

            # Recreate summary documents from fetched records
            for row in session.execute(query).fetchall():
                summary_docs.append(
                    Document(page_content=row.document, metadata=row.cmetadata)
                )
        else:
            # Retrieve the source and page from the document's metadata
            source = doc.metadata.get("source")
            page = doc.metadata.get("page")

            # Get the next generated summary
            summary_content = next(text_summaries_iter)

            # Create a summary document
            summary_doc = Document(page_content=summary_content, metadata={
                "doc_id": doc_id,
                "source": f"{source}(summary)",
                "page": page,
                "type": "summary"
            })
            summary_docs.append(summary_doc)
finally:
    # Always release the session and the engine's pooled connections, even if
    # a query raises part-way through the loop.
    session.close()
    engine.dispose()

# The resulting summary documents
summary_docs


In [None]:
# Run the project's index_with_ids helper over the summary documents
idx = index_with_ids(
    summary_docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)


In [None]:
# Show what index_with_ids returned for the summary documents
idx

### Generating Hypothetical Questions for Each Parent Chunk

In [None]:
# Schema of the single "questions" argument: an array of strings
_questions_schema = {"type": "array", "items": {"type": "string"}}

# Function definition list later bound to the chat model; it declares one
# function, "hypothetical_questions", whose only (required) argument is "questions"
functions = [
    dict(
        name="hypothetical_questions",
        description="Generate hypothetical questions",
        parameters={
            "type": "object",
            "properties": {"questions": _questions_schema},
            "required": ["questions"],
        },
    )
]

In [None]:
from langchain.output_parsers.openai_functions import JsonKeyOutputFunctionsParser

# Prompt asking for hypothetical questions about a document.
# Only asking for 5 hypothetical questions, but this could be adjusted.
# The template text (including its indentation) is sent to the model as-is.
question_prompt = ChatPromptTemplate.from_template(
    """Generate a list of exactly 5 hypothetical questions that the below document could be used to answer:\n\n{doc}
        seperate each question with a comma (,)
        """
)

# Chat model forced to call the "hypothetical_questions" function
question_model = ChatOpenAI(max_retries=0, model="gpt-4o").bind(
    functions=functions, function_call={"name": "hypothetical_questions"}
)

# Pulls the "questions" key out of the function-call arguments
questions_parser = JsonKeyOutputFunctionsParser(key_name="questions")

# Document -> its page_content as "doc" -> prompt -> model -> parsed questions
question_chain = (
    {"doc": lambda document: document.page_content}
    | question_prompt
    | question_model
    | questions_parser
)

In [None]:
from sqlalchemy import create_engine, Column, String, LargeBinary, select, Table, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import JSONB
from langchain.schema.document import Document

# List to store all question documents
question_docs = []

# Database connection setup
engine = create_engine(CONNECTION_STRING)
Session = sessionmaker(bind=engine)

# Define table structure (used only to build the lookup query below)
metadata = MetaData()
langchain_pg_embedding = Table(
    'langchain_pg_embedding', metadata,
    Column('id', String, primary_key=True),
    Column('collection_id', String),
    Column('embedding', LargeBinary),
    Column('document', String),
    Column('cmetadata', JSONB)
)

# Collect parent documents and associated document IDs for documents that are not SKIP.
# NOTE(review): pairing by zip() relies on parent_docs_operations being in the
# same order (and of the same length) as documents — confirm against conditional_mset.
non_skip_docs = [(doc, doc_id) for doc, (doc_id, operation) in zip(documents, parent_docs_operations) if operation != 'SKIP']
parent_documents = [doc for doc, _ in non_skip_docs]

# Generate hypothetical questions for the parent documents that are not SKIP.
# This runs before the database session is opened so no session is held
# during the LLM calls.
hypothetical_questions = question_chain.batch(parent_documents, {"max_concurrency": 5})

# Create an iterator for the generated questions; it is consumed in the same
# order in which the non-SKIP documents are visited below
hypothetical_questions_iter = iter(hypothetical_questions)

session = Session()
try:
    # Iterate through the operations
    for doc, (doc_id, operation) in zip(documents, parent_docs_operations):
        if operation == 'SKIP':
            # Parent was skipped: reload its question documents from
            # langchain_pg_embedding. Only the two columns that are actually
            # read are selected, so the embedding column is not fetched.
            query = select(
                langchain_pg_embedding.c.document,
                langchain_pg_embedding.c.cmetadata
            ).where(
                (langchain_pg_embedding.c.cmetadata['doc_id'].astext == doc_id) &
                (langchain_pg_embedding.c.cmetadata['type'].astext == 'question')
            )

            # Recreate question documents from fetched records
            for row in session.execute(query).fetchall():
                question_docs.append(
                    Document(page_content=row.document, metadata=row.cmetadata)
                )
        else:
            # Retrieve the source and page from the document's metadata
            source = doc.metadata.get("source")
            page = doc.metadata.get("page")

            # Get the list of generated questions for this document
            question_list = next(hypothetical_questions_iter)

            # Create a question document for each question in the list
            for question_content in question_list:
                # Define the metadata for the question document
                new_metadata = {
                    "doc_id": doc_id,
                    "source": f"{source}(question)",
                    "page": page,
                    "type": "question"
                }

                # Create the question document
                question_doc = Document(page_content=question_content, metadata=new_metadata)
                question_docs.append(question_doc)
finally:
    # Always release the session and the engine's pooled connections, even if
    # a query raises part-way through the loop.
    session.close()
    engine.dispose()

# The resulting question documents
question_docs


In [None]:
# Run the project's index_with_ids helper over the question documents
idx = index_with_ids(
    question_docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)



In [None]:
# Show what index_with_ids returned for the question documents
idx

### Creating an LCEL Chain and Testing the Retriever

In [None]:
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Prompt template: answer strictly from the supplied context.
# {context} and {question} are the two template slots.
template = (
    "Answer the question based only on the following context:\n"
    "{context}\n"
    "Question: {question}\n"
)
prompt = ChatPromptTemplate.from_template(template)

# LLM (temperature 0)
model = ChatOpenAI(model="gpt-4o", temperature=0)

# RAG pipeline: the retriever supplies "context", the raw input is passed
# through unchanged as "question", then prompt -> model -> string output parser
rag_inputs = {"context": retriever, "question": RunnablePassthrough()}
chain = rag_inputs | prompt | model | StrOutputParser()

In [None]:
# Run the RAG chain on a sample question; the returned string is the cell output
chain.invoke("What types of shops and food can I find in Kensington Market?")

In [None]:
# Second sample question for the RAG chain
chain.invoke("Where is Toronto's Entertainment District and what is it known for?")

In [None]:
# Third sample question for the RAG chain
chain.invoke("What are some cultural neighborhoods to explore in Toronto?")