In [None]:
!pip install chromadb
!pip install PyPDF2

In [None]:
import os
import pickle
import numpy as np
import requests
import json
import time
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils import embedding_functions
import pandas as pd
import random
import csv
import re
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tqdm import tqdm
import PyPDF2
import io

# Phase 1: Choose a Domain and Use Case

In [None]:
def define_domain_and_use_case():
    """
    Describe the project's domain, audience and use case, and echo them to stdout.

    Returns:
        dict: The keys ``domain``, ``target_audience``, ``use_case`` and
        ``value_proposition``, each mapped to a descriptive string
    """
    # Long-form pitch, kept as one literal so its whitespace is printed verbatim.
    pitch = """
        This project creates a child-friendly Python programming assistant that explains coding concepts
        in simple, accessible ways for young learners aged 8-12. Programming education for children is
        increasingly important as digital literacy becomes essential, yet many resources use technical
        jargon that overwhelms young learners.

        Our chatbot transforms complex programming concepts into age-appropriate explanations with
        relatable examples and encouraging language. It helps children understand fundamentals like
        variables, loops, and functions through analogies to everyday experiences. By making learning
        enjoyable and building confidence, this tool supports early coding education and helps develop
        computational thinking skills critical for future academic and career success.
        """

    domain_info = dict(
        domain="Python Programming Education for Children",
        target_audience="Children aged 8-12",
        use_case="A child-friendly programming assistant that explains Python concepts in simple terms",
        value_proposition=pitch,
    )

    # The short fields are reported as "Label: value" lines, in this order.
    summary_labels = (
        ("Domain", "domain"),
        ("Target Audience", "target_audience"),
        ("Use Case", "use_case"),
    )
    for label, key in summary_labels:
        print(f"{label}: {domain_info[key]}")

    print("\nValue Proposition:")
    print(domain_info["value_proposition"])

    return domain_info

# Execute Phase 1: build the domain description (the call also prints it) and
# keep the returned dict in a notebook-level variable.
domain_info = define_domain_and_use_case()

# Phase 2: Collect Data with Web Scraping


