In [None]:
# Mount Google Drive at /content/drive; every later file path in this notebook
# lives under that mount point. Requires the google.colab module.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Folder on the mounted Drive that holds the lecture slide files
folder_path = "/content/drive/My Drive/CTSE Assignment-2/lecture slides"

# Sanity check: print every entry in the folder, in whatever order os.listdir returns
files = os.listdir(folder_path)
for file in files:
    print(file)

In [None]:
!pip install python-pptx pymupdf

In [None]:
# Import libraries
from pptx import Presentation
import fitz  # PyMuPDF
import os

In [None]:
# Folder on the mounted Google Drive that the extraction loop scans for
# .pptx / .pdf lecture files. Change this to point at your own slides.
folder_path = "/content/drive/My Drive/CTSE Assignment-2/lecture slides"

In [None]:
# Helper function to extract text from PPTX files
def extract_text_from_pptx(file_path):
    """Return the text of every text-bearing shape in a PowerPoint file.

    Slides and shapes are visited in document order. Shapes that expose no
    ``text`` attribute are skipped. Each shape's text is followed by a newline.

    Args:
        file_path: Path to the ``.pptx`` file.

    Returns:
        One string containing all collected shape texts.
    """
    deck = Presentation(file_path)
    pieces = [
        shape.text + "\n"
        for slide in deck.slides
        for shape in slide.shapes
        if hasattr(shape, "text")
    ]
    return "".join(pieces)


In [None]:
# Helper function to extract text from PDF files
def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page in a PDF.

    The document is opened as a context manager so its file handle is released
    as soon as extraction finishes, even if a page raises.

    Args:
        file_path: Path to the ``.pdf`` file.

    Returns:
        The text of all pages joined in page order.
    """
    with fitz.open(file_path) as pdf:
        return "".join(page.get_text() for page in pdf)


In [None]:
# Map each supported (lower-cased) extension to its extractor
extractors = {
    ".pptx": extract_text_from_pptx,
    ".pdf": extract_text_from_pdf,
}

# Collect the text of each file, then join once at the end
extracted_parts = []

# sorted(): os.listdir returns entries in arbitrary order, which would make the
# combined notes (and every chunk derived from them) differ between runs.
for filename in sorted(os.listdir(folder_path)):
    # Compare the extension case-insensitively so "Lecture.PDF" is not skipped
    extension = os.path.splitext(filename)[1].lower()
    extractor = extractors.get(extension)
    if extractor is None:
        continue

    file_path = os.path.join(folder_path, filename)
    extracted_parts.append(extractor(file_path) + "\n")
    print(f"Extracted from {filename}")

# Combined text of all lecture files, one trailing newline per file
lecture_notes_text = "".join(extracted_parts)

print("\n✅ All files processed!")


In [None]:
# Persist the combined raw text to Drive (UTF-8) so it can be reloaded later
# without re-running the extraction loop
with open("/content/drive/My Drive/CTSE Assignment-2/lecture_notes_combined.txt", "w", encoding="utf-8") as f:
    f.write(lecture_notes_text)


In [None]:
# Reload the combined raw text from Drive into raw_text, the input of the cleanup step
with open("/content/drive/My Drive/CTSE Assignment-2/lecture_notes_combined.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()

In [None]:
import re

def enhanced_cleanup_v2(text):
    """Strip contact details, speaker bios and slide boilerplate from raw notes.

    Steps, in order:
      1. Remove e-mail addresses, URLs and ``@handle`` mentions.
      2. Remove everything from a known author name to the end of its line.
      3. Remove boilerplate phrases and lines (footers, agenda/slide markers,
         ALL-CAPS headings, certification and bio lines).
      4. Strip each line, then drop empty lines and consecutive duplicates.

    Line-level patterns consume text only up to the end of the line, never the
    newline itself. A match that starts mid-line therefore leaves the text
    before it on its own line instead of gluing it onto the following line,
    and a match on the final line works without a trailing newline.

    Args:
        text: Raw text extracted from the lecture files.

    Returns:
        The cleaned text, one non-empty line per row, joined with newlines.
    """
    # Remove emails and URLs
    text = re.sub(r'\S+@\S+', '', text)
    text = re.sub(r'http\S+', '', text)

    # Remove @handles and social mentions
    text = re.sub(r'@\w+', '', text)

    # Remove repeated bios or author names (name through end of line)
    for author in (r'Ravindu Nirmal Fernando', r'Jeewaka Perera'):
        text = re.sub(author + r'.*', '', text, flags=re.IGNORECASE)

    # Remove standard boilerplate and distractions.
    # '.' never matches a newline, so a trailing '.*' stops at the line end.
    patterns = [
        r'Thank You.*',
        r'LinkedIn.*',
        r'Twitter.*',
        r'Sample Footer Text',
        r'Page \d+ of \d+',
        r'^Agenda$', r'^Slide \d+$',
        r'^[A-Z ]{6,}$',                       # ALL CAPS headings
        r'Session \d+.*',                      # Session titles
        r'^.*Certified.*$',                    # Certification lines
        r'^.*specialized in.*$',               # Education lines
        r'^About Me$',                         # Exact match line
        r'^AWS Community Builder$',            # Exact match line
        r'^STL - DevOps @ Sysco LABS - Sri Lanka$'  # Exact company intro line
    ]

    for p in patterns:
        text = re.sub(p, '', text, flags=re.MULTILINE)

    # Strip every line; skip blanks and lines identical to the last kept line
    clean_lines = []
    previous_line = ""
    for line in text.splitlines():
        line = line.strip()
        if not line or line == previous_line:
            continue
        clean_lines.append(line)
        previous_line = line

    return "\n".join(clean_lines)


In [None]:
# Clean the raw extracted notes; cleaned_text is what gets saved and chunked below
cleaned_text = enhanced_cleanup_v2(raw_text)

# Preview output (first 2000 characters only, to keep the cell output short)
print(cleaned_text[:2000])


In [None]:
# Persist the cleaned notes to Drive (UTF-8) so later cells can reload them
with open("/content/drive/My Drive/CTSE Assignment-2/lecture_notes_cleaned_final.txt", "w", encoding="utf-8") as f:
    f.write(cleaned_text)

In [2]:
# Reload the cleaned notes from Drive into cleaned_text.
# NOTE(review): looks like a resume point so extraction/cleanup need not be re-run — confirm.
with open("/content/drive/My Drive/CTSE Assignment-2/lecture_notes_cleaned_final.txt", "r", encoding="utf-8") as f:
    cleaned_text = f.read()

In [None]:
def chunk_text(text, chunk_size=300, overlap=50):
    """Split text into overlapping chunks of whitespace-separated words.

    Consecutive chunks start ``chunk_size - overlap`` words apart. Chunking
    stops as soon as a window reaches the last word, so no trailing chunk is
    emitted that is merely a suffix of the previous one.

    Args:
        text: Text to split; words are found with ``str.split()``.
        chunk_size: Maximum number of words per chunk (must be > 0).
        overlap: Words shared between neighbouring chunks
            (must satisfy ``0 <= overlap < chunk_size``).

    Returns:
        List of chunk strings, words re-joined with single spaces. Empty list
        when ``text`` contains no words.

    Raises:
        ValueError: If the sizes would make the window stand still, move
            backwards or skip words.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        # overlap >= chunk_size previously produced a never-ending loop
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap  # slide window with overlap
    chunks = []

    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # This window already covers the final word; a further window would
        # only repeat words that are in this chunk.
        if start + chunk_size >= len(words):
            break

    return chunks

