In [2]:
import os
import sys
from dotenv import load_dotenv
import PyPDF2
import pandas as pd
import numpy as np
# LangChain imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.vectorstores import FAISS
from langchain.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain.schema import Document
from IPython.display import Markdown, display
# Load environment variables
load_dotenv()
# Set up Groq API key
os.environ["GROQ_API_KEY"] = ""

In [3]:
# Cell 2: Define PDF Extraction Function
def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF file"""
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        text = ""
        for page_num in range(len(pdf_reader.pages)):
            page = pdf_reader.pages[page_num]
            text += page.extract_text()
    return text

In [4]:
# Cell 3: Extract Text from PDF Files
# Define the path to the lecture notes
lecture_notes_directory = "../data/lecture_notes/"
pdf_files = [
    os.path.join(lecture_notes_directory, "L1_Part1_Intro to DevOps and Beyond.pdf"),
    os.path.join(lecture_notes_directory, "L1_Part2_Containers 101 (1).pdf"),
    os.path.join(lecture_notes_directory, "L2_Lecture 2 - Part 1.pdf"),
    os.path.join(lecture_notes_directory, "L2_Lecture 2 - Part 2.pdf"),
    os.path.join(lecture_notes_directory, "L3_Part1_AWS User Groups Colombo - Introduction to AWS Cloud Platform (1).pdf"),
    os.path.join(lecture_notes_directory, "L3_Part2_Cloud Computing 101 (1).pdf"),
    os.path.join(lecture_notes_directory, "L4_Part1_CAP Theorem.pdf"),
    os.path.join(lecture_notes_directory, "L4_Part2_Key Essentials for Building Application in Cloud.pdf"),
    os.path.join(lecture_notes_directory, "Introduction to Microservices (1).pdf"),
    os.path.join(lecture_notes_directory, "Microservice Design Patterns.pdf"),
    os.path.join(lecture_notes_directory, "Cloud Design Patterns - 1.pdf"),
    os.path.join(lecture_notes_directory, "Cloud Design Patterns - 2.pdf"),
    os.path.join(lecture_notes_directory, "Lecture 01-Introduction to AI ML - Updated(2025).pdf"),
    os.path.join(lecture_notes_directory, "ML Lec 2 - Part 1.pdf"),
    os.path.join(lecture_notes_directory, "ML Lec 2 - Part 2 LLM.pdf")
]
# Extract text from all PDF files
lecture_texts = []
for pdf_file in pdf_files:
    try:
        text = extract_text_from_pdf(pdf_file)
        lecture_texts.append({"source": os.path.basename(pdf_file), "text": text})
        print(f"Successfully extracted text from {pdf_file}")
    except Exception as e:
        print(f"Error extracting text from {pdf_file}: {e}")
# Create document objects for each lecture with source metadata
documents = []
for lecture in lecture_texts:
    documents.append(
        Document(
            page_content=lecture["text"],
            metadata={"source": lecture["source"]}
        )
    )

Successfully extracted text from ../data/lecture_notes/L1_Part1_Intro to DevOps and Beyond.pdf
Successfully extracted text from ../data/lecture_notes/L1_Part2_Containers 101 (1).pdf
Successfully extracted text from ../data/lecture_notes/L2_Lecture 2 - Part 1.pdf
Successfully extracted text from ../data/lecture_notes/L2_Lecture 2 - Part 2.pdf
Successfully extracted text from ../data/lecture_notes/L3_Part1_AWS User Groups Colombo - Introduction to AWS Cloud Platform (1).pdf
Successfully extracted text from ../data/lecture_notes/L3_Part2_Cloud Computing 101 (1).pdf
Successfully extracted text from ../data/lecture_notes/L4_Part1_CAP Theorem.pdf
Successfully extracted text from ../data/lecture_notes/L4_Part2_Key Essentials for Building Application in Cloud.pdf
Successfully extracted text from ../data/lecture_notes/Introduction to Microservices (1).pdf
Successfully extracted text from ../data/lecture_notes/Microservice Design Patterns.pdf
Successfully extracted text from ../data/lecture_note

In [5]:
# Cell 4: Split Documents into Chunks
# Initialize the text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
)
# Split documents into chunks
chunks = text_splitter.split_documents(documents)
print(f"Split text into {len(chunks)} chunks")

Split text into 153 chunks


In [6]:
# Cell 5: Create Vector Store
# Create a simple embedding function without external dependencies
print("Creating custom embeddings...")
class SimpleEmbeddings:
    def __init__(self):
        self.vocab = {}
        self.fitted = False
        
    def _tokenize(self, text):
        # Simple tokenization by splitting on whitespace and lowercase
        return [word.lower() for word in text.split() if word]
        
    def _create_vocab(self, texts):
        vocab = {}
        idx = 0
        for text in texts:
            for token in self._tokenize(text):
                if token not in vocab:
                    vocab[token] = idx
                    idx += 1
        return vocab
        
    def embed_documents(self, texts):
        if not self.fitted:
            self.vocab = self._create_vocab(texts)
            self.fitted = True
        
        vectors = []
        for text in texts:
            # Create a one-hot encoding vector
            vector = [0] * len(self.vocab)
            tokens = self._tokenize(text)
            for token in tokens:
                if token in self.vocab:
                    vector[self.vocab[token]] = 1
            
            # Normalize vector (if not all zeros)
            magnitude = sum(v*v for v in vector) ** 0.5
            if magnitude > 0:
                vector = [v/magnitude for v in vector]
            
            vectors.append(vector)
        
        return vectors
            
    def embed_query(self, query):
        # Same process for query
        vector = [0] * len(self.vocab)
        tokens = self._tokenize(query)
        for token in tokens:
            if token in self.vocab:
                vector[self.vocab[token]] = 1
        
        # Normalize vector
        magnitude = sum(v*v for v in vector) ** 0.5
        if magnitude > 0:
            vector = [v/magnitude for v in vector]
        
        return vector
    # Make the object callable
    def __call__(self, text):
        return self.embed_query(text)

# Use our simple embeddings implementation
embeddings = SimpleEmbeddings()
text_contents = [doc.page_content for doc in chunks]
vectors = embeddings.embed_documents(text_contents)  # This will also fit the vocabulary
print(f"Created embeddings with vocabulary size: {len(embeddings.vocab)}")

# Create FAISS vectorstore manually
import faiss
import numpy as np
from typing import Dict, List
from langchain_community.docstore.in_memory import InMemoryDocstore

dimension = len(vectors[0]) if vectors and len(vectors[0]) > 0 else 1
print(f"Vector dimension: {dimension}")
if dimension > 0:
    index = faiss.IndexFlatL2(dimension)
    # Convert to numpy array and ensure correct float type
    vectors_np = np.array(vectors).astype('float32')
    # Add vectors to index
    index.add(vectors_np)
    
    # Create proper docstore and mapping
    docstore = InMemoryDocstore({})
    index_to_docstore_id = {}
    for i in range(len(chunks)):
        # Use standard Python int to avoid numpy data type issues
        docstore.add({str(i): chunks[i]})
        index_to_docstore_id[int(i)] = str(i)
    
    vectorstore = FAISS(embeddings, index, docstore, index_to_docstore_id)
    print("Successfully created vectorstore with simple embeddings")
    
    # Save the vector store to disk - but don't try to load it again
    # due to serialization issues
    # Instead, just use it directly in this session
else:
    print("Error: Could not create vectors with non-zero dimension")

Creating custom embeddings...
Created embeddings with vocabulary size: 4761


`embedding_function` is expected to be an Embeddings object, support for passing in a function will soon be removed.


Vector dimension: 4761
Successfully created vectorstore with simple embeddings


In [7]:
# Cell 6: Setup LLM and QA Chain
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_groq import ChatGroq

# Use Groq LLM instead of OpenAI
llm = ChatGroq(
    model="llama3-70b-8192",  
    temperature=0
)
print("Using Groq LLM")

# Create a prompt template
template = """You are a helpful AI assistant named CTSE Buddy that answers questions about CTSE lecture notes.
Use the following information to answer the user's question:
{context}
Question: {question}
Answer: """

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# Set up the QA chain
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True
)
print("QA Chain has been set up successfully")

Using Groq LLM
QA Chain has been set up successfully


In [9]:
# Cell 7: Create Simple Test Function
import traceback
from IPython.display import Markdown

def ask_question(question):
    """Ask a question to the chatbot"""
    try:
        # Print query for debugging
        print(f"Processing query: {question}")
        
        # Get embedding for the query for debugging
        query_embedding = embeddings.embed_query(question)
        print(f"Query embedding created, dimension: {len(query_embedding)}")
        
        # Perform retrieval manually for debugging
        print("Attempting to retrieve relevant documents...")
        query_vector = np.array([query_embedding]).astype("float32")
        scores_and_indices = index.search(query_vector, k=3)
        print(f"Retrieved indices: {scores_and_indices[1][0]}")
        
        # Use the QA chain
        print("Calling QA chain...")
        result = qa_chain({"query": question})
        
        answer = result['result']
        sources = [doc.metadata['source'] for doc in result['source_documents']]
        unique_sources = list(set(sources))
        
        print(f"Q: {question}")
        print(f"A: {answer}")
        print(f"Sources: {', '.join(unique_sources)}")
        print("-" * 50)
        
        return result
    except Exception as e:
        print(f"Error: {str(e)}")
        print("Traceback:", traceback.format_exc())
        return None

# Test with a sample question
test_result = ask_question("What is DevOps?")

Processing query: What is DevOps?
Query embedding created, dimension: 4761
Attempting to retrieve relevant documents...
Retrieved indices: [ 1 36 52]
Calling QA chain...
Q: What is DevOps?
A: According to the lecture notes, DevOps is defined in two ways:

1. "DevOps is the combination of cultural philosophies, practices, and tools that increases an organization’s ability to deliver applications and services at high velocity" - AWS

2. "A compound of development (Dev) and operations (Ops), DevOps is the union of people, process, and technology to continually provide value to customers" - Azure

In summary, DevOps is a set of practices that combines development and operations to deliver applications and services quickly, while continually providing value to customers.
Sources: L2_Lecture 2 - Part 1.pdf, L3_Part2_Cloud Computing 101 (1).pdf, L1_Part1_Intro to DevOps and Beyond.pdf
--------------------------------------------------


In [10]:
from ipywidgets import widgets, Layout
from IPython.display import display, clear_output, Markdown
import random

# Custom CSS styles for a more modern and professional look
custom_styles = """
<style>
.chatbot-container {
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    max-width: 95%;
    margin: 0 auto;
}
.chatbot-header {
    background: linear-gradient(135deg, #4b6cb7 0%, #182848 100%);
    color: white;
    padding: 15px;
    border-radius: 10px 10px 0 0;
    font-weight: bold;
    font-size: 18px;
    display: flex;
    align-items: center;
    justify-content: center;
    margin-bottom: 15px;
    box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.user-message {
    background-color: #e3f2fd;
    border-left: 4px solid #2196F3;
    padding: 10px 15px;
    border-radius: 5px;
    margin-bottom: 15px;
    box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
.bot-message {
    background-color: #ffffff;
    border-left: 4px solid #4CAF50;
    padding: 10px 15px;
    border-radius: 5px;
    margin-bottom: 15px;
    box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
.bot-message p {
    margin: 0 0 10px 0;
}
.bot-message strong {
    color: #2c3e50;
}
.bot-message ul, .bot-message ol {
    margin-top: 5px;
    margin-bottom: 10px;
    padding-left: 20px;
}
.bot-message li {
    margin-bottom: 5px;
}
.bot-message code {
    background-color: #f8f9fa;
    padding: 2px 4px;
    border-radius: 3px;
    font-family: monospace;
    color: #e83e8c;
}
.sources {
    font-size: 0.85em;
    color: #607d8b;
    font-style: italic;
    margin-top: 5px;
    padding-left: 10px;
    border-left: 2px solid #cfd8dc;
}
.chat-divider {
    border-top: 1px dashed #cfd8dc;
    margin: 15px 0;
}
.error-message {
    background-color: #ffebee;
    color: #d32f2f;
    padding: 10px 15px;
    border-radius: 5px;
    border-left: 4px solid #f44336;
    margin-bottom: 15px;
}
.chatbot-input-area {
    background-color: #f5f7fa;
    padding: 15px;
    border-radius: 0 0 10px 10px;
    border-top: 1px solid #e0e0e0;
}
.typing-animation {
    display: inline-block;
    width: 50px;
    text-align: left;
}
.typing-dot {
    display: inline-block;
    width: 8px;
    height: 8px;
    border-radius: 50%;
    background-color: #4b6cb7;
    animation: typing 1.5s infinite ease-in-out;
    margin-right: 5px;
}
.typing-dot:nth-child(2) {
    animation-delay: 0.2s;
}
.typing-dot:nth-child(3) {
    animation-delay: 0.4s;
    margin-right: 0;
}
@keyframes typing {
    0% { transform: translateY(0); }
    50% { transform: translateY(-7px); }
    100% { transform: translateY(0); }
}
.prompt-container {
    display: flex;
    flex-flow: row nowrap;
    align-items: stretch;
    width: 95%;
    margin: 10px auto;
    padding: 10px;
    border: 1px solid #e0e0e0;
    border-radius: 5px;
    background-color: #f5f7fa;
}
.prompt-container .prompt-input {
    flex-grow: 1;
    margin-right: 10px;
}
.prompt-container .prompt-button {
    width: auto;
    margin-left: 5px;
}
.loading-animation {
    display: inline-block;
    width: 40px;
    text-align: center;
}
.loading-spinner {
    border: 3px solid #f3f3f3; /* Light grey */
    border-top: 3px solid #3498db; /* Blue */
    border-radius: 50%;
    width: 20px;
    height: 20px;
    animation: spin 2s linear infinite;
    margin: 0 auto;
}
@keyframes spin {
    0% { transform: rotate(0deg); }
    100% { transform: rotate(360deg); }
}
</style>
"""

# Display custom styles
display(widgets.HTML(custom_styles))

# Create a header with icon
header = widgets.HTML(
    """
    <div class="chatbot-header">
        <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" style="margin-right: 10px;">
            <circle cx="12" cy="12" r="10"></circle>
            <path d="M8 14s1.5 2 4 2 4-2 4-2"></path>
            <line x1="9" y1="9" x2="9.01" y2="9"></line>
            <line x1="15" y1="9" x2="15.01" y2="9"></line>
        </svg>
        CTSE Chatbot Buddy
    </div>
    """
)

# Create output area with enhanced styling
output_area = widgets.Output(
    layout=Layout(
        height='400px',
        max_height='600px',
        overflow_y='auto',
        margin='10px 0',
        border='1px solid #e0e0e0',
        padding='15px',
        border_radius='5px'
    )
)

# Create the new container for the prompt and buttons
prompt_box = widgets.HBox(
    [
        widgets.Text(
            value='',
            placeholder='Ask a question about CTSE lecture notes...',
            description='',
            layout=Layout(height='32px', flex_grow='1'),
            style={'description_width': 'initial'},
            continuous_update=False
        ),
        widgets.Button(
            description=' Send',
            icon='paper-plane',
            button_style='primary',
            tooltip='Send your question',
            layout=Layout(height='32px', width='auto'),
            style={'description_width': 'initial'}
        ),
        widgets.Button(
            description=' Clear',
            icon='eraser',
            tooltip='Clear chat history',
            button_style='danger',
            layout=Layout(height='32px', width='auto'),
            style={'description_width': 'initial'}
        ),
    ],
    layout=Layout(
        display='flex',
        flex_flow='row nowrap',
        align_items='stretch',
        width='95%',
        margin='10px auto',
        padding='10px',
        border='1px solid #e0e0e0',
        border_radius='5px',
        background_color='#f5f7fa'
    ),
    style={'padding': '10px'} # Add some padding to the container
)

# Create a sample welcome message
welcome_message = """
<div class="bot-message">
    <strong>👋 Welcome to CTSE AI Assistant!</strong><br><br>
    <p>I can help you with questions about:</p>
    <ul>
        <li>Docker and containerization concepts</li>
        <li>Kubernetes deployment and management</li>
        <li>AWS cloud platform & Cloud Computing</li>
        <li>CAP theorm & Key Essesntials for Building Apps in Cloud</li>
        <li>Introduction to microservices & Design patterns</li>
        <li>Cloud Design Patterns</li>
        <li>Introduction to AI & ML</li>
    </ul>
    <p><strong>How can I assist you today?</strong></p>
</div>
<div class="chat-divider"></div>
"""

# Initialize the chat with a welcome message
with output_area:
    display(widgets.HTML(welcome_message))

# Create the complete layout for the chatbot display area
chatbot_display = widgets.VBox(
    [header, output_area],
    layout=Layout(
        width='95%',
        border='1px solid #e0e0e0',
        border_radius='10px',
        box_shadow='0 4px 6px rgba(0,0,0,0.1)',
        margin='10px auto 0 auto', # Added margin to the top and auto for horizontal centering
        padding='0',
        background_color='white',
        align_items='stretch'
    )
)

# Typing indicator function
def show_typing_indicator():
    with output_area:
        typing_html = """
        <div class="bot-message" style="padding: 5px 15px;">
            <div class="typing-animation">
                <span class="typing-dot"></span>
                <span class="typing-dot"></span>
                <span class="typing-dot"></span>
            </div>
        </div>
        """
        typing_widget = widgets.HTML(typing_html)
        display(typing_widget)
        return typing_widget

# Loading indicator function
def show_loading_indicator():
    with output_area:
        loading_html = """
        <div class="bot-message" style="padding: 5px 15px;">
            <div class="loading-animation">
                <div class="loading-spinner"></div>
            </div>
        </div>
        """
        loading_widget = widgets.HTML(loading_html)
        display(loading_widget)
        return loading_widget

# Define callback function to clear chat history
def on_clear_button_clicked(b):
    output_area.clear_output()
    with output_area:
        display(widgets.HTML(welcome_message))

# Function to format the bot's response with better readability
def format_bot_response(text):
    # This is where we'll implement the formatting logic
    # to break down text into smaller paragraphs and apply formatting
    
    # Split by double newline to respect paragraph breaks
    paragraphs = text.split('\n\n')
    
    formatted_text = ""
    for para in paragraphs:
        # Check if this is a list item
        if para.strip().startswith('- ') or para.strip().startswith('* '):
            # This is likely a list, so we'll format it accordingly
            items = para.strip().split('\n')
            formatted_text += "<ul>\n"
            for item in items:
                if item.strip():
                    formatted_text += f"<li>{item.strip().lstrip('- ').lstrip('* ')}</li>\n"
            formatted_text += "</ul>\n"
        # Check if this is a numbered list
        elif any(line.strip().startswith(f"{i}.") for i in range(1, 10) for line in para.split('\n')):
            items = para.strip().split('\n')
            formatted_text += "<ol>\n"
            for item in items:
                if item.strip():
                    # Remove the number and dot from the beginning
                    cleaned_item = item.strip()
                    for i in range(1, 10):
                        if cleaned_item.startswith(f"{i}. "):
                            cleaned_item = cleaned_item[3:]
                            break
                    formatted_text += f"<li>{cleaned_item}</li>\n"
            formatted_text += "</ol>\n"
        else:
            # Regular paragraph
            # Highlight any terms within backticks as code
            para_with_code = para.replace('`', '<code>', 1)
            while '`' in para_with_code:
                para_with_code = para_with_code.replace('`', '</code>', 1)
                if '`' in para_with_code:
                    para_with_code = para_with_code.replace('`', '<code>', 1)
            
            # Bold any text between ** markers
            para_with_bold = ""
            is_bold = False
            i = 0
            while i < len(para_with_code):
                if i + 1 < len(para_with_code) and para_with_code[i:i+2] == '**':
                    if not is_bold:
                        para_with_bold += '<strong>'
                        is_bold = True
                    else:
                        para_with_bold += '</strong>'
                        is_bold = False
                    i += 2
                else:
                    para_with_bold += para_with_code[i]
                    i += 1
            
            formatted_text += f"<p>{para_with_bold}</p>\n"
    
    return formatted_text

# Define the callback function for sending messages
def on_send_button_clicked(b):
    question = prompt_box.children[0].value # Get text from the input in prompt_box
    if not question.strip():
        return

    # Clear the input field in the prompt_box
    prompt_box.children[0].value = ''

    with output_area:
        # Display the user's question with improved formatting
        display(widgets.HTML(f"""
        <div class="user-message">
            <strong>You:</strong> {question}
        </div>
        """))

        # Show loading indicator
        loading_indicator = show_loading_indicator()

        # Get the answer from the QA chain
        try:
            # Simulate a brief delay for processing
            import time
            time.sleep(random.uniform(1, 3))

            # Remove loading indicator
            loading_indicator.close()

            # Assuming qa_chain is defined elsewhere and takes a dictionary with 'query'
            result = qa_chain({"query": question})
            answer = result['result']
            sources = [doc.metadata['source'] for doc in result['source_documents']]
            unique_sources = list(set(sources))

            # Format the answer for better readability
            formatted_answer = format_bot_response(answer)

            # Display the answer with better formatting
            bot_response = f"""
            <div class="bot-message">
                <strong>Assistant:</strong>
                {formatted_answer}
            """

            if unique_sources:
                source_list = ", ".join(unique_sources)
                bot_response += f"""
                <div class="sources">
                    <strong>Sources:</strong> {source_list}
                </div>
                """

            bot_response += "</div><div class='chat-divider'></div>"
            display(widgets.HTML(bot_response))

        except Exception as e:
            # Remove loading indicator
            loading_indicator.close()

            # Display error with better formatting
            display(widgets.HTML(f"""
            <div class="error-message">
                <strong>Error:</strong> {str(e)}
            </div>
            <div class="chat-divider"></div>
            """))

# Connect the callback functions to the buttons in the prompt_box
prompt_box.children[1].on_click(on_send_button_clicked) # Send button
prompt_box.children[2].on_click(on_clear_button_clicked) # Clear button

# Handle Enter key in the prompt input field
def on_enter(change):
    if change.get('type') == 'change' and change.get('name') == 'value':
        if change.get('new') and '\n' in change.get('new'):
            # Remove the newline character that might be added
            prompt_box.children[0].value = prompt_box.children[0].value.replace('\n', '')
            on_send_button_clicked(None)

# Connect the observe handler to the input field in the prompt_box
prompt_box.children[0].observe(on_enter, 'value')

# Display the complete chatbot interface
display(widgets.HTML('<div class="chatbot-container">'))
display(chatbot_display)
display(prompt_box) # Display the new prompt box after the chatbot display
display(widgets.HTML('</div>'))

HTML(value="\n<style>\n.chatbot-container {\n    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;…

HTML(value='<div class="chatbot-container">')

VBox(children=(HTML(value='\n    <div class="chatbot-header">\n        <svg xmlns="http://www.w3.org/2000/svg"…

HBox(children=(Text(value='', continuous_update=False, layout=Layout(height='32px'), placeholder='Ask a questi…

HTML(value='</div>')