In [None]:
def collect_and_scrape_data(seed_urls, pdf_urls=None, output_dir="web_scraped_data", min_words=20000):
    """
    Collect data from web sources and organize it for the RAG system.

    Every URL is fetched at most once. HTML pages are reduced to the text of their
    content elements and are also scanned for links to ``.pdf`` files, which are then
    fetched and converted to text too. Results are written to ``output_dir`` as one
    text file per successful document, a CSV overview (``all_scraped_content.csv``)
    and a combined ``all_content.txt``.

    Args:
        seed_urls (list): List of URLs to scrape
        pdf_urls (list): List of PDF URLs to scrape
        output_dir (str): Directory to save output files
        min_words (int): Minimum word count to collect; a warning is printed when
            the total stays below it

    Returns:
        list: One dictionary per attempted URL with the keys ``url``, ``title``,
        ``content``, ``word_count``, ``status`` (``'success'`` or ``'error: ...'``)
        and ``type`` (``'html'``, ``'pdf'`` or ``'unknown'``)
    """
    import hashlib  # function-scope import: only used to derive stable output filenames

    os.makedirs(output_dir, exist_ok=True)

    # CSS selector for the elements whose text counts as page content. It must go
    # through soup.select(): find_all() would treat 'div.content' as a tag name.
    content_selector = 'p, article, section, div.content, div.main'

    def clean_text(text):
        """Collapse every run of whitespace into one space and trim both ends."""
        return ' '.join(text.split())

    def extract_pdf_text(pdf_content):
        """Return the cleaned text of all pages of a PDF given as bytes ('' on failure)."""
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))
            page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
            return clean_text("\n\n".join(page_texts))
        except Exception as e:
            # Best effort: a broken PDF yields an empty document, not a failed run.
            print(f"Error extracting PDF text: {str(e)}")
            return ""

    def is_pdf_url(url):
        """Guess from the URL alone whether it points to a PDF."""
        lowered = url.lower()
        return lowered.endswith('.pdf') or '/pdf/' in lowered

    def find_pdf_links(page_url, soup):
        """Return absolute URLs for every <a href> on the page that ends in .pdf."""
        pdf_links = []
        try:
            for a_tag in soup.find_all('a', href=True):
                href = a_tag['href'].strip()
                if href.lower().endswith('.pdf'):
                    # urljoin resolves relative, root-relative and protocol-relative
                    # hrefs against the page URL.
                    pdf_links.append(urljoin(page_url, href))
        except Exception as e:
            print(f"Error finding PDF links: {str(e)}")
        return pdf_links

    def extract_html_text(soup, min_paragraph_length):
        """Join the text of all outermost content elements that are long enough."""
        matched = soup.select(content_selector)
        matched_ids = {id(element) for element in matched}
        blocks = []
        for element in matched:
            # A <p> inside a matched <article>/<section> is already part of that
            # container's text; taking both would duplicate the content.
            if any(id(ancestor) in matched_ids for ancestor in element.parents):
                continue
            # separator=' ' keeps words of adjacent child elements from being glued together.
            text = clean_text(element.get_text(separator=' '))
            if len(text) >= min_paragraph_length:
                blocks.append(text)
        return '\n\n'.join(blocks)

    def scrape_url(url, min_paragraph_length=50):
        """
        Fetch one URL and convert it to text.

        Returns a ``(result, pdf_links)`` pair: the result dictionary described in the
        outer docstring, and the PDF links found on the page (always empty for PDFs
        and for failed requests).
        """
        try:
            time.sleep(random.uniform(1, 3))  # Polite delay
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
                'Accept': '*/*'
            }
            print(f"Scraping: {url}")
            response = requests.get(url, headers=headers, timeout=15)
            response.raise_for_status()

            # Handle PDFs
            content_type = response.headers.get('Content-Type', '').lower()
            if is_pdf_url(url) or 'application/pdf' in content_type:
                print(f"Processing PDF: {url}")
                content = extract_pdf_text(response.content)
                title = os.path.basename(urlparse(url).path) or "PDF Document"
                result = {'url': url, 'title': title, 'content': content,
                          'word_count': len(content.split()), 'status': 'success', 'type': 'pdf'}
                return result, []

            # Handle HTML
            soup = BeautifulSoup(response.content, 'html.parser')
            # .string is None when <title> is empty or contains nested tags.
            if soup.title and soup.title.string:
                title = soup.title.string.strip()
            else:
                title = "No Title"
            content = extract_html_text(soup, min_paragraph_length)
            result = {'url': url, 'title': title, 'content': content,
                      'word_count': len(content.split()), 'status': 'success', 'type': 'html'}
            # The parsed page is reused for link discovery, so it is not downloaded twice.
            return result, find_pdf_links(url, soup)

        except Exception as e:
            # Best effort: one failing URL is recorded as an error entry and the run continues.
            print(f"Error scraping {url}: {str(e)}")
            result = {'url': url, 'title': '', 'content': '',
                      'word_count': 0, 'status': f'error: {str(e)}', 'type': 'unknown'}
            return result, []

    # Main scraping logic
    all_urls = list(seed_urls) + list(pdf_urls or [])
    all_scraped_data = []
    seen_urls = set()  # O(1) "already scraped?" test; also skips duplicate seeds

    for url in tqdm(all_urls, desc="Scraping URLs"):
        if url in seen_urls:
            continue
        seen_urls.add(url)

        result, pdf_links = scrape_url(url)
        all_scraped_data.append(result)

        # Follow the PDF links discovered on this page (one level deep only).
        for pdf_url in pdf_links:
            if pdf_url in seen_urls:
                continue
            seen_urls.add(pdf_url)
            print(f"Found PDF link: {pdf_url}")
            pdf_result, _ = scrape_url(pdf_url)
            all_scraped_data.append(pdf_result)

    successful = [item for item in all_scraped_data if item['status'] == 'success']

    # Save results to files
    for result in successful:
        domain = urlparse(result['url']).netloc
        file_type = "pdf" if result['type'] == 'pdf' else "html"
        # The built-in hash() of a str is salted per process, so it would name the
        # same URL differently on every run; a digest of the URL is stable.
        url_digest = hashlib.sha256(result['url'].encode('utf-8')).hexdigest()[:10]
        safe_domain = re.sub(r'[^A-Za-z0-9-]', '_', domain)
        filepath = os.path.join(output_dir, f"{safe_domain}_{url_digest}.{file_type}.txt")

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f"URL: {result['url']}\n")
            f.write(f"Title: {result['title']}\n")
            f.write(f"Type: {result['type']}\n")
            f.write(f"Word Count: {result['word_count']}\n\n")
            f.write(result['content'])

    # Save overview to CSV (content is truncated to keep the file readable)
    csv_file = os.path.join(output_dir, "all_scraped_content.csv")
    with open(csv_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['url', 'title', 'content', 'word_count', 'status', 'type'])
        for item in all_scraped_data:
            truncated = item['content'][:1000] + '...' if len(item['content']) > 1000 else item['content']
            writer.writerow([item['url'], item['title'], truncated, item['word_count'], item['status'], item.get('type', 'unknown')])

    # Save all content to a single file
    with open(os.path.join(output_dir, "all_content.txt"), 'w', encoding='utf-8') as f:
        for item in successful:
            f.write(f"--- {item['title']} ({item.get('type', 'unknown')}) ---\n\n")
            f.write(item['content'])
            f.write("\n\n" + "="*80 + "\n\n")

    # Statistics are derived from the final result list so that documents reached
    # through discovered PDF links are counted as well.
    total_word_count = sum(item['word_count'] for item in successful)
    html_count = sum(1 for item in successful if item.get('type') == 'html')
    pdf_count = sum(1 for item in successful if item.get('type') == 'pdf')

    print("\nWeb Scraping Statistics:")
    print(f"Total URLs scraped: {len(all_scraped_data)}")
    print(f"Successful scrapes: {len(successful)}")
    print(f"Failed scrapes: {len(all_scraped_data) - len(successful)}")
    print(f"HTML pages: {html_count}")
    print(f"PDF documents: {pdf_count}")
    print(f"Total words scraped: {total_word_count}")

    # Check if we have enough content
    if total_word_count < min_words:
        print(f"Warning: Only collected {total_word_count} words, which is less than the target of {min_words}")

    return all_scraped_data