# Chunk the cleaned notes with the default window (300 words, 50 overlap)
text_chunks = chunk_text(cleaned_text)

# Preview the first chunk (guarded: an empty notes file yields no chunks)
print(f"Total chunks: {len(text_chunks)}")
if text_chunks:
    print("\n--- First Chunk ---\n")
    print(text_chunks[0])
else:
    print("No chunks produced - check that the cleaned lecture notes are not empty.")


In [None]:
!pip install -U sentence-transformers

In [None]:
from sentence_transformers import SentenceTransformer

# Load a small, fast embedding model.
# The same model must embed both the chunks and the user questions so their
# vectors are comparable in the FAISS index.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

In [None]:
# Embed every chunk in one call; row i of the result is the vector for text_chunks[i].
# NOTE(review): with default arguments encode() is expected to return a NumPy array
# rather than a Python list — confirm for the installed sentence-transformers version.
embeddings = embedding_model.encode(text_chunks, show_progress_bar=True)

In [None]:
!pip install faiss-cpu

In [None]:
import faiss
import numpy as np

# Convert embeddings to a float32 NumPy array (one row per chunk)
embedding_array = np.array(embeddings).astype("float32")

# Create a FAISS index (using L2 distance); dimension = embedding width.
# NOTE(review): IndexFlatL2.search reports squared L2 distances (lower = closer) —
# keep that in mind when choosing any distance cutoff.
index = faiss.IndexFlatL2(embedding_array.shape[1])

# Add embeddings to the index; vector i keeps position i, matching text_chunks[i]
index.add(embedding_array)

# Confirm number of vectors in index
print(f"Total vectors indexed: {index.ntotal}")


In [9]:
# Lookup table from FAISS result position to chunk text.
# This is an alias of text_chunks (not a copy): index i here is vector i in the index.
chunk_lookup = text_chunks

In [10]:
# Sample user question used to smoke-test retrieval
question = "What is DevOps and why is it important?"

