In [2]:
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import re
from collections import defaultdict
import math
import nltk
import ipywidgets as widgets
from IPython.display import display

def parse_courses(html_content):
    """Extract course records from catalog HTML.

    Every ``<div class="courseblock">`` whose ``<p class="courseblocktitle">``
    contains a ``<strong>`` heading of the form
    ``"CS 1100. Computer Science and Its Applications. (4 Hours)"`` yields one
    record. Blocks without such a heading, or whose heading does not match
    that format, are skipped.

    Parameters:
    html_content (str): Raw HTML of the catalog page.

    Returns:
    list: One dict per course with the keys ``code`` (subject and number with
        all whitespace removed), ``title``, ``description`` (``""`` when the
        block has no ``cb_desc`` paragraph), ``prerequisites`` and
        ``corequisites`` (lists of link texts, possibly empty) and ``level``
        (the first four-digit number in the code as an int, or 0).
    """
    soup = BeautifulSoup(html_content, 'html.parser')

    # Format: "CS 1100. Computer Science and Its Applications. (4 Hours)"
    code_title_pattern = re.compile(r'([A-Z]+\s*\d+)\.\s*(.*?)\.\s*\(.*?\)')

    courses = []
    for course_block in soup.find_all('div', class_='courseblock'):
        title_element = course_block.find('p', class_='courseblocktitle')
        heading = title_element.find('strong') if title_element else None
        if heading is None:
            continue

        match = code_title_pattern.match(heading.text.strip())
        if not match:
            continue

        # ``\s`` in the pattern accepts any Unicode whitespace (including the
        # non-breaking space), so strip all of it rather than only ASCII
        # spaces; otherwise the separator would remain inside the code.
        code = re.sub(r'\s+', '', match.group(1))
        title = match.group(2).strip()

        desc_element = course_block.find('p', class_='cb_desc')
        desc = desc_element.text.strip() if desc_element else ""

        # Prerequisites/corequisites are the link texts inside the matching
        # "courseblockextra" paragraph; a later paragraph of the same kind
        # replaces an earlier one.
        prereqs = []
        coreqs = []
        for extra in course_block.find_all('p', class_='courseblockextra'):
            if 'Prerequisite' in extra.text:
                prereqs = [link.text.strip()
                           for link in extra.find_all('a', class_='bubblelink')]
            elif 'Corequisite' in extra.text:
                coreqs = [link.text.strip()
                          for link in extra.find_all('a', class_='bubblelink')]

        # Course level is the first four-digit number in the code.
        level_match = re.search(r'(\d{4})', code)
        level = int(level_match.group(1)) if level_match else 0

        courses.append({
            'code': code,
            'title': title,
            'description': desc,
            'prerequisites': prereqs,
            'corequisites': coreqs,
            'level': level
        })

    return courses

# Fetch the NLTK stop-word corpus. The call is made every time this cell runs;
# the captured output shows NLTK reporting the package as already up-to-date.
nltk.download('stopwords')

# Shared text-normalisation state used by process_query().
stemmer = PorterStemmer()
stop_words = set(stopwords.words('english'))
# Expanded synonyms dictionary: maps a (lower-cased) token to the list of terms
# that process_query() emits for it during synonym expansion.
synonyms = {
    'ai': ['artificial', 'intelligence', 'ai'],
    'ml': ['machine', 'learning', 'ml'],
    'programming': ['coding', 'development', 'programming'],
    'web': ['website', 'internet', 'web'],
    'data': ['information', 'records', 'data'],
    'vr': ['virtual', 'reality', 'vr', 'virtualreality'],
    'ar': ['augmented', 'reality', 'ar'],
    'cs': ['computer', 'science', 'cs'],
    'it': ['information', 'technology', 'it'],
    'cybersecurity': ['cyber', 'security', 'cybersecurity'],
    'database': ['db', 'data', 'base'],
    'network': ['net', 'work', 'network'],
    'software': ['app', 'application', 'software'],
    'ir': ['information', 'retrieval', 'ir'],
    'hci': ['human', 'computer', 'interaction', 'hci'],
    'graphics': ['graphic', 'design', 'graphics'],
    'algorithms': ['algorithm', 'algorithms'],
    'theory': ['theoretical', 'concepts', 'theory'],
    'systems': ['system', 'infrastructure', 'systems'],
    'nlp': ['natural', 'language', 'processing', 'nlp']
}

