In [1]:
import json
import re
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

In [2]:
def load_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data

sutd_data = load_data('markdown_data.json')

In [10]:
def process_documents(data):
    processed_docs = []

    for item in data:
        # Skip entries with empty markdown content
        if not item.get('markdown'):
            continue

        # Extract metadata
        metadata = {
            'title': item.get('title', ''),
            'url': item.get('url', ''),
            'description': item.get('description', ''),
            'pillar': extract_pillar(item.get('title', ''), item.get('url', ''))
        }

        # Add the markdown content with normalized headers
        content = normalize_headers(item.get('markdown', ''))

        # Create a LangChain Document object
        doc = Document(
            page_content=content,
            metadata=metadata
        )

        processed_docs.append(doc)

    return processed_docs

def extract_pillar(title, url):
    """Extract the pillar/department from title or URL"""
    pillars = ['ISTD', 'ESD', 'EPD', 'ASD', 'DAI', 'HASS', 'SMT']

    for pillar in pillars:
        if pillar in title or pillar.lower() in url.lower():
            return pillar
    return 'General'

def normalize_headers(markdown_text):
    """Ensure headers are properly formatted for splitting"""
    # Make sure there's a space after # characters for proper header parsing
    markdown_text = re.sub(r'(#{1,6})([^#\s])', r'\1 \2', markdown_text)
    return markdown_text

In [4]:
processed_docs = process_documents(sutd_data)

In [5]:
print(process_documents)

<function process_documents at 0x109681e40>


In [6]:
def document_to_dict(doc):
    return {
        'page_content': doc.page_content,
        'metadata': dict(doc.metadata)
    }

output_path = "parsed_docs.json"
with open(output_path, 'w', encoding='utf-8') as f:
    # Convert each Document to a dictionary before saving
    serializable_docs = [document_to_dict(doc) for doc in processed_docs]
    json.dump(serializable_docs, f, indent=2, ensure_ascii=False)

print(f"Processed documents saved to {output_path}")

Processed documents saved to parsed_docs.json


In [9]:
import json
import re
import pandas as pd
from markdown import markdown
from bs4 import BeautifulSoup
import numpy as np

# Load the JSON data
with open('parsed_docs.json', 'r') as f:
    data = json.load(f)

# Preview the data structure
print(f"Total documents: {len(data)}")
print(f"Keys in first document: {data[0].keys()}")
print(f"Metadata fields: {data[0]['metadata'].keys()}")

Total documents: 27
Keys in first document: dict_keys(['page_content', 'metadata'])
Metadata fields: dict_keys(['title', 'url', 'description', 'pillar'])


In [11]:
def extract_markdown_hierarchy(markdown_text):
    """
    Parse markdown text to extract hierarchical structure based on headings.
    Returns a list of sections with their headings, content, and parent headings.
    """
    # Convert markdown to HTML for easier parsing
    html = markdown(markdown_text)
    soup = BeautifulSoup(html, 'html.parser')

    sections = []
    current_section = {"title": "Root", "level": 0, "content": "", "parents": []}
    heading_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']

    # First pass: identify all headings and their levels
    headings = []
    for tag in soup.find_all(heading_tags):
        level = int(tag.name[1])
        headings.append({
            "tag": tag,
            "title": tag.get_text().strip(),
            "level": level,
        })

    # If no headings found, treat the entire document as one section
    if not headings:
        return [{"title": "Root", "level": 0, "content": markdown_text, "parents": []}]

    # Second pass: extract section content and build hierarchy
    for i, heading in enumerate(headings):
        # Find content up to the next heading or end of document
        content_elements = []
        element = heading["tag"].next_sibling

        while element and (i == len(headings) - 1 or element != headings[i+1]["tag"]):
            if element.name not in heading_tags:
                if hasattr(element, 'get_text'):
                    content_elements.append(str(element))
            element = element.next_sibling

        # Get parent headings
        parent_titles = []
        for prev_heading in reversed(headings[:i]):
            if prev_heading["level"] < heading["level"]:
                parent_titles.insert(0, prev_heading["title"])

        # Build section
        section = {
            "title": heading["title"],
            "level": heading["level"],
            "content": ''.join(content_elements),
            "parents": parent_titles
        }
        sections.append(section)

    return sections