# Convert question to vector (passed as a one-element list, so the result holds one row)
question_vector = embedding_model.encode([question])

In [None]:
# Number of top matches to retrieve
top_k = 3

# Perform search
distances, indices = index.search(np.array(question_vector).astype("float32"), top_k)

# Show matched chunks
print("\n🔍 Top Matching Chunks:\n")
for i in indices[0]:
    # FAISS pads the result with -1 when the index holds fewer than top_k
    # vectors; chunk_lookup[-1] would silently show the last chunk instead.
    if i < 0:
        continue
    print(f"- {chunk_lookup[i][:500]}...\n")  # Limit to 500 chars


In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# Answer-generation model: FLAN-T5 Large, loaded together with its tokenizer
model_name = "google/flan-t5-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# Text-to-text pipeline used by the chatbot functions to turn a prompt into an answer
qa_pipeline = pipeline("text2text-generation", model=model, tokenizer=tokenizer)


In [13]:
# Chatbot function
def request_chatbot():
    """Run an interactive console Q&A loop over the indexed lecture chunks.

    Reads questions with ``input()`` until the user types ``exit`` or ``quit``.
    Each question is embedded and the nearest chunk is fetched from the FAISS
    index. A prompt of at most 512 tokens is built around that chunk, and the
    generated answer is printed.

    Relies on notebook globals: ``embedding_model``, ``index``, ``np``,
    ``chunk_lookup``, ``tokenizer`` and ``qa_pipeline``.
    """
    import re  # local import: this cell may be run without the cleanup cell that imports re

    # IndexFlatL2 reports *squared* L2 distances. all-MiniLM-L6-v2 produces
    # unit-length embeddings (per its model card), so distances fall in [0, 4]
    # and the previous cutoff of 60 could never reject anything.
    # 1.5 corresponds to a cosine similarity of 0.25 - tune on real questions.
    max_distance = 1.5
    top_k = 3

    # An option label is a stand-alone A-D followed by "." or ")" and whitespace.
    # Requiring two distinct labels stops text such as "CI/CD." from being
    # treated as a multiple-choice question.
    option_label = re.compile(r'(?<![A-Za-z0-9])([A-D])[.)]\s')

    print("🤖 Chatbot ready. Type your questions below. Type 'exit' to quit.")

    while True:
        question = input("\n💬 You: ")

        if question.lower().strip() in ["exit", "quit"]:
            print("👋 Chatbot session ended.")
            break

        # Nothing to search for - prompt again instead of embedding blank text
        if not question.strip():
            continue

        # Step 1: Embed question
        q_vector = embedding_model.encode([question])

        # Step 2: Search index
        distances, indices = index.search(np.array(q_vector).astype("float32"), top_k)

        # Step 2.5: Reject weak matches (lower distance = better) and the -1
        # placeholder FAISS returns when the index is empty
        if indices[0][0] < 0 or distances[0][0] > max_distance:
            print("🤖 Chatbot: Sorry, this topic may not be covered in the lecture content.")
            continue

        # Step 3: Start from full chunk (only the best match is used as context)
        full_context = chunk_lookup[indices[0][0]]
        words = full_context.split()

        # Step 4: Detect if it's an MCQ
        is_mcq = len(set(option_label.findall(question))) >= 2

        # Step 5: Build prompt with dynamic trimming - shrink the context by
        # 10 words at a time until the whole prompt fits in 512 tokens
        for trim_len in range(300, 10, -10):
            trimmed_context = " ".join(words[:trim_len])

            if is_mcq:
                prompt = f"""
                You are a technical assistant answering multiple-choice questions using only the given context.

                Context:
                {trimmed_context}

                Question:
                {question}

                Choose the best option (A, B, C, or D) and briefly explain why.

                Answer:"""
            else:
                prompt = f"""
                You are a technical assistant that only answers based on the context provided.

                Context:
                {trimmed_context}

                Question: {question}
                Answer:"""

            tokens = tokenizer(prompt, return_tensors="pt")["input_ids"]
            if tokens.shape[1] <= 512:
                break
        else:
            # Loop ended without break: even the shortest context did not fit
            print("⚠️ Unable to create a valid prompt under token limit. Please simplify the question.")
            continue

        # Step 6: Generate answer (errors are shown to the user, not raised,
        # so one failed generation does not end the session)
        try:
            response = qa_pipeline(prompt, max_length=300, do_sample=False)[0]['generated_text']
        except Exception as e:
            response = "⚠️ Error generating response: " + str(e)

        # Step 7: Display
        print(f"\n🤖 Chatbot: {response}")


In [20]:
# Start the interactive console session; it runs until 'exit' or 'quit' is typed
request_chatbot()