def process_query(query):
    """Normalise free text into a de-duplicated list of search terms.

    Steps: lower-case and tokenise on word characters, drop tokens found in
    ``stop_words``, add the Porter stem of every remaining token, then add the
    ``synonyms`` entry of every token or stem that has one.

    A term is always kept alongside its synonyms. Previously a term was
    replaced by its synonym list, which silently dropped it whenever the list
    did not repeat the key (e.g. ``'database'``).

    Parameters:
    query (str): Text to normalise (a user query or document text).

    Returns:
    list: Unique terms, sorted so the result does not depend on set
        iteration order.
    """
    # Tokenization, lowercasing and stopword removal
    tokens = [t for t in re.findall(r'\b\w+\b', query.lower())
              if t not in stop_words]

    # Keep both the surface forms and their stems
    terms = set(tokens)
    terms.update(stemmer.stem(t) for t in tokens)

    # Synonym expansion: the term itself plus any configured synonyms
    expanded = set(terms)
    for term in terms:
        expanded.update(synonyms.get(term, ()))

    return sorted(expanded)

def build_inverted_index(courses):
    """Build a term -> course-code posting list over all courses.

    Each course is indexed under every term that ``process_query`` produces
    for its code, title and description combined.

    Parameters:
    courses (list): Course dicts with ``code``, ``title`` and ``description``.

    Returns:
    dict: Maps each term to the list of course codes containing it, in the
        order the courses were given.
    """
    postings = {}
    for entry in courses:
        searchable = "{} {} {}".format(
            entry['code'], entry['title'], entry['description']
        ).lower()
        # A set guarantees each course is listed at most once per term.
        for term in set(process_query(searchable)):
            postings.setdefault(term, []).append(entry['code'])
    return postings

def build_bm25_index(courses):
    """Precompute the per-document statistics needed for BM25 scoring.

    ``process_query`` returns a de-duplicated term list, so counting its
    output directly would give every term a frequency of 1 and make BM25's
    term-frequency and length normalisation meaningless. Instead the raw
    word occurrences are counted first, and each distinct word is then
    expanded through ``process_query`` so that the indexed terms match what
    a query is normalised to.

    Parameters:
    courses (list): Course dicts with ``code``, ``title`` and ``description``.

    Returns:
    tuple: ``(bm25_index, df, doc_lengths, avg_dl)`` where
        ``bm25_index`` maps course code -> {term: term frequency},
        ``df`` maps term -> number of courses containing it,
        ``doc_lengths`` maps course code -> total of its term frequencies,
        ``avg_dl`` is the sum of document lengths divided by
        ``len(courses)`` (0 when there are no courses).
    """
    bm25_index = defaultdict(dict)
    doc_lengths = {}
    df = defaultdict(int)
    N = len(courses)

    for course in courses:
        text = f"{course['code']} {course['title']} {course['description']}".lower()

        # Count how often each raw word occurs (same tokenisation as
        # process_query, which would otherwise collapse duplicates).
        raw_counts = defaultdict(int)
        for raw_token in re.findall(r'\b\w+\b', text):
            raw_counts[raw_token] += 1

        # Expand each distinct word once and credit every resulting term
        # with that word's occurrence count. Stop words expand to nothing.
        tf = defaultdict(int)
        for raw_token, count in raw_counts.items():
            for term in process_query(raw_token):
                tf[term] += count

        doc_lengths[course['code']] = sum(tf.values())
        for term in tf:
            df[term] += 1
        bm25_index[course['code']] = dict(tf)  # Convert to regular dict

    # Average document length, guarded against an empty catalog
    avg_dl = sum(doc_lengths.values()) / N if N > 0 else 0
    return bm25_index, df, doc_lengths, avg_dl

def bm25_score(query_terms, doc_id, bm25_index, df, doc_lengths, avg_dl, k1=1.5, b=0.75):
    """Score one document against a list of query terms with BM25.

    Parameters:
    query_terms (iterable): Normalised query terms.
    doc_id: Key of the document in ``bm25_index`` / ``doc_lengths``.
    bm25_index (dict): doc id -> {term: term frequency}.
    df (dict): term -> document frequency.
    doc_lengths (dict): doc id -> document length; its size is used as the
        collection size N.
    avg_dl (float): Average document length.
    k1 (float): Term-frequency saturation parameter.
    b (float): Length-normalisation parameter.

    Returns:
    float: Sum of the per-term scores for the query terms present in the
        document; 0.0 if the document is not indexed. Each term contributes
        0 when ``avg_dl`` is not positive.
    """
    if doc_id not in bm25_index:
        return 0.0

    term_freqs = bm25_index[doc_id]
    doc_len = doc_lengths.get(doc_id, 0)
    collection_size = len(doc_lengths)

    total = 0.0
    for term in query_terms:
        if term not in term_freqs:
            continue

        # The "+ 1" inside the log keeps the IDF non-negative; max() is an
        # extra floor at zero.
        doc_freq = df[term]
        idf = max(0, math.log((collection_size - doc_freq + 0.5) / (doc_freq + 0.5) + 1))

        if avg_dl > 0:
            freq = term_freqs[term]
            numerator = freq * (k1 + 1)
            denominator = freq + k1 * (1 - b + b * doc_len / avg_dl)
            contribution = idf * numerator / denominator
        else:
            contribution = 0
        total += contribution

    return total