In [12]:
def extract_internal_urls(content):
    """
    Extract all internal URLs (links to other SUTD pages) from the content.
    """
    # Pattern to match markdown links
    pattern = r'\[.*?\]\((https?://www\.sutd\.edu\.sg/[^)]+)\)'

    # Find all matches
    urls = re.findall(pattern, content)

    # Also check for HTML links if any HTML is embedded in the markdown
    if '<a href="' in content:
        html_pattern = r'<a href="(https?://www\.sutd\.edu\.sg/[^"]+)"'
        html_urls = re.findall(html_pattern, content)
        urls.extend(html_urls)

    return list(set(urls))  # Remove duplicates

In [13]:
def process_document(doc):
    """
    Process a single document into chunks with enhanced metadata.
    """
    # Extract basic metadata
    metadata = doc['metadata']
    content = doc['page_content']

    # Extract markdown structure
    sections = extract_markdown_hierarchy(content)

    # Process each section into a chunk
    chunks = []
    for section in sections:
        # Skip very short sections (likely just headings)
        if len(section["content"]) < 10 and section["level"] > 0:
            continue

        # Reconstruct full text for this section
        section_title = f"# {section['title']}" if section["level"] > 0 else ""
        section_text = f"{section_title}\n\n{section['content']}"

        # Extract internal URLs
        internal_urls = extract_internal_urls(section_text)

        # Create enhanced metadata
        enhanced_metadata = {
            **metadata,  # Original metadata (title, url, description, pillar)
            'section_title': section['title'],
            'parent_sections': section['parents'],
            'section_level': section['level'],
            'internal_links': internal_urls
        }

        # Add to chunks
        chunks.append({
            'text': section_text.strip(),
            'metadata': enhanced_metadata
        })

    return chunks

# Process all documents
all_chunks = []
for doc in data:
    doc_chunks = process_document(doc)
    all_chunks.extend(doc_chunks)

print(f"Total chunks created: {len(all_chunks)}")

Total chunks created: 213


In [14]:
def refine_chunks(chunks, min_size=100, max_size=1000):
    """
    Refine chunks to ensure they're within optimal size limits.
    Combines small chunks and splits large ones.
    """
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    refined_chunks = []
    current_text = ""
    current_metadata = None

    # First pass: combine small chunks with the same parent
    for chunk in chunks:
        # If chunk is too small and has the same parent as previous chunk
        if (len(chunk['text']) < min_size and current_metadata and
            chunk['metadata']['parent_sections'] == current_metadata['parent_sections']):
            # Append to current text
            current_text += "\n\n" + chunk['text']
        else:
            # Add previous combined chunk if it exists
            if current_text:
                refined_chunks.append({
                    'text': current_text,
                    'metadata': current_metadata
                })

            # Start new current chunk
            current_text = chunk['text']
            current_metadata = chunk['metadata']

    # Add the last combined chunk if it exists
    if current_text:
        refined_chunks.append({
            'text': current_text,
            'metadata': current_metadata
        })

    # Second pass: split large chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=max_size,
        chunk_overlap=100,
        separators=["\n\n", "\n", ".", " ", ""]
    )

    final_chunks = []
    for chunk in refined_chunks:
        if len(chunk['text']) > max_size:
            # Split the text
            split_texts = text_splitter.split_text(chunk['text'])

            # Create new chunks with the same metadata
            for split_text in split_texts:
                final_chunks.append({
                    'text': split_text,
                    'metadata': chunk['metadata']
                })
        else:
            final_chunks.append(chunk)

    return final_chunks

# Refine the chunks
refined_chunks = refine_chunks(all_chunks)
print(f"Total refined chunks: {len(refined_chunks)}")

Total refined chunks: 270


In [20]:
def clean_chunk_text(chunks):
    """
    Clean HTML and markdown formatting from chunk text while preserving content.
    """
    from bs4 import BeautifulSoup
    import re

    cleaned_chunks = []

    for chunk in chunks:
        # Get the original text
        original_text = chunk['text']

        # Step 1: Clean markdown headings
        # Remove heading markers like "# About SUTD"
        text = re.sub(r'^\s*#+\s+(.+?)$', r'\1', original_text, flags=re.MULTILINE)

        # Step 2: Parse and clean HTML
        # First ensure we're parsing as HTML
        soup = BeautifulSoup(text, 'html.parser')

        # Get text content - this removes all HTML tags
        clean_text = soup.get_text()

        # Step 3: Fix spacing and formatting
        # Replace multiple newlines with at most two
        clean_text = re.sub(r'\n{3,}', '\n\n', clean_text)

        # Remove leading/trailing whitespace
        clean_text = clean_text.strip()

        # Create new chunk with cleaned text
        cleaned_chunk = {
            'text': clean_text,
            'metadata': chunk['metadata']  # Keep original metadata
        }

        cleaned_chunks.append(cleaned_chunk)

    return cleaned_chunks

