## Semantic Kernel: Ramp-Up based on SK's Documentation

To get the latest versions of the Semantic Kernel (SK) and PyPDF2 Python packages, use:

``` bash
pip install --upgrade semantic-kernel pypdf2
```

## 📒 Notebook 5: Vector Store

This notebook uses SK's in-memory connector to build an in-memory vector store from PDF documents.

### 🪜 Step 1: Configure environment

In [1]:
# Import required packages
import os
import logging
from dataclasses import dataclass, field
from typing import Annotated
from uuid import uuid4
from pathlib import Path
import traceback; 

# PDF processing
import PyPDF2

# Semantic Kernel imports
from semantic_kernel import Kernel
from semantic_kernel.contents import ChatHistory
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureTextEmbedding,
    OpenAIEmbeddingPromptExecutionSettings,
    OpenAIChatPromptExecutionSettings
)

# Memory and vector store imports
from semantic_kernel.connectors.memory.in_memory import InMemoryVectorCollection
from semantic_kernel.data import (
    VectorSearchFilter,
    VectorSearchOptions,
    VectorStoreRecordDataField,
    VectorStoreRecordKeyField,
    VectorStoreRecordVectorField,
    vectorstoremodel,
)
from semantic_kernel.data.const import DISTANCE_FUNCTION_DIRECTION_HELPER, DistanceFunction, IndexKind
from semantic_kernel.data.vector_search import add_vector_to_records

In [2]:
# Azure OpenAI backend settings, read from the environment.
# Each value is None when its variable is unset; the service cells check for that.
AOAI_DEPLOYMENT, AOAI_ENDPOINT, AOAI_API_VERSION, AOAI_EMBEDDING = (
    os.environ.get(env_name)
    for env_name in (
        "AZURE_OPENAI_API_DEPLOY",
        "AZURE_OPENAI_API_BASE",
        "AZURE_OPENAI_API_VERSION",
        "AZURE_OPENAI_API_DEPLOY_EMBED",
    )
)

# Folder (relative to the notebook) that is scanned for PDF files
DATA_FOLDER = "data"

### 🪜 Step 2: Define Data Model and Helper Functions

In [3]:
# Set constants for vector search
# Both constants are passed to the vector field declaration of DocumentChunk below.
DISTANCE_FUNCTION = DistanceFunction.COSINE_SIMILARITY
INDEX_KIND = IndexKind.IVF_FLAT

# Class for vector store's data model
@vectorstoremodel
@dataclass
class DocumentChunk:
    """Data model for document chunks with vector embeddings.

    One instance represents a single text chunk taken from one page of a PDF.
    Every field has a default, so instances can be built from keyword
    arguments alone; `id` defaults to a fresh random UUID string.
    """
    # Embedding of `content`; None until an embedding has been generated.
    vector: Annotated[
        list[float] | None,
        VectorStoreRecordVectorField(
            # NOTE(review): the "embedding" key presumably refers to the service
            # registered with service_id "embedding" - confirm against the SK version in use.
            embedding_settings = {"embedding": OpenAIEmbeddingPromptExecutionSettings()},
            index_kind = INDEX_KIND,
            # NOTE(review): 1536 must equal the output size of the embedding
            # deployment named by AZURE_OPENAI_API_DEPLOY_EMBED - confirm.
            dimensions = 1536,
            distance_function = DISTANCE_FUNCTION,
            property_type = "float",
        ),
    ] = None
    # Record key; generated per instance so chunks never share a default id.
    id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
    # Chunk text; declared as the source of the embedding stored in `vector`.
    content: Annotated[
        str,
        VectorStoreRecordDataField(
            has_embedding = True,
            embedding_property_name = "vector",
            property_type = "str",
            is_full_text_searchable = True,
        ),
    ] = "content"
    # File name without extension (set from Path.stem by process_pdfs_from_folder); filterable.
    document_name: Annotated[str, VectorStoreRecordDataField(property_type="str", is_filterable=True)] = "document"
    # 1-based page number the chunk was extracted from; filterable.
    page_number: Annotated[int, VectorStoreRecordDataField(property_type="int", is_filterable=True)] = 0
    # 0-based position of the chunk within its page; filterable.
    chunk_index: Annotated[int, VectorStoreRecordDataField(property_type="int", is_filterable=True)] = 0