# Define seed URLs: HTML tutorial pages on variables, numbers, conditionals
# and for loops.
seed_urls = [
    "https://www.programiz.com/python-programming/variables-constants-literals",
    "https://www.programiz.com/python-programming/numbers",
    "https://www.programiz.com/python-programming/if-elif-else",
    "https://www.programiz.com/python-programming/for-loop"
]

# PDF documents to fetch directly, in addition to any PDFs linked from the pages above.
pdf_seed_urls = [
    "https://bugs.python.org/file47781/Tutorial_EDIT.pdf"
]

# Execute Phase 2
# NOTE: the directory name "web_scraped_data__" also appears in the Phase 3 chunk
# save path and in the Phase 5 persist directory; change them together.
all_scraped_data = collect_and_scrape_data(seed_urls, pdf_seed_urls, "web_scraped_data__")

# Phase 3: Preprocess and Chunk the Text


In [None]:
def preprocess_and_chunk_text(documents, chunk_size=500, chunk_overlap=100, min_chunk_size=100,
                              output_path="web_scraped_data__/chunked_documents.json"):
    """
    Process and split documents into overlapping chunks for better retrieval.

    Only documents with ``status == 'success'`` and non-empty ``content`` are used.
    All sizes are measured in characters. The resulting chunks are also written to
    ``output_path`` as JSON.

    Args:
        documents (list): List of document dictionaries
        chunk_size (int): Target size of each chunk
        chunk_overlap (int): Overlap between chunks
        min_chunk_size (int): Minimum acceptable chunk size; shorter chunks are dropped
        output_path (str): JSON file the chunks are saved to; its directory is created
            if needed. Pass ``None`` to skip saving.

    Returns:
        list: Processed chunks with metadata (``content``, ``chunk_id``,
        ``source_doc_id``, ``url``, ``title``, ``doc_type``, ``word_count``)

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``chunk_overlap`` is not in
            the range ``0 <= chunk_overlap < chunk_size``
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= chunk_overlap < chunk_size:
        # A larger overlap would make the sliding-window step zero or negative.
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")

    def clean_text(text):
        """Normalize whitespace while keeping line breaks as split points"""
        # Collapse spaces/tabs first but leave newlines alone; collapsing *all*
        # whitespace here would make the newline separators below unreachable.
        text = re.sub(r'[^\S\n]+', ' ', text)
        # Any whitespace run that contains a newline becomes exactly one newline.
        text = re.sub(r'\s*\n\s*', '\n', text)
        return text.strip()

    def recursive_text_splitter(text, chunk_size=500, chunk_overlap=100):
        """Split text into chunks using a hierarchical approach with multiple separators"""
        # Tried from coarsest to finest; "" means a plain character window.
        separators = ["\n\n", "\n", ". ", ", ", " ", ""]

        def _split(text, sep):
            """Split text on a specific separator"""
            if sep == "":
                step = chunk_size - chunk_overlap
                windows = []
                for start in range(0, len(text), step):
                    windows.append(text[start:start + chunk_size])
                    if start + chunk_size >= len(text):
                        break  # this window already reaches the end of the text
                return windows

            chunks, cur_chunk, cur_len = [], [], 0
            for part in text.split(sep):
                # Re-attach the separator that str.split removed.
                part = (sep + part) if cur_chunk else part
                if cur_len + len(part) > chunk_size and cur_chunk:
                    joined = "".join(cur_chunk)
                    chunks.append(joined)
                    # joined[-0:] would be the whole string, so zero overlap
                    # needs its own branch.
                    overlap = joined[-chunk_overlap:] if chunk_overlap > 0 else ""
                    cur_chunk = [overlap, part]
                    cur_len = len(overlap) + len(part)
                else:
                    cur_chunk.append(part)
                    cur_len += len(part)
            if cur_chunk:
                chunks.append("".join(cur_chunk))
            return chunks

        def _recursive(text, idx=0):
            """Recursively try different separators"""
            if len(text) <= chunk_size or idx >= len(separators):
                return [text]
            result = []
            for chunk in _split(text, separators[idx]):
                # Pieces that are still too long are split again with the next, finer separator.
                result.extend(_recursive(chunk, idx + 1) if len(chunk) > chunk_size else [chunk])
            return result

        return _recursive(text)

    # Process each document
    chunked_documents = []
    chunk_id_counter = 0
    usable_documents = 0

    for doc in tqdm(documents, desc="Chunking documents"):
        if doc.get('status') != 'success' or not doc.get('content'):
            continue
        usable_documents += 1

        cleaned_content = clean_text(doc['content'])
        chunks = recursive_text_splitter(cleaned_content, chunk_size, chunk_overlap)

        # Add metadata to each chunk
        for chunk_text in chunks:
            if len(chunk_text) < min_chunk_size:
                continue  # Skip very small chunks

            chunked_documents.append({
                'content': chunk_text,
                'chunk_id': chunk_id_counter,
                'source_doc_id': doc.get('id', 'unknown'),
                'url': doc.get('url', ''),
                'title': doc.get('title', 'Untitled'),
                'doc_type': doc.get('type', 'unknown'),
                'word_count': len(chunk_text.split())
            })
            chunk_id_counter += 1

    # Summary statistics
    print(f"Original documents: {len([d for d in documents if d.get('status') == 'success'])}")
    print(f"After chunking: {len(chunked_documents)} chunks")
    if chunked_documents:
        total_words = sum(c['word_count'] for c in chunked_documents)
        print(f"Average chunk size: {total_words / len(chunked_documents):.1f} words")
    else:
        # Avoids a ZeroDivisionError when nothing could be chunked.
        print(f"Warning: no chunks were produced from {usable_documents} usable documents")

    # Save chunks to file
    if output_path:
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(chunked_documents, f, indent=2)

    return chunked_documents

# Execute Phase 3 on the Phase 2 results, using the default chunking parameters.
chunked_data = preprocess_and_chunk_text(all_scraped_data)

# Phase 4: Embed the Chunks

In [None]:
def embed_chunks(chunks, model_name="all-MiniLM-L6-v2", batch_size=32, cache_file=None):
    """
    Convert text chunks to vector embeddings.

    Each chunk dictionary is updated in place with an ``'embedding'`` key holding a
    plain list of numbers, so the returned list is the same object as ``chunks``.

    When ``cache_file`` is given, embeddings are stored there together with the
    model name and are reused on a later call only if the model name and every
    chunk's ``chunk_id`` and ``content`` are unchanged.

    Args:
        chunks (list): List of document chunks
        model_name (str): Name of the sentence transformer model
        batch_size (int): Batch size for embedding
        cache_file (str): Path to save/load embeddings cache

    Returns:
        tuple: (chunks with embeddings, embedding model)
    """
    # The model is loaded even on a cache hit because it is part of the return value.
    print(f"Loading embedding model: {model_name}")
    model = SentenceTransformer(model_name)

    if not chunks:
        print("No chunks to embed")
        return chunks, model

    def cache_is_usable(payload):
        """Return True if the payload holds embeddings of exactly these chunks and model."""
        if not isinstance(payload, dict) or payload.get('model_name') != model_name:
            return False
        cached_chunks = payload.get('chunks')
        if not isinstance(cached_chunks, list) or len(cached_chunks) != len(chunks):
            return False
        return all(
            isinstance(cached, dict)
            and 'embedding' in cached
            and cached.get('chunk_id') == chunk.get('chunk_id')
            and cached.get('content') == chunk.get('content')
            for cached, chunk in zip(cached_chunks, chunks)
        )

    # Try to load cached embeddings
    if cache_file and os.path.exists(cache_file):
        try:
            print(f"Loading cached embeddings from {cache_file}")
            # NOTE(security): pickle.load can execute arbitrary code; only point
            # cache_file at files written by this function.
            with open(cache_file, 'rb') as f:
                cached_data = pickle.load(f)

            if cache_is_usable(cached_data):
                print("Using cached embeddings")
                for chunk, cached in zip(chunks, cached_data['chunks']):
                    chunk['embedding'] = cached['embedding']
                return chunks, model
            print("Cache doesn't match current chunks, recalculating embeddings")
        except Exception as e:
            # Best effort: an unreadable cache only costs a recomputation.
            print(f"Error loading cache: {e}. Recalculating embeddings.")

    # Extract texts for embedding
    texts = [chunk['content'] for chunk in chunks]

    # Compute embeddings
    print(f"Computing embeddings for {len(texts)} chunks...")
    embeddings = model.encode(texts, batch_size=batch_size, show_progress_bar=True)

    # Add embeddings to chunks
    for chunk, embedding in zip(chunks, embeddings):
        chunk['embedding'] = embedding.tolist()  # plain lists are picklable and JSON-friendly

    # Save cache if requested
    if cache_file:
        print(f"Saving embeddings to cache: {cache_file}")
        cache_dir = os.path.dirname(cache_file)
        if cache_dir:  # os.makedirs('') raises for a bare filename
            os.makedirs(cache_dir, exist_ok=True)

        # The embeddings must be part of the cache: a cache without them cannot
        # satisfy a later call.
        with open(cache_file, 'wb') as f:
            pickle.dump({'model_name': model_name, 'chunks': chunks}, f,
                        protocol=pickle.HIGHEST_PROTOCOL)

    print(f"Computed {len(chunks)} embeddings with dimension {len(chunks[0]['embedding'])}")
    return chunks, model
# Execute Phase 4 with the default model and batch size; no cache file is used.
chunks_with_embeddings, model = embed_chunks(chunked_data)


# Phase 5: Create a Vector Store (ChromaDB)

In [None]:
def create_vector_store(chunks_with_embeddings, model_name="all-MiniLM-L6-v2",
                       collection_name=None, persist_directory=None, batch_size=1000):
    """
    Create a vector store from embedded chunks.

    The collection is opened if it already exists and created otherwise, always with
    the same embedding function, so that later ``query_texts`` lookups are embedded by
    ``model_name``. Chunks are upserted: re-running against a persistent store
    overwrites entries with the same ``chunk_<id>`` instead of colliding with them.
    Entries left over from an earlier, larger run are not removed.

    Args:
        chunks_with_embeddings (list): Document chunks with embeddings
        model_name (str): Name of the embedding model
        collection_name (str): Name for the Chroma collection
        persist_directory (str): Directory to persist the database
        batch_size (int): Number of chunks sent to Chroma per upsert call

    Returns:
        tuple: (Chroma client, Chroma collection)

    Raises:
        ValueError: If ``batch_size`` is not positive
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # Initialize Chroma client
    if persist_directory:
        print(f"Initializing persistent Chroma client at: {persist_directory}")
        client = chromadb.PersistentClient(path=persist_directory)
    else:
        print("Initializing in-memory Chroma client")
        client = chromadb.Client()

    # Generate collection name if not provided
    if not collection_name:
        collection_name = f"python_for_kids_{int(time.time())}"

    # Set up embedding function
    embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(model_name=model_name)

    # Create or get collection. get_or_create_collection attaches the embedding
    # function in both cases and needs no exception handling to tell them apart.
    collection = client.get_or_create_collection(
        name=collection_name,
        embedding_function=embedding_fn,
        metadata={"description": "Python programming content for children",
                 "created_at": time.strftime("%Y-%m-%d %H:%M:%S")}
    )
    print(f"Using collection: {collection_name} ({collection.count()} existing documents)")

    if not chunks_with_embeddings:
        print("No chunks to add to the collection")
        return client, collection

    # Prepare data for Chroma
    ids = [f"chunk_{chunk['chunk_id']}" for chunk in chunks_with_embeddings]
    documents = [chunk['content'] for chunk in chunks_with_embeddings]
    embeddings = [chunk['embedding'] for chunk in chunks_with_embeddings]
    metadatas = [{
        'url': chunk.get('url', ''),
        'title': chunk.get('title', 'Untitled'),
        'chunk_id': str(chunk.get('chunk_id', 0)),
        'doc_type': chunk.get('doc_type', 'unknown'),
        'word_count': str(chunk.get('word_count', 0)),
    } for chunk in chunks_with_embeddings]

    # Add data to collection in batches to keep each request bounded
    print(f"Adding {len(ids)} documents to the collection...")
    for start in range(0, len(ids), batch_size):
        end = start + batch_size
        collection.upsert(
            ids=ids[start:end],
            documents=documents[start:end],
            metadatas=metadatas[start:end],
            embeddings=embeddings[start:end]
        )

    print(f"Vector store created with {collection.count()} documents")
    return client, collection