# Add this after refining the chunks
cleaned_chunks = clean_chunk_text(refined_chunks)
print(f"Total cleaned chunks: {len(cleaned_chunks)}")

Total cleaned chunks: 270


In [21]:
def analyze_chunks(chunks):
    """
    Analyze chunks to ensure quality and identify any issues.
    """
    # Get length statistics
    lengths = [len(chunk['text']) for chunk in chunks]

    stats = {
        'count': len(chunks),
        'min_length': min(lengths),
        'max_length': max(lengths),
        'avg_length': sum(lengths) / len(chunks),
        'median_length': sorted(lengths)[len(lengths) // 2],
        'pillars': {}
    }

    # Count chunks by pillar
    for chunk in chunks:
        pillar = chunk['metadata']['pillar']
        if pillar not in stats['pillars']:
            stats['pillars'][pillar] = 0
        stats['pillars'][pillar] += 1

    # Check for very short chunks
    short_chunks = [chunk for chunk in chunks if len(chunk['text']) < 50]
    if short_chunks:
        print(f"Warning: {len(short_chunks)} chunks are very short (<50 chars)")

    # Verification: Check if important URLs are preserved
    urls_present = any('internal_links' in chunk['metadata'] and chunk['metadata']['internal_links']
                     for chunk in chunks)
    if not urls_present:
        print("Warning: No internal links found in chunk metadata")

    return stats

# Analyze the chunks
chunk_stats = analyze_chunks(refined_chunks)
print("Chunk statistics:")
print(json.dumps(chunk_stats, indent=2))

# Save a few example chunks for inspection
import random
sample_chunks = random.sample(refined_chunks, min(5, len(refined_chunks)))
print("\nSample chunks:")
for i, chunk in enumerate(sample_chunks):
    print(f"\nChunk {i+1}:")
    print(f"Length: {len(chunk['text'])}")
    print(f"Title: {chunk['metadata']['section_title']}")
    print(f"Pillar: {chunk['metadata']['pillar']}")
    print(f"Internal links: {chunk['metadata'].get('internal_links', [])}")
    print(f"First 100 chars: {chunk['text'][:100]}...")

Chunk statistics:
{
  "count": 270,
  "min_length": 6,
  "max_length": 995,
  "avg_length": 397.4,
  "median_length": 317,
  "pillars": {
    "General": 97,
    "ISTD": 44,
    "ASD": 23,
    "DAI": 30,
    "EPD": 37,
    "ESD": 39
  }
}

Sample chunks:

Chunk 1:
Length: 330
Title: Specialisation Tracks
Pillar: EPD
Internal links: []
First 100 chars: # Specialisation Tracks


<ul>
<li>Beyond Industry 4.0</li>
<li>Computer Engineering</li>
<li>Electr...

Chunk 2:
Length: 965
Title: Curriculum Structure
Pillar: ESD
Internal links: ['https://www.sutd.edu.sg/course/40-319-statistical-and-machine-learning/', 'https://www.sutd.edu.sg/course/40-316-game-theory/', 'https://www.sutd.edu.sg/course/50-045-information-retrieval/', 'https://www.sutd.edu.sg/course/50-039-theory-and-practice-of-deep-learning/', 'https://www.sutd.edu.sg/course/30-100-computational-and-data-driven-engineering/', 'https://www.sutd.edu.sg/course/50-055-special-topic-machine-learning-operations/', 'https://www.sutd.edu.sg

In [22]:
def export_chunks(chunks, output_file="processed_sutd_chunks.json"):
    """
    Export processed chunks to a JSON file.
    """
    # Convert chunks to a serializable format
    serializable_chunks = []
    for chunk in chunks:
        serializable_chunks.append({
            'text': chunk['text'],
            'metadata': chunk['metadata']
        })

    # Write to file
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(serializable_chunks, f, indent=2, ensure_ascii=False)

    print(f"Exported {len(chunks)} chunks to {output_file}")
    return output_file

# Export the refined chunks
output_file = export_chunks(refined_chunks)

Exported 270 chunks to processed_sutd_chunks.json