In [4]:
# Helper function to extract text from PDF files
def extract_text_from_pdf(pdf_path: Path) -> list[tuple[str, int]]:
    """Read a PDF and collect the stripped text of every non-empty page.

    Args:
        pdf_path: Location of the PDF file to read.

    Returns:
        A list of (page_text, page_number) tuples with 1-based page numbers.
        Pages whose text is empty after stripping are left out. If reading
        fails, the error is printed and whatever was collected so far is returned.
    """
    collected = []

    try:
        with open(pdf_path, 'rb') as pdf_handle:
            reader = PyPDF2.PdfReader(pdf_handle)
            for page_no, pdf_page in enumerate(reader.pages, start=1):
                stripped = pdf_page.extract_text().strip()
                if not stripped:
                    continue  # skip pages without any extractable text
                collected.append((stripped, page_no))
    except Exception as e:
        # Best effort: report the problem and keep the pages read before it
        print(f"Error reading PDF {pdf_path}: {e}")

    return collected

# Helper function to split text into smaller pieces
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> list[str]:
    """Split text into overlapping chunks.

    Each chunk holds at most `chunk_size` characters. A chunk that is not the
    last one is cut after its last '.' when that period lies in the second
    half of the chunk, so chunks tend to end on sentence boundaries. The next
    chunk starts `overlap` characters before the end of the previous one.

    Args:
        text: Text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks; must not
            be negative.

    Returns:
        The chunks in document order. Text that already fits into one chunk
        (including the empty string) is returned as a single-element list.

    Raises:
        ValueError: If `chunk_size` is not positive or `overlap` is negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0:
        raise ValueError("overlap must not be negative")

    text_length = len(text)
    if text_length <= chunk_size:
        return [text]

    chunks = []
    start = 0

    while start < text_length:
        end = start + chunk_size
        chunk = text[start:end]

        # Try to break at sentence boundary if possible
        if end < text_length:
            last_period = chunk.rfind('.')
            if last_period > chunk_size // 2:  # Only break if it's not too early
                chunk = chunk[:last_period + 1]
                end = start + len(chunk)

        chunks.append(chunk)

        if end >= text_length:
            break

        # Always move forward by at least one character: with a large overlap
        # `end - overlap` can be <= start, which would otherwise loop forever.
        start = max(end - overlap, start + 1)

    return chunks

# Helper function to process all PDF files in a folder and return DocumentChunk objects
def process_pdfs_from_folder(folder_path: str) -> list[DocumentChunk]:
    """Turn every PDF in a folder into a flat list of DocumentChunk objects.

    Only files matching "*.pdf" directly inside `folder_path` are read
    (no recursion). Files are processed in sorted name order so that the
    resulting chunk order is the same on every run and platform.

    Args:
        folder_path: Folder to scan for PDF files.

    Returns:
        One DocumentChunk per text chunk, carrying the file stem as
        `document_name`, the 1-based `page_number` and the 0-based
        `chunk_index` within that page. An empty list is returned when the
        folder does not exist or contains no PDF files.
    """
    data_path = Path(folder_path)

    if not data_path.exists():
        print(f"Warning: Data folder '{folder_path}' does not exist.")
        return []

    # glob() yields entries in filesystem order; sort for reproducible output
    pdf_files = sorted(data_path.glob("*.pdf"))

    if not pdf_files:
        print(f"No PDF files found in '{folder_path}'.")
        return []

    print(f"Found {len(pdf_files)} PDF files to process:")
    for pdf_file in pdf_files:
        print(f"  - {pdf_file.name}")

    document_chunks = []
    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")

        for page_text, page_num in extract_text_from_pdf(pdf_file):
            for chunk_idx, chunk in enumerate(chunk_text(page_text)):
                document_chunks.append(
                    DocumentChunk(
                        content = chunk,
                        document_name = pdf_file.stem,  # filename without extension
                        page_number = page_num,
                        chunk_index = chunk_idx,
                    )
                )

    print(f"\nTotal chunks created: {len(document_chunks)}")
    return document_chunks

# Helper function to print search results
def print_search_result(result, score: float = None):
    """Pretty-print one search hit.

    Args:
        result: Object exposing `document_name`, `page_number`, `chunk_index`
            and `content` attributes.
        score: Optional relevance score; the score line is omitted when None.

    The content is truncated to its first 200 characters, followed by "..."
    when it was longer, and the block ends with an 80-character dash rule.
    """
    ellipsis_suffix = '...' if len(result.content) > 200 else ''

    report_lines = [
        f"Document: {result.document_name}",
        f"Page: {result.page_number}, Chunk: {result.chunk_index}",
    ]
    if score is not None:
        report_lines.append(f"Relevance Score: {score:.4f}")
    report_lines.append(f"Content: {result.content[:200]}{ellipsis_suffix}")
    report_lines.append("-" * 80)

    print("\n".join(report_lines))

### 🪜 Step 3: Initialise Kernel and Services

In [5]:
# Initialise kernel
# The embedding and chat services are registered on this object in the next two cells,
# and perform_rag_query later looks the chat service up through it.
kernel = Kernel()

# Configure logging
# Requests ERROR as the root logging level so lower-severity records are not shown.
# NOTE(review): basicConfig does nothing if the root logger already has handlers - confirm in your Jupyter setup.
logging.basicConfig(level=logging.ERROR)
# NOTE(review): `logger` is not referenced by any other cell visible in this notebook.
logger = logging.getLogger(__name__)

In [6]:
# Add Azure OpenAI embedding
# The service is only created when deployment, endpoint and API version are all set.
if AOAI_EMBEDDING and AOAI_ENDPOINT and AOAI_API_VERSION:
    embedder = AzureTextEmbedding(
        deployment_name = AOAI_EMBEDDING,
        endpoint = AOAI_ENDPOINT,
        api_version = AOAI_API_VERSION,
        service_id = "embedding"
    )
    # overwrite=True keeps the cell re-runnable: without it a second run fails
    # because a service with this service_id is already registered on the kernel.
    kernel.add_service(embedder, overwrite=True)
    print("Azure OpenAI embedding service added")
else:
    # Define the name explicitly so later cells never pick up a stale client
    # left over from an earlier run of this cell.
    embedder = None
    print("Azure OpenAI embedding not configured")

Azure OpenAI embedding service added


In [7]:
# Add Azure OpenAI chat completion
# The service is only created when deployment, endpoint and API version are all set.
if AOAI_DEPLOYMENT and AOAI_ENDPOINT and AOAI_API_VERSION:
    chat_completion = AzureChatCompletion(
        deployment_name = AOAI_DEPLOYMENT,
        endpoint = AOAI_ENDPOINT,
        api_version = AOAI_API_VERSION,
        service_id = "azure_openai_chat",
    )
    # overwrite=True keeps the cell re-runnable: without it a second run fails
    # because a service with this service_id is already registered on the kernel.
    kernel.add_service(chat_completion, overwrite=True)
    print("Azure OpenAI chat completion service added")
else:
    # Define the name explicitly so a stale client from an earlier run is not kept around.
    chat_completion = None
    print("Azure OpenAI chat completion not configured")

Azure OpenAI chat completion service added


### 🪜 Step 4: Load and Process PDF Documents

In [8]:
# Process PDF documents from the data folder
print("Processing PDF documents from data folder...")
document_chunks = process_pdfs_from_folder(DATA_FOLDER)

if not document_chunks:
    # Report the configured folder rather than a hard-coded name, so the hint
    # stays correct when DATA_FOLDER is changed.
    print(f"No documents to process. Please ensure PDF files are in the '{DATA_FOLDER}' folder.")
else:
    print(f"Successfully processed {len(document_chunks)} document chunks")

    # Display summary: distinct pages and number of chunks per document
    doc_summary = {}
    for chunk in document_chunks:
        entry = doc_summary.setdefault(chunk.document_name, {'pages': set(), 'chunks': 0})
        entry['pages'].add(chunk.page_number)
        entry['chunks'] += 1

    print("\nDocument Summary:")
    for doc_name, info in doc_summary.items():
        print(f"- {doc_name}: {len(info['pages'])} pages, {info['chunks']} chunks")

Processing PDF documents from data folder...
Found 2 PDF files to process:
  - NorthwindHealthPlus_BenefitsDetails.pdf
  - Northwind_Standard_Benefits_Details.pdf

Processing: NorthwindHealthPlus_BenefitsDetails.pdf

Processing: Northwind_Standard_Benefits_Details.pdf

Total chunks created: 684
Successfully processed 684 document chunks

Document Summary:
- NorthwindHealthPlus_BenefitsDetails: 109 pages, 344 chunks
- Northwind_Standard_Benefits_Details: 104 pages, 340 chunks


### 🪜 Step 5: Create Vector Store and Upsert Documents

In [9]:
# Create and populate the vector store
async def create_and_populate_vector_store():
    if not document_chunks:
        print("No document chunks to process")
        return None
    
    print("Creating vector store collection...")
    
    # Create the collection
    record_collection = InMemoryVectorCollection[str, DocumentChunk](
        collection_name = "pdf_documents",
        data_model_type = DocumentChunk,
    )
    
    async with record_collection:
        # Create the collection after wiping it
        await record_collection.delete_collection()
        await record_collection.create_collection_if_not_exists()
        print("Collection created")
        
        # Generate embeddings and upsert records
        print("Generating embeddings for document chunks...")
        records_with_embedding = await add_vector_to_records(
            kernel, document_chunks, data_model_type=DocumentChunk
        )
        
        print("Upserting records to vector store...")
        keys = await record_collection.upsert(records_with_embedding)
        print(f"Upserted {len(keys)} records to vector store")
        
        return record_collection

# Run the async function
vector_collection = await create_and_populate_vector_store()

Creating vector store collection...
Collection created
Generating embeddings for document chunks...
Upserting records to vector store...
Upserted 684 records to vector store


### 🪜 Step 6: Search Demos

In [10]:
# Search documents in the vector store
async def search_documents(query: str, document_filter: str = None, top_k: int = 5):
    """Run a vector search over the notebook's collection.

    Prints the query, the optional document filter and the number of hits;
    the hits themselves are returned, not printed.

    Args:
        query: Free-text query; it is embedded with the notebook-level `embedder`.
        document_filter: When given, only records whose `document_name`
            equals this value are searched.
        top_k: Maximum number of hits to request.

    Returns:
        A list of (record, score) tuples. An empty list is returned when
        nothing matches, when the vector store has not been created, or when
        the search fails (the error and traceback are printed in that case).
    """
    print(f"Searching for: '{query}'")
    if document_filter:
        print(f"Filtering by document: {document_filter}")

    # Build the options once; the filter is only added when one was requested
    option_kwargs = {
        "vector_field_name": "vector",
        "include_vectors": False,
        "top": top_k,
    }
    if document_filter:
        option_kwargs["filter"] = VectorSearchFilter.equal_to("document_name", document_filter)
    options = VectorSearchOptions(**option_kwargs)

    try:
        # Step 5 leaves vector_collection as None when there were no chunks;
        # check before spending an embedding call on the query.
        if vector_collection is None:
            print("Search error: the vector store has not been created (run Step 5 first).")
            return []

        # Generate embedding for the query
        query_embedding = (await embedder.generate_raw_embeddings([query]))[0]

        # Perform the search
        async with vector_collection:
            search_results = await vector_collection.vectorized_search(
                vector = query_embedding,
                options = options,
            )
            matches = [hit async for hit in search_results.results]

        if not matches:
            print("No results found")
            return []

        print(f"\nFound {len(matches)} matching results (max to use - {top_k}):")
        print("=" * 80)

        return [(hit.record, hit.score) for hit in matches]

    except Exception as e:
        # Best effort for an interactive notebook: report and return no hits
        print(f"Search error: {e}")
        traceback.print_exc()
        return []

print("Search functions ready.")

Search functions ready.


In [11]:
# Example 1: General search
# No document_filter is passed, so the search is not restricted to a single document.
# The awaited call is the last expression of the cell, so the returned
# list of (record, score) tuples is displayed as the cell output.
await search_documents("Tips for Employees", top_k=3)

Searching for: 'Tips for Employees'

Found 3 matching results (max to use - 3):


[(DocumentChunk(vector=None, id='22d3d1da-ddf2-4123-8560-ef69c6cb334a', content="2. Ask questions. If the employee is unsure about any part of the plan, it is important to ask \nquestions in order to make sure that the plan is suitable for their needs.  \n3. Research other plans. It is important to r esearch other plans and compare them to \nNorthwind Standard in order to determine which plan is the best option.  \n4. Verify the information. If the employee is unsure about the accuracy of any information \nthat Northwind Health provides, it is important to ve rify the information with a trusted \nsource.  \nBy following these tips, employees can make sure that they are not misled by Northwind \nHealth's intentionally false or misleading statements. It is important for employees to be \naware of any potential inaccuraci es or false information that Northwind Health may use \nwhen discussing their plans in order to make the most informed decision possible.", document_name='Northwind_Stan

In [12]:
# Example 2: Search within a specific document
# document_filter is compared with the `document_name` field, which holds the
# PDF file name without its extension.
await search_documents(
    query = "What is covered by the Northwind Health Standard Plan?",
    document_filter = "Northwind_Standard_Benefits_Details",
    top_k = 3
)

Searching for: 'What is covered by the Northwind Health Standard Plan?'
Filtering by document: Northwind_Standard_Benefits_Details

Found 3 matching results (max to use - 3):


[(DocumentChunk(vector=None, id='d2024f6b-392f-41e5-859a-ad367562f111', content='Summary of Benefits  \nNorthwind Standard  \nNorthwind Standard is a basic plan that provides coverage for medical, vision, a nd dental \nservices. This plan also offers coverage for preventive care services, as well as prescription \ndrug coverage. With Northwind Standard, you can choose from a variety of in -network \nproviders, including primary care physicians, specialists, hospital s, and pharmacies. This \nplan does not offer coverage for emergency services, mental health and substance abuse \ncoverage, or out -of-network services.  \nSUMMARY OF YOUR COSTS  \nSummary of Your Costs  \nWhen you choose Northwind Standard as your health plan, you can  rest assured that you \nare getting comprehensive coverage at an affordable cost. Here, we will explain the various \ncosts associated with this plan so that you know what to expect when it comes to your out -\nof-pocket expenses.  \nPremiums  \nPremiums ar

### 🪜 Step 7: RAG with PDF Documents

In [13]:
# Perform a Retrieval Augmented Generation (RAG) query
async def perform_rag_query(
    query: str,
    document_filter: str = None,
    top_k_retrieval: int = 3,
    chat_service_id: str = "azure_openai_chat"
):
    """
    Answer a question with Retrieval Augmented Generation and print the answer.

    Steps:
    1. Look up the chat service registered on the kernel under `chat_service_id`.
    2. Retrieve up to `top_k_retrieval` chunks with search_documents
       (optionally limited to `document_filter`).
    3. Send a system prompt plus a user message (context + question) to the
       chat service and print the first message of the reply.

    Nothing is returned; all results and errors are printed.
    """

    try:
        chat_completion_service = kernel.get_service(chat_service_id)
    except Exception as e:
        print(f"Error getting chat service '{chat_service_id}' for RAG: {e}")
        return

    # --- 1. Retrieval: collect the text of every chunk found for the query ---
    hits = await search_documents(
        query = query,
        document_filter = document_filter,
        top_k = top_k_retrieval
    )

    if hits:
        context_chunks = [record.content for record, _score in hits]
    else:
        print("No document chunks retrieved by search_documents to use for context.")
        context_chunks = []

    # --- 2. Prompt: system instructions first, then the user message ---
    chat_history = ChatHistory()
    chat_history.add_system_message(
        "You are a helpful AI assistant. You are provided with context from documents. "
        "Your task is to answer the user's question based ONLY on this provided context. "
        "Do not use any external knowledge or make assumptions beyond what is stated in the context. "
        "If the information to answer the question is not present in the context, "
        "clearly state that you cannot answer based on the provided information. "
        "Be concise and directly answer the question."
    )

    if context_chunks:
        context_str = "\n\n---\n\n".join(context_chunks)
        user_message_text = (
            "Here is the context from the documents:\n"
            "<context>\n"
            f"{context_str}\n"
            "</context>\n"
            "\n"
            "Based ONLY on the context provided above, please answer the following question.\n"
            f"User Question: {query}"
        )
    else:
        # No context found: still ask, but tell the model so it can decline
        user_message_text = (
            f"User Question: {query}\n"
            "\n"
            "(System note: No specific context was retrieved for this question from the documents. \n"
            "Based on your instructions, if the answer is not in the documents, please state that.)"
        )

    chat_history.add_user_message(user_message_text)

    # --- 3. Generation: call the chat service and print its first message ---
    try:
        response_messages = await chat_completion_service.get_chat_message_contents(
            chat_history = chat_history,
            settings = OpenAIChatPromptExecutionSettings(
                service_id = chat_service_id,
            )
        )

        print("\nLLM Response:")
        usable_reply = (
            bool(response_messages)
            and isinstance(response_messages, list)
            and len(response_messages) > 0
        )
        if not usable_reply:
            print("\nLLM returned an empty or unexpected response format.")
            print(f"Raw response: {response_messages}")
        else:
            first_message = response_messages[0]
            # Prefer .content, then the text of the first item, then the raw object
            if getattr(first_message, 'content', None) is not None:
                print(str(first_message.content))
            elif getattr(first_message, 'items', None) and hasattr(first_message.items[0], 'text'):
                print(first_message.items[0].text)
            else:
                print(str(first_message))

    except AttributeError as e:
        print(f"AttributeError during LLM invocation: {e}")
        traceback.print_exc()
    except Exception as e:
        print(f"Error during LLM invocation: {e}")
        traceback.print_exc()

print("RAG function is ready.")

RAG function is ready.


In [14]:
# General RAG query across all documents
# No document_filter is passed, so retrieval is not limited to one document.
# perform_rag_query prints the answer itself; it does not return it.
await perform_rag_query(
    query="What are the key benefits of the Northwind Health Plus plan?",
    top_k_retrieval=3
)

# Visual separator (a blank line, an 80-character rule, a blank line) after the answer
print("\n" + "="*80 + "\n")

Searching for: 'What are the key benefits of the Northwind Health Plus plan?'

Found 3 matching results (max to use - 3):

LLM Response:
The key benefits of the Northwind Health Plus plan include comprehensive coverage for medical, vision, and dental services, prescription drug coverage, mental health and substance abuse coverage, and preventive care services. The plan allows you to choose from a variety of in-network providers, including primary care physicians, specialists, hospitals, and pharmacies. It also offers coverage for emergency services both in-network and out-of-network.




### 🪜 Step 8: Cleanup (Optional)

In [15]:
# Clean up the vector collection
async def cleanup_collection():
    if vector_collection:
        try:
            async with vector_collection:
                await vector_collection.delete_collection()
            print("Vector collection cleaned up")
        except Exception as e:
            print(f"Cleanup error: {e}")
    else:
        print("No collection to clean up")

await cleanup_collection()

Vector collection cleaned up