🤖 Chatbot ready. Type your questions below. Type 'exit' to quit.

💬 You: What is the purpose of Docker Compose?

🤖 Chatbot: to package and run applications within a loosely isolated environment which is a a container

💬 You: What is DevOps?

🤖 Chatbot: the union of people, process, and technology to continually provide value to customers

💬 You: What is Continuous Delivery?

🤖 Chatbot: Software development practice where code changes are automatically built, tested, and prepared for a release to production

💬 You: What is the role of microservices in a DevOps pipeline?

🤖 Chatbot: Each service can be created, deployed and run independently

💬 You: What is the primary goal of Continuous Integration? A. To deploy every change to production immediately B. To automatically scale infrastructure C. To regularly merge and test code changes D. To replace developers with AI

🤖 Chatbot: C

💬 You: What does Docker Compose help you do? A. Create mobile applications B. Deploy ML models C. Define an

In [None]:
!pip install gradio

In [17]:
def gradio_chatbot(question):
    """Answer one question from the lecture index; callback for the Gradio UI.

    The question is embedded and the nearest chunk is fetched from the FAISS
    index. A prompt of at most 512 tokens is built around that chunk and the
    generated text is returned.

    Relies on notebook globals: ``embedding_model``, ``index``, ``np``,
    ``chunk_lookup``, ``tokenizer`` and ``qa_pipeline``.

    Args:
        question: Free-text or multiple-choice question typed by the user.

    Returns:
        The generated answer, or a user-facing message when the input is
        blank, no chunk is close enough, the prompt cannot be made to fit, or
        generation fails.
    """
    import re  # local import: this cell may be run without the cleanup cell that imports re

    # IndexFlatL2 reports *squared* L2 distances. all-MiniLM-L6-v2 produces
    # unit-length embeddings (per its model card), so distances fall in [0, 4]
    # and the previous cutoff of 60 could never reject anything.
    # 1.5 corresponds to a cosine similarity of 0.25 - tune on real questions.
    max_distance = 1.5

    # Blank submissions have nothing to search for
    if not question or not question.strip():
        return "⚠️ Please enter a question."

    # Step 1: Embed question
    q_vector = embedding_model.encode([question])

    # Step 2: Search FAISS
    distances, indices = index.search(np.array(q_vector).astype("float32"), 3)

    # Step 2.5: Similarity threshold check (lower = better match); also covers
    # the -1 placeholder FAISS returns when the index is empty
    if indices[0][0] < 0 or distances[0][0] > max_distance:
        return "🤖 Sorry, this topic may not be covered in the lecture content."

    # Step 3: Start with full context (only the best match is used)
    full_context = chunk_lookup[indices[0][0]]
    words = full_context.split()

    # Step 4: Detect if it's a multiple-choice question. An option label is a
    # stand-alone A-D followed by "." or ")" and whitespace; two distinct labels
    # are required so text such as "CI/CD." is not mistaken for an MCQ.
    option_labels = re.findall(r'(?<![A-Za-z0-9])([A-D])[.)]\s', question)
    is_mcq = len(set(option_labels)) >= 2

    # Step 5: Dynamically trim context to fit token limit - shrink by 10 words
    # at a time until the whole prompt fits in 512 tokens
    for trim_len in range(300, 10, -10):
        trimmed_context = " ".join(words[:trim_len])

        if is_mcq:
            prompt = f"""
            You are a technical assistant answering multiple-choice questions using only the given context.

            Context:
            {trimmed_context}

            Question:
            {question}

            Choose the best option (A, B, C, or D).

            Answer:"""
        else:
            prompt = f"""
            You are a technical assistant that only answers based on the context provided.

            Context:
            {trimmed_context}

            Question: {question}
            Answer:"""

        # Step 6: Check token count
        tokens = tokenizer(prompt, return_tensors="pt")["input_ids"]
        if tokens.shape[1] <= 512:
            break
    else:
        # Loop ended without break: even the shortest context did not fit
        return "⚠️ The context is too long for the model to handle. Please try a shorter question."

    # Step 7: Generate response (errors are returned as text so the UI keeps working)
    try:
        response = qa_pipeline(prompt, max_length=300, do_sample=False)[0]['generated_text']
    except Exception as e:
        response = "⚠️ Error generating response: " + str(e)

    return response


In [None]:
import gradio as gr

# Build the UI: one multi-line text box in, plain text out.
# Each submission calls gradio_chatbot(question) and shows its return value.
interface = gr.Interface(
    fn=gradio_chatbot,
    inputs=gr.Textbox(lines=4, placeholder="Ask a question or MCQ here..."),
    outputs="text",
    title="CTSE Lecture Chatbot",
    description="Ask questions based on your lecture slides. Type MCQs or free-text questions.",
)

# Launch it
interface.launch()