# Execute Phase 5
# A fixed collection name plus a persist directory means a re-run opens the
# same on-disk collection again instead of creating a fresh one.
persist_dir = "web_scraped_data__/chroma_db"
client, collection = create_vector_store(
    chunks_with_embeddings,
    model_name="all-MiniLM-L6-v2",
    collection_name="python_for_kids",
    persist_directory=persist_dir
)

# Phase 6: RAG System - Retrieval and Generation

In [None]:
def retrieve_relevant_chunks(query, collection, n_results=5):
    """
    Look up the chunks that best match a query in the vector store.

    Args:
        query (str): User query
        collection: ChromaDB collection
        n_results (int): Number of chunks to retrieve

    Returns:
        list: One dict per hit, in the order the collection returned them, with the
        keys ``content``, ``metadata``, ``relevance`` (``1 - distance``) and
        ``rank`` (1-based position)
    """
    response = collection.query(query_texts=[query], n_results=n_results)

    # A single query text was sent, so only the first row of each field is used.
    hits = zip(
        response['documents'][0],
        response['metadatas'][0],
        response['distances'][0],
    )

    return [
        {'content': text, 'metadata': meta, 'relevance': 1 - dist, 'rank': position}
        for position, (text, meta, dist) in enumerate(hits, start=1)
    ]