def pagerank_score(course, query_terms):
    """Compute a heuristic relevance boost for a course.

    Despite the name, no link graph is involved. The score is a weighted sum
    of three components:
      * level (weight 0.3): ``level / 6000`` capped at 1.0, or 0 for a
        non-positive level;
      * title match (weight 0.5): fraction of query terms found among the
        processed title terms;
      * description match (weight 0.2): the same fraction for the
        description.

    Parameters:
    course (dict): Course with ``level``, ``title`` and ``description``.
    query_terms (list): Normalised query terms.

    Returns:
    float: The combined boost.
    """
    level = course['level']
    level_component = min(1.0, level / 6000) if level > 0 else 0

    def matched_fraction(text):
        # Share of the query terms that also occur in the processed text;
        # the denominator is floored at 1 to avoid dividing by zero.
        text_terms = set(process_query(text))
        hits = set(query_terms).intersection(text_terms)
        return len(hits) / max(1, len(query_terms))

    title_component = matched_fraction(course['title'])
    description_component = matched_fraction(course['description'])

    return (level_component * 0.3) + (title_component * 0.5) + (description_component * 0.2)

def apply_filters(results, query):
    """Apply level/content filters named in the query and strip them from it.

    Level keywords are matched as whole words, so "undergraduate" no longer
    also counts as "graduate" (which used to activate two contradictory
    level filters and return nothing):
      * master / masters / master's / graduate      -> level >= 5000
      * undergrad / undergraduate                   -> level < 5000
      * "not"/"no" directly before one of the above -> the opposite range

    Content keywords ("advanced", "intro", "introductory", "beginner") keep
    only courses whose title or description mentions a related word. They are
    triggered when the keyword occurs anywhere in the query (substring
    match).
    NOTE(review): a negated content keyword ("not advanced") is removed from
    the search text but is still applied as a positive filter, as before.

    The cleaned query has negation phrases ("not <word>", "no <word>") and
    all filter keywords removed. Negation phrases are removed first and are
    anchored at word boundaries; previously the unanchored patterns ran after
    the standalone "not"/"no" had been stripped, so they could only match the
    tail of longer words (e.g. "arduino programming" became "ardui").

    Parameters:
    results (list): Course dicts with ``level``, ``title``, ``description``.
    query (str): Raw user query.

    Returns:
    tuple: ``(filtered_results, clean_query)``. When no filter is active,
        ``results`` itself is returned unchanged.
    """
    query_lower = query.lower()

    # Word-level patterns for the two level families and for negation.
    masters_words = r"(?:master(?:'?s)?|graduates?)"
    undergrad_words = r"undergrad(?:uate)?s?"
    negation_prefix = r"\b(?:not|no)\s+"

    def mentioned(pattern):
        return re.search(pattern, query_lower) is not None

    not_masters = mentioned(negation_prefix + masters_words + r"\b")
    not_undergrad = mentioned(negation_prefix + undergrad_words + r"\b")
    wants_masters = mentioned(r"\b" + masters_words + r"\b")
    wants_undergrad = mentioned(r"\b" + undergrad_words + r"\b")

    # Level filters: a negated keyword selects the opposite range.
    active_filters = []
    if not_masters:
        active_filters.append(lambda c: c['level'] < 5000)
    elif wants_masters:
        active_filters.append(lambda c: c['level'] >= 5000)

    if not_undergrad:
        active_filters.append(lambda c: c['level'] >= 5000)
    elif wants_undergrad:
        active_filters.append(lambda c: c['level'] < 5000)

    def mentions_any(words):
        # Build a predicate: does the title or description contain any word?
        def predicate(c):
            title = c['title'].lower()
            description = c['description'].lower()
            return (any(word in title for word in words)
                    or any(word in description for word in words))
        return predicate

    intro_words = ("intro", "introduction", "introductory")
    name_filters = {
        "advanced": mentions_any(("advanced",)),
        "intro": mentions_any(intro_words),
        "introductory": mentions_any(intro_words),
        "beginner": mentions_any(("beginner", "beginning", "elementary", "fundamental")),
    }

    # Apply name filters only if they appear in the query
    for term, filter_func in name_filters.items():
        if term in query_lower:
            active_filters.append(filter_func)

    # Remove whole negation phrases first, then any remaining filter keyword.
    clean_query = re.sub(negation_prefix + r"[\w']+", '', query_lower)
    keyword_alternatives = [masters_words, undergrad_words]
    keyword_alternatives.extend(re.escape(term) for term in name_filters)
    keyword_alternatives.extend(("not", "no"))
    clean_query = re.sub(
        r"\b(?:" + "|".join(keyword_alternatives) + r")\b", '', clean_query
    )

    # Collapse the whitespace left behind by the removals
    clean_query = re.sub(r'\s+', ' ', clean_query).strip()

    # If no filters specified, return all results
    if not active_filters:
        return results, clean_query

    filtered_results = [
        course for course in results
        if all(f(course) for f in active_filters)
    ]
    return filtered_results, clean_query