def format_context(chunks):
    """
    Render retrieved chunks as one text block to place in the LLM prompt.

    Args:
        chunks (list): Retrieved chunks; each needs ``content`` and ``metadata['title']``

    Returns:
        str: Numbered "Source N [title]:" sections separated by blank lines
        (an empty string when there are no chunks)
    """
    sections = []
    for number, chunk in enumerate(chunks, start=1):
        heading = f"Source {number} [{chunk['metadata']['title']}]:"
        sections.append(f"{heading}\n{chunk['content']}\n")
    return "\n".join(sections)

def generate_answer(question, context, api_key=None, model="mistral-small", max_tokens=500, timeout=60):
    """
    Generate an answer using an LLM API based on context.

    The function never raises for API problems: when no key is available or the
    request fails, it prints the error and returns a demo text that contains the
    error message followed by a canned mock answer.

    Args:
        question (str): User question
        context (str): Retrieved context
        api_key (str): API key for Mistral; when omitted, the ``MISTRAL_API_KEY``
            environment variable is used (credentials are not kept in source)
        model (str): Model identifier
        max_tokens (int): Maximum tokens in response
        timeout (float): Seconds to wait for the API before giving up

    Returns:
        str: Generated answer, or the demo fallback text on failure
    """
    def _fallback(error):
        """Report the error and build the demo response returned in its place."""
        print(f"Error with LLM API: {error}")
        return f"""
        Error connecting to LLM API: {error}

        For this demo, here's a mock response:

        Based on the context provided, Python variables are containers that store data values.
        You can think of them like labeled boxes where you put different types of information.
        For example, if you write 'name = "Alex"', you're creating a variable called 'name' that stores the word "Alex".
        """

    api_key = api_key or os.environ.get("MISTRAL_API_KEY")
    if not api_key:
        # Without a key the request cannot succeed, so skip the network call.
        return _fallback("no API key provided; pass api_key or set the MISTRAL_API_KEY environment variable")

    url = "https://api.mistral.ai/v1/chat/completions"

    system_prompt = """You are a helpful, friendly AI assistant designed to teach Python programming to children.
    Your answers should be:
    1. Simple and easy to understand for children
    2. Encouraging and positive
    3. Accurate and based only on the provided context
    4. Include simple examples when appropriate

    If you don't know the answer based on the context, say so politely and suggest where they might find more information."""

    user_prompt = f"""Please answer the following question about Python programming for kids.
    Use ONLY the information in the context provided below.

    CONTEXT:
    {context}

    QUESTION: {question}"""

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "temperature": 0.5,
        "max_tokens": max_tokens
    }

    try:
        # A timeout keeps a stalled connection from blocking the notebook forever.
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        result = response.json()
        return result["choices"][0]["message"]["content"]
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError) as e:
        # Covers transport/HTTP errors, invalid JSON and an unexpected response shape.
        return _fallback(str(e))

def rag_answer(question, collection, api_key=None, n_chunks=3):
    """
    Run the full RAG pipeline for one question: retrieve, format, generate.

    Args:
        question (str): User question
        collection: ChromaDB collection
        api_key (str): API key for LLM
        n_chunks (int): Number of chunks to retrieve

    Returns:
        dict: ``question``, ``answer``, ``context_chunks`` (retrieved chunks),
        ``full_context`` (the formatted prompt context) and ``timestamp``
    """
    print(f"\nProcessing question: {question}")

    # Step 1: retrieval
    print("Retrieving relevant context...")
    retrieved = retrieve_relevant_chunks(question, collection, n_results=n_chunks)
    llm_context = format_context(retrieved)
    print(f"Retrieved {len(retrieved)} relevant chunks")

    # Step 2: generation
    print("Generating answer with LLM...")
    llm_answer = generate_answer(question, llm_context, api_key)

    outcome = dict(
        question=question,
        answer=llm_answer,
        context_chunks=retrieved,
        full_context=llm_context,
    )
    outcome['timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S")
    return outcome

def display_rag_answer(result):
    """
    Render a RAG result in the notebook: question, source cards, then the answer.

    Args:
        result (dict): RAG answer result with ``question``, ``context_chunks``
            and ``answer``
    """
    from IPython.display import display, Markdown

    def _source_card(position, item):
        """Build the Markdown card for one retrieved chunk (first 300 characters shown)."""
        meta = item['metadata']
        return "".join([
            f"### Source {position}: {meta['title']}\n",
            f"**Relevance Score**: {item['relevance']:.4f}\n",
            f"**Word Count**: {meta['word_count']}\n",
            f"[Read More]({meta['url']})\n\n",
            f"{item['content'][:300]}...\n\n---\n\n",
        ])

    print("Question:")
    display(Markdown(f"**{result['question']}**"))

    print("\nRelevant Context:")
    cards = [
        _source_card(position, item)
        for position, item in enumerate(result['context_chunks'], start=1)
    ]
    display(Markdown("".join(cards)))

    print("\nGenerated Answer:")
    display(Markdown(result['answer']))

# Example usage
def test_rag_system(collection, questions, api_key=None):
    """
    Test the RAG system with a list of questions.

    Each question is answered with ``rag_answer``, shown with ``display_rag_answer``
    and followed by a separator line.

    Args:
        collection: ChromaDB collection
        questions (list): List of test questions
        api_key (str): API key for the LLM; ``None`` leaves the choice of key to
            ``rag_answer``/``generate_answer`` (no credential is embedded here)

    Returns:
        list: Results for each question
    """
    results = []

    for question in questions:
        result = rag_answer(question, collection, api_key=api_key)
        display_rag_answer(result)
        results.append(result)
        print("\n" + "="*80 + "\n")

    return results


In [None]:
# Ask about for loops. No api_key is passed, so rag_answer forwards None and
# generate_answer falls back to its own default key handling.
question1 = "How do for loops work in Python?"
result1 = rag_answer(question1, collection)
display_rag_answer(result1)

In [None]:
# Ask about if conditions, with the default of 3 retrieved chunks.
question2 = "what is if conditions in Python?"
result2 = rag_answer(question2, collection)
display_rag_answer(result2)


In [None]:
# Ask about variables, with the default of 3 retrieved chunks.
question3 = "What are variables in Python?"
result3 = rag_answer(question3, collection)
display_rag_answer(result3)

In [None]:
# Ask about number types, with the default of 3 retrieved chunks.
question4 = "What types of numbers can I use in Python?"
result4 = rag_answer(question4, collection)
display_rag_answer(result4)

In [None]:
# NOTE(review): presumably an out-of-corpus probe - none of the seed URLs is
# about games, so this likely checks the "answer only from context" behaviour;
# confirm intent.
question5 = "How can I make a simple game in Python?"
result5 = rag_answer(question5, collection)
display_rag_answer(result5)