def search_courses(query, index_method='Both'):
    """Search the module-level course catalog and return ranked courses.

    The query is first passed through ``apply_filters``; what remains is
    normalised with ``process_query`` and scored against the filtered
    courses:
      * 'Inverted Index' adds 1.0 to every filtered course that contains at
        least one query term;
      * 'BM25' adds the course's BM25 score, capped at 5.0;
      * 'Both' adds both.
    Each scored course then receives twice its ``pagerank_score`` on top.

    Parameters:
    query (str): Raw user query.
    index_method (str): 'Inverted Index', 'BM25' or 'Both'.

    Returns:
    list: Course dicts. Empty for a blank query; the filtered courses in
        their existing order when no search terms remain or nothing scored;
        otherwise the scored courses by descending score.
    """
    query = query.strip()
    if not query:
        return []

    # Filter first; the returned query has the filter words removed.
    candidates, clean_query = apply_filters(courses, query)
    if not clean_query.strip():
        return candidates

    processed_terms = process_query(clean_query)
    if not processed_terms:
        return candidates

    use_inverted = index_method in ('Inverted Index', 'Both')
    use_bm25 = index_method in ('BM25', 'Both')
    combined_scores = defaultdict(float)

    if use_inverted:
        # Union of the posting lists of all query terms ...
        doc_codes = set()
        for term in processed_terms:
            doc_codes.update(inverted_index.get(term, ()))

        # ... restricted to the courses that survived the filters.
        filtered_codes = {entry['code'] for entry in candidates}
        for code in doc_codes.intersection(filtered_codes):
            combined_scores[code] += 1.0

    if use_bm25:
        for entry in candidates:
            code = entry['code']
            relevance = bm25_score(processed_terms, code, bm25_index, df, doc_lengths, avg_dl)
            if relevance > 0:
                combined_scores[code] += min(5.0, relevance)

    # Nothing matched: fall back to the filtered courses.
    if not combined_scores:
        return candidates

    by_code = {entry['code']: entry for entry in candidates}
    ranked = [
        (by_code[code], base + (pagerank_score(by_code[code], processed_terms) * 2.0))
        for code, base in combined_scores.items()
        if code in by_code
    ]
    ranked = sorted(ranked, key=lambda pair: pair[1], reverse=True)
    return [entry for entry, _ in ranked]

# Load the catalog HTML, parse it and build both search indices. These names
# (courses, inverted_index, bm25_index, df, doc_lengths, avg_dl) are the
# module-level state that search_courses() reads.
try:
    with open('cs_catalog.html', 'r', encoding='utf-8') as file:
        html_content = file.read()
    
    courses = parse_courses(html_content)
    print(f"Successfully parsed {len(courses)} courses.")
    
    # Create indices
    inverted_index = build_inverted_index(courses)
    bm25_index, df, doc_lengths, avg_dl = build_bm25_index(courses)
    
except Exception as e:
    # Any failure (missing file, parse error, ...) is reported and replaced by
    # empty data so the widgets below can still be created and used.
    print(f"Error loading or parsing course data: {e}")
    courses = []
    inverted_index = {}
    bm25_index, df, doc_lengths, avg_dl = {}, {}, {}, 0

# Query input: free-text search box
query_input = widgets.Text(
    placeholder='Enter your query...',
    description='Query:',
    disabled=False
)

# Index selection: which scoring method(s) search_courses() should use
index_selector = widgets.RadioButtons(
    options=['Inverted Index', 'BM25', 'Both'],
    description='Index:',
    disabled=False,
    value='Both'  # Default selection
)

# Maximum results selector: caps how many results on_submit() prints
max_results = widgets.IntSlider(
    value=10,
    min=5,
    max=50,
    step=5,
    description='Max Results:',
    disabled=False
)

# Submit button
submit_button = widgets.Button(description='Search')

# Output display: area that on_submit() clears and prints into
output = widgets.Output()

def on_submit(b):
    """Run a search for the current widget values and print the results.

    Reads the query text, index selection and result limit from the
    module-level widgets, then replaces the contents of ``output`` with a
    numbered list of matching courses (code, title, optional prerequisites
    and corequisites, level, and the description cut to 100 characters).

    Parameters:
    b: The widget that triggered the callback; not used.
    """
    query = query_input.value
    results = search_courses(query, index_selector.value)

    with output:
        output.clear_output()
        print(f"Results for query: '{query}'")

        if not results:
            print("No matching courses found.")
            return

        limit = min(len(results), max_results.value)
        for rank, course in enumerate(results[:limit], start=1):
            print(f"{rank}. {course['code']}: {course['title']}")

            # Requirement lines are shown only when the list is non-empty
            prerequisites = course['prerequisites']
            if prerequisites:
                print(f"   Prerequisites: {', '.join(prerequisites)}")

            corequisites = course['corequisites']
            if corequisites:
                print(f"   Corequisites: {', '.join(corequisites)}")

            print(f"   Level: {course['level']}")

            # Long descriptions are truncated with an ellipsis
            desc = course['description']
            if len(desc) <= 100:
                print(f"   {desc}")
            else:
                print(f"   {desc[:100]}...")
            print()

# Event handler for submit button: clicking "Search" runs on_submit
submit_button.on_click(on_submit)

# Event handler for Enter key in query input: forwards to on_submit, passing
# None because on_submit does not use its argument.
# NOTE(review): the captured cell output contains a warning fragment that
# points at the on_submit() call below — it may be deprecated in the installed
# ipywidgets version; confirm and consider observing the value instead.
def on_enter(widget):
    on_submit(None)
query_input.on_submit(on_enter)

# Display widgets in the order: query box, index choice, result limit,
# search button, results area
display(query_input, index_selector, max_results, submit_button, output)

Successfully parsed 172 courses.


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/someshb/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
  query_input.on_submit(on_enter)


Text(value='', description='Query:', placeholder='Enter your query...')

RadioButtons(description='Index:', index=2, options=('Inverted Index', 'BM25', 'Both'), value='Both')

IntSlider(value=10, description='Max Results:', max=50, min=5, step=5)

Button(description='Search', style=ButtonStyle())

Output()

In [47]:
import xml.etree.ElementTree as ET
import xml.dom.minidom as minidom

def export_to_xml(courses, filename="courses.xml"):
    """
    Write the parsed course data to a pretty-printed XML file.

    The document has a ``<courses>`` root with one ``<course>`` per entry,
    each holding ``<code>``, ``<title>``, ``<description>`` and ``<level>``,
    followed by ``<prerequisites>`` / ``<corequisites>`` groups when the
    corresponding list is non-empty.

    Parameters:
    courses (list): List of course dictionaries
    filename (str): Name of the output XML file

    Returns:
    str: The ``filename`` that was written.
    """
    def add_text_child(parent, tag, text):
        # Append <tag>text</tag> to parent.
        node = ET.SubElement(parent, tag)
        node.text = text
        return node

    # (group element / course key, item element) for the optional lists
    list_sections = (
        ("prerequisites", "prerequisite"),
        ("corequisites", "corequisite"),
    )

    root = ET.Element("courses")
    for entry in courses:
        course_node = ET.SubElement(root, "course")

        for field in ("code", "title", "description"):
            add_text_child(course_node, field, entry[field])
        add_text_child(course_node, "level", str(entry['level']))

        for group_tag, item_tag in list_sections:
            items = entry[group_tag]
            if not items:
                continue
            group_node = ET.SubElement(course_node, group_tag)
            for item in items:
                add_text_child(group_node, item_tag, item)

    # Serialise with ElementTree, then re-parse with minidom for indentation
    serialized = ET.tostring(root, 'utf-8')
    pretty_xml = minidom.parseString(serialized).toprettyxml(indent="  ")

    with open(filename, "w", encoding="utf-8") as f:
        f.write(pretty_xml)

    return filename

# Example usage:
# Export the module-level `courses` list (parsed from the HTML in the earlier
# cell) to the default file name and report where it was written.
filename = export_to_xml(courses)
print(f"Course data exported to {filename}")

Course data exported to courses.xml
