In [None]:
# Copyright © 2025 Preetimant Bora Bhowal. All rights reserved.
# Unauthorized copying, distribution, or use of this file is strictly prohibited.

In [None]:
# Mount Google Drive at /content/drive; the input tree and the JSON output
# folder used later in this notebook both live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install python-docx pdfplumber owlready2



In [None]:
import os
import json
from docx import Document
import re
import pdfplumber
from pathlib import Path
import shutil
import uuid
import zipfile
import time
from datetime import datetime
import owlready2
from owlready2 import *
from owlready2 import World, get_ontology, DataProperty, ObjectProperty, FunctionalProperty
import logging

# **Directory Crawler**

In [None]:
def find_course_outlines(root_dir):
    """
    Walk ``root_dir`` and collect course-outline files (.pdf / .docx).

    A file is kept only when the directory that contains it has "term_" in its
    name; that directory is reported as the term and its parent directory as
    the program (e.g. ``<root>/MSDSM/term_4/outline.pdf``). Any directory with
    an "Output" component in its path is ignored.

    Args:
        root_dir: Directory to search recursively.

    Returns:
        A list of dicts with the keys "path", "program", "term" and
        "filename", in os.walk order.
    """
    course_files = []
    for root, dirs, files in os.walk(root_dir):
        path_parts = root.split(os.sep)

        # Prune "Output" folders in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if d != "Output"]
        if "Output" in path_parts:
            continue

        # A term folder needs a parent component to act as the program. Without
        # this guard a single-component root such as "term_1" raised IndexError.
        if len(path_parts) < 2 or "term_" not in path_parts[-1]:
            continue
        program, term = path_parts[-2], path_parts[-1]
        if not program:
            # e.g. "/term_1" splits into ["", "term_1"]: there is no program name.
            continue

        for file in files:
            if file.lower().endswith((".pdf", ".docx")):
                course_files.append({
                    "path": os.path.join(root, file),
                    "program": program,
                    "term": term,
                    "filename": file
                })
    return course_files

# **Common Code and Patterns**

In [None]:
def get_default(value, default="NA"):
    """
    Normalise a raw cell value.

    Gives back ``default`` when ``value`` is None or, once surrounding
    whitespace is removed, is empty or a lone hyphen; otherwise gives back
    the whitespace-stripped value.
    """
    if value is None:
        return default
    cleaned = value.strip()
    return default if cleaned in ("", "-") else cleaned

In [None]:
# Row labels of the course-metadata table: canonical field name -> regex that
# the table extractors search for (case-insensitively) in the first cell of a
# row. Dict order matters: the extractors stop at the first pattern that matches.
course_metadata_keys = {
    "Course Code and Course Title": r"(?i).*\b(course code|course title)\b.*",
    "Course type": r"(?i).*\bcourse type\b.*",
    "Pre-requisites (if any)": r"(?i).*\b(pre[-\s]?requisites)( \(if any\))?\b.*",
    "Course Credit": r"(?i).*\b(course credit|course unit[s]?)\b.*",
    "Total no. of sessions": r"(?i).*\btotal no\.? of sessions\b.*",
    "Session Duration": r"(?i).*\bsession duration\b.*",
    "Term": r"(?i).*\b(term|semester)\b.*",
    "Year and Batch": r"(?i).*\byear and batch\b.*",
    "Sections (if any)": r"(?i).*\bsections\b.*"
}

# Row labels of the instructor table, used the same way as course_metadata_keys.
instructor_keys = {
    "Instructor(s)": r"(?i).*\binstructor\b.*",
    "Contact Details": r"(?i).*\bcontact\s*details\b.*",
    "Office": r"(?i).*\boffice\b.*",
    "Consultation Hours": r"(?i).*\bconsultation\s*hours\b.*"
}

# Column headers of the assessment table: canonical column name -> regex
# searched for in each header cell.
assessment_col_patterns = {
    "Assessment Tool": r"(?i).*\bAssessment\b.*",
    "Percentage": r"(?i).*\bPercentage\b.*",
    "Description": r"(?i).*\bDescription\b.*"
}

# Column headers of the session-plan table, used like assessment_col_patterns.
# NOTE(review): "Topic" also accepts "Readings", and the DOCX header mapping
# takes the first pattern that matches, so a header containing "Readings" is
# mapped to "Topic" rather than to the reading-material column - confirm intended.
session_plan_col_patterns = {
    "Session": r"(?i).*\bSession\b.*",
    "Module": r"(?i).*\bModule\b.*",
    "Topic": r"(?i).*\b(Topic|Readings)\b.*",
    "Chapter No / Reading material / Cases": r"(?i).*\b(Chapter No|Reading[s]?|Case)\b.*"
}

# A valid session pattern: one or more digits, optionally followed by a single
# separator character (whitespace, comma or hyphen) and a second number,
# e.g. "5", "5-6", "5,6".
# NOTE(review): no extractor visible in this part of the notebook references
# this pattern - confirm it is still needed.
valid_session_pattern = re.compile(r'^\d+([\s,-]\d+)?$')

In [None]:
# Regexes for the free-text sections of an outline. Each one anchors on a line
# that begins with the section heading and lazily captures everything up to
# (but not including) the first occurrence of one of the listed stop words.
# The stop words are matched case-insensitively and anywhere in the text, not
# only at the start of a line, so a stop word inside the section body ends the
# capture early. If no stop word follows the heading, the pattern does not match.

# "Introduction" section -> named group "intro_text".
intro_pattern = re.compile(
        r"(?m)^\s*Introduction\s*(?P<intro_text>.*?)(?=Learning Outcomes|Textbook|Pedagogy|Evaluation|Assessment)",
        re.DOTALL | re.IGNORECASE)

# "Learning Outcomes" (optionally "/Course Objectives") -> named group "lo_text".
lo_pattern = re.compile(
    r"(?m)^\s*Learning Outcomes\s*(?:/Course Objectives)?\s*(?P<lo_text>.*?)(?=Textbook|Reference|Pedagogy|Evaluation|Assessment)",
    re.DOTALL | re.IGNORECASE)

# "Pedagogy Used" (optionally "/Learning Process") -> named group "ped_text".
ped_pattern = re.compile(
    r"(?m)^\s*Pedagogy Used\s*(?:/Learning Process)?\s*(?P<ped_text>.*?)(?=Evaluation|Assessment)",
    re.DOTALL | re.IGNORECASE)

# **DOCX Processor**

For Text Sections

In [None]:
def extract_text_sections_from_docx(file_path):
    """
    Pull the free-text sections out of a DOCX course outline.

    The paragraph texts of the document are joined with newlines and searched
    with the module-level section regexes.

    Returns:
        A dict with the keys "Introduction", "Learning Outcomes" and
        "Pedagogy"; each value is the stripped section text, or "NA" when the
        corresponding regex finds nothing.
    """
    paragraphs = Document(file_path).paragraphs
    full_text = "\n".join(paragraph.text for paragraph in paragraphs)

    # (output key, compiled regex, name of the capture group holding the text)
    section_specs = (
        ("Introduction", intro_pattern, "intro_text"),
        ("Learning Outcomes", lo_pattern, "lo_text"),
        ("Pedagogy", ped_pattern, "ped_text"),
    )

    sections = {}
    for label, pattern, group_name in section_specs:
        found = pattern.search(full_text)
        sections[label] = found.group(group_name).strip() if found else "NA"
    return sections

For Tables

In [None]:
def is_merged_cell(cell):
    """
    Tell whether ``cell`` is the continuation part of a vertical merge.

    The cell's underlying XML is inspected as text: a cell counts as a
    continuation when it contains a <w:vMerge> element and does not carry
    w:val="restart" (which marks the first cell of a merged run).
    """
    cell_xml = cell._tc.xml
    has_vmerge = "<w:vMerge" in cell_xml
    starts_merge = 'w:val="restart"' in cell_xml
    return has_vmerge and not starts_merge

In [None]:
def get_cell_value(cell, prev_value=None):
    """
    Resolve the value of a table cell.

    A cell with text yields that text cleaned by get_default. A blank cell
    yields ``prev_value`` when the cell is a merged continuation cell and a
    previous value was supplied; in every other blank case it yields "NA".
    """
    if cell.text.strip():
        return get_default(cell.text)
    # Blank cell: only a merged continuation may inherit the value above it.
    if is_merged_cell(cell) and prev_value is not None:
        return prev_value
    return "NA"

In [None]:
def _read_docx_row(row, prev_row_values):
    """Return ``(cells, carried)`` for one DOCX table row.

    ``cells`` holds one cleaned value per cell; a blank, vertically merged cell
    takes the value stored for the same column index in ``prev_row_values``.
    ``carried`` maps column index -> value for every cell that is not "NA" and
    is meant to be passed back in as ``prev_row_values`` for the next row.
    """
    cells = [get_cell_value(cell, prev_row_values.get(idx))
             for idx, cell in enumerate(row.cells)]
    carried = {idx: value for idx, value in enumerate(cells) if value != "NA"}
    return cells, carried


def _map_docx_header(header_cells, col_patterns):
    """Map canonical column names to the index of the header cell matching them."""
    mapping = {}
    for idx, cell in enumerate(header_cells):
        header_text = cell.text.strip()
        for canonical, pattern in col_patterns.items():
            if re.search(pattern, header_text):
                mapping[canonical] = idx
                break  # one canonical column per header cell; first pattern wins
    return mapping


def _docx_row_record(cells, header_mapping, col_patterns):
    """Build ``{canonical column: value}`` for one row; unmapped columns get "NA"."""
    record = {}
    for canonical in col_patterns:
        idx = header_mapping.get(canonical)
        record[canonical] = cells[idx] if idx is not None and idx < len(cells) else "NA"
    return record


def extract_tables_from_docx(file_path):
    """
    Extract the structured tables of a DOCX course outline.

    Every table with at least one row and two header cells is classified by
    its first row, in this order:

    1. Metadata / instructor table - any first-row cell matches a pattern from
       ``course_metadata_keys`` or ``instructor_keys``. Each row is read as
       ``label | value`` (first two columns).
    2. Assessment table - a header cell matches ``assessment_col_patterns``.
    3. Session-plan table - a header cell matches ``session_plan_col_patterns``.

    Blank cells that are vertical-merge continuations inherit the value of the
    row above. Session-plan extraction stops once as many rows as the integer
    "Total no. of sessions" value have been collected (no limit when that value
    is missing or not an integer).

    Args:
        file_path: Path of the .docx file.

    Returns:
        dict with "course_metadata" and "instructor_details" (dicts holding
        every canonical key, "NA" when not found) and "assessment" and
        "session_plan" (lists of row dicts keyed by canonical column name).
    """
    doc = Document(file_path)

    # Every canonical key is present in the output, defaulting to "NA".
    course_metadata = {key: "NA" for key in course_metadata_keys}
    instructor_details = {key: "NA" for key in instructor_keys}
    assessment = []
    session_plan = []
    session_count_expected = None
    key_value_patterns = list(course_metadata_keys.values()) + list(instructor_keys.values())

    for table in doc.tables:
        if len(table.rows) == 0 or len(table.rows[0].cells) < 2:
            continue
        header_cells = table.rows[0].cells

        # --- Metadata / instructor table (label | value rows) ---
        header_lower = [cell.text.strip().lower() for cell in header_cells]
        if any(re.search(pattern, text, re.IGNORECASE)
               for text in header_lower
               for pattern in key_value_patterns):
            prev_row_values = {}
            for row in table.rows:
                if len(row.cells) < 2:
                    continue
                cells, prev_row_values = _read_docx_row(row, prev_row_values)
                key_text, value = cells[0], cells[1]

                for canonical, pattern in course_metadata_keys.items():
                    if re.search(pattern, key_text, re.IGNORECASE):
                        course_metadata[canonical] = value
                        if canonical == "Total no. of sessions":
                            try:
                                session_count_expected = int(value)
                            except ValueError:
                                session_count_expected = None
                        break

                for canonical, pattern in instructor_keys.items():
                    if re.search(pattern, key_text, re.IGNORECASE):
                        instructor_details[canonical] = value
                        break
            continue

        # --- Assessment table ---
        assessment_mapping = _map_docx_header(header_cells, assessment_col_patterns)
        if assessment_mapping:
            prev_row_values = {}
            for row in table.rows[1:]:
                cells, prev_row_values = _read_docx_row(row, prev_row_values)
                assessment.append(
                    _docx_row_record(cells, assessment_mapping, assessment_col_patterns))
            continue

        # --- Session-plan table ---
        session_mapping = _map_docx_header(header_cells, session_plan_col_patterns)
        if session_mapping:
            prev_row_values = {}
            for row in table.rows[1:]:
                # Check the limit BEFORE appending so that a later table cannot
                # add an extra row once the expected count has been reached.
                if session_count_expected and len(session_plan) >= session_count_expected:
                    break
                cells, prev_row_values = _read_docx_row(row, prev_row_values)
                row_data = _docx_row_record(cells, session_mapping, session_plan_col_patterns)
                # A row without its own session number belongs to the previous session.
                if row_data["Session"] == "NA" and session_plan:
                    row_data["Session"] = session_plan[-1].get("Session", "NA")
                session_plan.append(row_data)

    return {
        "course_metadata": course_metadata,
        "instructor_details": instructor_details,
        "assessment": assessment,
        "session_plan": session_plan
    }

# **PDF Processor**

For Text Sections

In [None]:
def extract_text_sections_from_pdf(file_path):
    """
    Pull the free-text sections out of a PDF course outline.

    The text of every page that yields any text is concatenated (one trailing
    newline per page) and searched with the module-level section regexes.

    Returns:
        A dict with the keys "Introduction", "Learning Outcomes" and
        "Pedagogy"; each value is the stripped section text, or "NA" when the
        corresponding regex finds nothing.
    """
    page_chunks = []
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                page_chunks.append(page_text + "\n")
    full_text = "".join(page_chunks)

    def section_text(pattern, group_name):
        # Stripped capture of the named group, or "NA" when the section is absent.
        found = pattern.search(full_text)
        if not found:
            return "NA"
        return found.group(group_name).strip()

    return {
        "Introduction": section_text(intro_pattern, "intro_text"),
        "Learning Outcomes": section_text(lo_pattern, "lo_text"),
        "Pedagogy": section_text(ped_pattern, "ped_text")
    }

For Tables

In [None]:
def _pdf_header_has(header_row, pattern):
    """True when any non-empty header cell matches ``pattern`` (case-insensitive)."""
    return any(re.search(pattern, cell, re.IGNORECASE) for cell in header_row if cell)


def _count_pdf_header_matches(header_row, col_patterns):
    """Count (pattern, header cell) pairs that match; used to detect a header row."""
    return sum(1
               for pattern in col_patterns.values()
               for cell in header_row
               if cell and re.search(pattern, cell, re.IGNORECASE))


def _store_pdf_key_value(key, value, key_patterns, target):
    """Store ``value`` in ``target`` under the first canonical key whose pattern matches ``key``."""
    for canonical, pattern in key_patterns.items():
        if re.search(pattern, key, re.IGNORECASE):
            target[canonical] = value
            break


def _append_pdf_assessment_rows(table, assessment):
    """Append the rows of one assessment table (columns taken by position) to ``assessment``."""
    # The first row is treated as a header only when it names at least two expected columns.
    start_idx = 1 if _count_pdf_header_matches(table[0], assessment_col_patterns) >= 2 else 0
    prev_row_values = {}
    for row in table[start_idx:]:
        row = [get_default(cell) for cell in row]
        # Blank cells take the value of the same column in the previous row.
        for idx in range(len(row)):
            if row[idx] == "NA" and idx in prev_row_values:
                row[idx] = prev_row_values[idx]
        prev_row_values = {i: row[i] for i in range(len(row)) if row[i] != "NA"}
        if len(row) < 3:
            row += ["NA"] * (3 - len(row))
        assessment.append({
            "Assessment Tool": row[0],
            "Percentage": row[1],
            "Description": row[2]
        })


def _append_pdf_session_rows(table, session_plan):
    """Append the rows of one session-plan table (columns taken by position) to ``session_plan``."""
    detail_keys = ("Module", "Topic", "Chapter No / Reading material / Cases")
    start_idx = 1 if _count_pdf_header_matches(table[0], session_plan_col_patterns) >= 2 else 0
    for row in table[start_idx:]:
        row = [get_default(cell) for cell in row]
        if len(row) < 4:
            row += ["NA"] * (4 - len(row))
        if row[0] == "NA" and session_plan:
            # No session number: this row continues the previous session, so
            # its text is appended to that entry instead of creating a new one.
            last_entry = session_plan[-1]
            for col, key in enumerate(detail_keys, start=1):
                if row[col] != "NA":
                    if last_entry[key] != "NA":
                        last_entry[key] += " " + row[col]
                    else:
                        last_entry[key] = row[col]
            continue
        session_plan.append({
            "Session": row[0],
            "Module": row[1],
            "Topic": row[2],
            "Chapter No / Reading material / Cases": row[3]
        })


def _pdf_session_cap_reached(session_count_expected, session_plan):
    """True when the last extracted session label contains the expected session count.

    The count is the raw "Total no. of sessions" text. It is escaped before
    being placed in the regex, and a missing value (None / "NA") never counts
    as reached; unescaped text could raise re.error, and "NA" could match a
    first row whose session cell was blank.
    """
    if not session_plan or session_count_expected in (None, "NA"):
        return False
    last_session = session_plan[-1]["Session"]
    pattern = rf'\b{re.escape(str(session_count_expected))}\b'
    return re.search(pattern, last_session) is not None


def extract_tables_from_pdf(file_path):
    """
    Extract the structured tables of a PDF course outline.

    Each table found by pdfplumber is classified by its first row, in this
    priority order:

    1. Metadata table - a cell matches the "Course Code and Course Title"
       pattern. Rows are read as ``label | value`` and may also fill
       instructor details.
    2. Instructor table - a cell matches the "Instructor(s)" pattern.
    3. Assessment table - a cell matches the "Assessment Tool" pattern and no
       session plan is in progress.
    4. Session-plan table - a cell matches the "Session" pattern, or session
       rows have already been extracted (tables that follow a session plan are
       treated as its continuation).
    5. Otherwise, a header-less continuation of an assessment table in progress.

    Assessment and session-plan columns are read by position, not by header.

    Args:
        file_path: Path of the .pdf file.

    Returns:
        dict with "course_metadata" and "instructor_details" (dicts containing
        only the keys that were found) and "assessment" and "session_plan"
        (lists of row dicts).
    """
    extracted_data = {
        "course_metadata": {},
        "instructor_details": {},
        "assessment": [],
        "session_plan": []
    }
    session_plan = extracted_data["session_plan"]
    session_count_expected = None

    # Remembers whether we are inside an assessment or session-plan table so
    # that header-less tables on following pages can be attached to it.
    # One of "assessment", "Session" or None.
    current_table_type = None

    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            for table in page.extract_tables():
                if not table or not table[0]:
                    continue
                header_row = table[0]

                is_metadata = _pdf_header_has(
                    header_row, course_metadata_keys["Course Code and Course Title"])
                is_instructor = _pdf_header_has(header_row, instructor_keys["Instructor(s)"])
                is_assessment = _pdf_header_has(
                    header_row, assessment_col_patterns["Assessment Tool"])
                is_session = (_pdf_header_has(header_row, session_plan_col_patterns["Session"])
                              or bool(session_plan))

                if is_metadata:
                    for row in table:
                        if len(row) < 2:
                            continue
                        row = [get_default(cell) for cell in row]
                        key, value = row[0], row[1]
                        _store_pdf_key_value(key, value, course_metadata_keys,
                                             extracted_data["course_metadata"])
                        _store_pdf_key_value(key, value, instructor_keys,
                                             extracted_data["instructor_details"])
                    session_count_expected = extracted_data["course_metadata"].get(
                        "Total no. of sessions", None)
                    current_table_type = None  # A metadata table ends any table in progress.

                elif is_instructor:
                    for row in table:
                        if len(row) < 2:
                            continue
                        row = [get_default(cell) for cell in row]
                        _store_pdf_key_value(row[0], row[1], instructor_keys,
                                             extracted_data["instructor_details"])
                    current_table_type = None

                elif is_assessment and current_table_type != "Session":
                    current_table_type = "assessment"
                    _append_pdf_assessment_rows(table, extracted_data["assessment"])

                elif is_session or current_table_type == "Session":
                    if _pdf_session_cap_reached(session_count_expected, session_plan):
                        # NOTE(review): this skips the remaining tables on this
                        # page only; later pages are still examined.
                        break
                    current_table_type = "Session"
                    _append_pdf_session_rows(table, session_plan)

                elif current_table_type == "assessment":
                    # Header-less table directly after an assessment table:
                    # treat it as a continuation of that table.
                    _append_pdf_assessment_rows(table, extracted_data["assessment"])

    return extracted_data

# **Save to Output Folder**

In [None]:
def save_output(data, original_path, root_dir, output_dir="/content/drive/MyDrive/Outputs"):
    """
    Write ``data`` as pretty-printed JSON into a single flat output folder.

    The output file is named after the source file with ".json" appended to
    the full original name (extension included), e.g. "Outline.pdf" ->
    "Outline.pdf.json". Source files that share a name therefore overwrite
    each other, whatever program/term folder they come from.

    Args:
        data: JSON-serialisable object to store.
        original_path: Path of the source document; only its base name is used.
        root_dir: Unused; kept so existing callers keep working.
        output_dir: Folder that receives the JSON file. It is created when
            missing. Defaults to the Google Drive "Outputs" folder.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # The original extension is kept on purpose: "<name>.<ext>.json".
    json_filename = f"{os.path.basename(original_path)}.json"
    output_path = os.path.join(output_dir, json_filename)

    # Explicit encoding so the result does not depend on the platform default.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

    print(f"Saved output to: {output_path}")

# **Main**

In [None]:
def process_all_courses(root_dir):
    """
    Run the extraction pipeline over every course outline below ``root_dir``.

    For each file reported by find_course_outlines the free-text sections and
    the tables are extracted with the DOCX or PDF extractor that matches the
    file extension, the program/term labels are attached, and the result is
    handed to save_output. A failure on one file is printed and does not stop
    the remaining files from being processed.
    """
    for entry in find_course_outlines(root_dir):
        try:
            lowered_name = entry["filename"].lower()
            if lowered_name.endswith(".docx"):
                basic_info = extract_text_sections_from_docx(entry["path"])
                data = extract_tables_from_docx(entry["path"])
            elif lowered_name.endswith(".pdf"):
                basic_info = extract_text_sections_from_pdf(entry["path"])
                data = extract_tables_from_pdf(entry["path"])

            # Attach the text sections and the program/term labels to the table data.
            data.update({
                "basic_info": basic_info,
                "program": entry["program"],
                "term": entry["term"],
            })

            save_output(data, entry["path"], root_dir)
            print(f"Processed: {entry['path']}")
        except Exception as exc:
            print(f"Error processing {entry['path']}: {exc}")


# Entry point: process every course outline found under the Drive folder that
# holds the <program>/term_<n>/ directory tree.
if __name__ == "__main__":
    process_all_courses("/content/drive/MyDrive/root")

Saved output to: /content/drive/MyDrive/Outputs/CourseOutline_BusinessCommunication.pdf.json
Processed: /content/drive/MyDrive/root/MBA09/term_2/CourseOutline_BusinessCommunication.pdf
Saved output to: /content/drive/MyDrive/Outputs/CourseOutline_NLP.pdf.json
Processed: /content/drive/MyDrive/root/MSDSM/term_4/CourseOutline_NLP.pdf
Saved output to: /content/drive/MyDrive/Outputs/CourseOutline_StoryTellingwithData.docx.json
Processed: /content/drive/MyDrive/root/MSDSM/term_4/CourseOutline_StoryTellingwithData.docx


# **OWL Generation**

In [None]:
# Configure root logging at INFO level and create the logger for this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def sanitize_iri(raw_input: str) -> str:
    """Turn free text into an IRI-friendly token.

    The input is stripped and lower-cased, every run of non-word characters
    and/or underscores becomes a single underscore, and underscores at either
    end are removed.
    """
    lowered = raw_input.strip().lower()
    # One pass: any run of non-word characters or underscores -> single "_".
    collapsed = re.sub(r'[\W_]+', '_', lowered)
    return collapsed.strip('_')

def create_ontology(input_folder, output_folder):
    """Build a course ontology from extracted JSON files and save it as RDF/XML.

    Every ``*.json`` file in ``input_folder`` is read and turned into
    individuals of the classes defined here (Program, Term, Course, BasicInfo,
    CourseMetadata, InstructorDetails, Assessment, SessionPlan). Programs,
    terms and courses are shared between files that name the same one;
    assessments and session plans are only added when an identical one is not
    already attached to the course. The result is written to
    ``<output_folder>/university_courses.owl``.

    Files that cannot be parsed, or that lack a program, a term or a usable
    course title, are logged and skipped.

    Args:
        input_folder: Directory containing the per-course JSON files.
        output_folder: Directory for the OWL file; created if it is missing.

    Returns:
        None.

    Raises:
        OSError: If ``input_folder`` cannot be listed or the output file
            cannot be written.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Build inside a private World so that repeated calls (e.g. re-running the
    # cell) never see classes or individuals left over from an earlier run.
    world = World()
    owlready2.default_world = world

    # Ask the new World for the ontology directly: the imported get_ontology()
    # helper was bound when owlready2 was imported and does not follow the
    # reassignment above.
    onto = world.get_ontology("http://www.example.org/university.owl#")

    with onto:
        # Define Core Hierarchy Classes
        class Program(owlready2.Thing): pass
        class Term(owlready2.Thing): pass
        class Course(owlready2.Thing): pass

        # Define Course Component Classes
        class BasicInfo(owlready2.Thing): pass
        class CourseMetadata(owlready2.Thing): pass
        class InstructorDetails(owlready2.Thing): pass
        class Assessment(owlready2.Thing): pass
        class SessionPlan(owlready2.Thing): pass

        # Data Properties for Program and Term
        class programName(DataProperty, FunctionalProperty):
            domain = [Program]
            range = [str]

        class termNumber(DataProperty, FunctionalProperty):
            domain = [Term]
            range = [str]

        # BasicInfo Properties
        class Introduction(DataProperty, FunctionalProperty):
            domain = [BasicInfo]
            range = [str]

        class LearningOutcomes(DataProperty, FunctionalProperty):
            domain = [BasicInfo]
            range = [str]

        class PedagogyUsed(DataProperty, FunctionalProperty):
            domain = [BasicInfo]
            range = [str]

        # CourseMetadata Properties
        class CourseCodeTitle(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class CourseType(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class Prerequisites(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class CourseCredit(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class TotalSessions(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class SessionDuration(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class YearBatch(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        class Sections(DataProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [str]

        # InstructorDetails Properties
        class Instructors(DataProperty, FunctionalProperty):
            domain = [InstructorDetails]
            range = [str]

        class ContactDetails(DataProperty, FunctionalProperty):
            domain = [InstructorDetails]
            range = [str]

        class Office(DataProperty, FunctionalProperty):
            domain = [InstructorDetails]
            range = [str]

        class ConsultationHours(DataProperty, FunctionalProperty):
            domain = [InstructorDetails]
            range = [str]

        # Assessment Properties
        class AssessmentTool(DataProperty, FunctionalProperty):
            domain = [Assessment]
            range = [str]

        class Percentage(DataProperty, FunctionalProperty):
            domain = [Assessment]
            range = [str]

        class AssessmentDescription(DataProperty, FunctionalProperty):
            domain = [Assessment]
            range = [str]

        # SessionPlan Properties
        class Session(DataProperty, FunctionalProperty):
            domain = [SessionPlan]
            range = [str]

        class Module(DataProperty, FunctionalProperty):
            domain = [SessionPlan]
            range = [str]

        class Topic(DataProperty, FunctionalProperty):
            domain = [SessionPlan]
            range = [str]

        class ReadingMaterial(DataProperty, FunctionalProperty):
            domain = [SessionPlan]
            range = [str]

        # Object Properties, each declared together with its inverse
        class hasTerm(ObjectProperty):
            domain = [Program]
            range = [Term]
        class belongsToProgram(ObjectProperty):
            domain = [Term]
            range = [Program]
            inverse_property = hasTerm

        class hasCourse(ObjectProperty):
            domain = [Term]
            range = [Course]
        class belongsToTerm(ObjectProperty):
            domain = [Course]
            range = [Term]
            inverse_property = hasCourse

        class hasBasicInfo(ObjectProperty, FunctionalProperty):
            domain = [Course]
            range = [BasicInfo]
        class infoOfCourse(ObjectProperty, FunctionalProperty):
            domain = [BasicInfo]
            range = [Course]
            inverse_property = hasBasicInfo

        class hasCourseMetadata(ObjectProperty, FunctionalProperty):
            domain = [Course]
            range = [CourseMetadata]
        class partOfCourse(ObjectProperty, FunctionalProperty):
            domain = [CourseMetadata]
            range = [Course]
            inverse_property = hasCourseMetadata

        class hasInstructorDetails(ObjectProperty, FunctionalProperty):
            domain = [Course]
            range = [InstructorDetails]
        class teachesCourse(ObjectProperty, FunctionalProperty):
            domain = [InstructorDetails]
            range = [Course]
            inverse_property = hasInstructorDetails

        class hasAssessment(ObjectProperty):
            domain = [Course]
            range = [Assessment]
        class assessmentOf(ObjectProperty):
            domain = [Assessment]
            range = [Course]
            inverse_property = hasAssessment

        class hasSessionPlan(ObjectProperty):
            domain = [Course]
            range = [SessionPlan]
        class sessionOf(ObjectProperty):
            domain = [SessionPlan]
            range = [Course]
            inverse_property = hasSessionPlan

    # No clean-up of old individuals is needed: the World above is brand new.

    # Process JSON files from input_folder in a deterministic (sorted) order
    processed_files = 0
    for filename in sorted(os.listdir(input_folder)):
        if not filename.endswith(".json"):
            continue

        file_path = os.path.join(input_folder, filename)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            logger.error(f"Error reading {filename}: {str(e)}")
            continue

        # Program and term identifiers; a missing or null field becomes ""
        program_name = sanitize_iri(str(data.get("program") or ""))
        term_name = sanitize_iri(str(data.get("term") or ""))

        if not (program_name and term_name):
            logger.warning(f"Skipping {filename}: Missing program/term")
            continue

        # --- Program: reuse the individual if an earlier file created it ---
        program = onto.search_one(iri=f"*#{program_name}")
        if program is None:
            program = onto.Program(program_name)
            program.programName = program_name

        # --- Term: composite ID keeps "term_1" of different programs apart ---
        term_id = f"{program_name}_{term_name}"
        term = onto.Term(term_id)

        # Extract term number directly from the "term" field
        if "_" in term_name:
            term.termNumber = term_name.split('_')[-1]  # "term_1" -> "1"
        else:
            term.termNumber = term_name  # Fallback for invalid formats
            logger.warning(f"Unstructured term name in {filename}: {term_name}")

        # Link both directions only once; the same term is seen for every
        # course file it contains.
        if term not in program.hasTerm:
            program.hasTerm.append(term)
        if program not in term.belongsToProgram:
            term.belongsToProgram.append(program)

        # --- Course: identified by the sanitized "code and title" string ---
        cm_data = data.get("course_metadata")
        course_title = (
            cm_data.get("Course Code and Course Title")
            if isinstance(cm_data, dict) else None
        )
        if not isinstance(course_title, str):
            logger.error(f"Missing course metadata in {filename}")
            continue

        course_code = sanitize_iri(course_title)
        if not course_code:
            # An empty name would give the course the ontology's own base IRI.
            logger.error(f"Unusable course title in {filename}: {course_title!r}")
            continue

        course = onto.search_one(iri=f"*#{course_code}")
        if course is None:
            course = onto.Course(course_code)

        # Link course to term if not already linked
        if course not in term.hasCourse:
            term.hasCourse.append(course)
        if term not in course.belongsToTerm:
            course.belongsToTerm.append(term)

        # --- Basic Info ---
        if 'basic_info' in data:
            bi = course.hasBasicInfo
            if not bi:
                bi = onto.BasicInfo()
                course.hasBasicInfo = bi
            basic_info = data['basic_info']
            bi.Introduction = basic_info.get('Introduction', '')
            bi.LearningOutcomes = basic_info.get('Learning Outcomes', '')
            bi.PedagogyUsed = basic_info.get('Pedagogy', '')
            bi.infoOfCourse = course

        # --- Course Metadata (cm_data was validated as a dict above) ---
        cm = course.hasCourseMetadata
        if not cm:
            cm = onto.CourseMetadata()
            course.hasCourseMetadata = cm
        cm.CourseCodeTitle = cm_data.get('Course Code and Course Title', '')
        cm.CourseType = cm_data.get('Course type', '')
        cm.Prerequisites = cm_data.get('Pre-requisites (if any)', 'None')
        cm.CourseCredit = cm_data.get('Course Credit', '')
        # TotalSessions is declared with range str, so always store text.
        total_sessions = cm_data.get('Total no. of sessions')
        cm.TotalSessions = '0' if total_sessions is None else str(total_sessions)
        cm.SessionDuration = cm_data.get('Session Duration', '')
        cm.YearBatch = cm_data.get('Year and Batch', '')
        cm.Sections = cm_data.get('Sections (if any)', '')
        cm.partOfCourse = course

        # --- Instructor Details ---
        if 'instructor_details' in data:
            instr = course.hasInstructorDetails
            if not instr:
                instr = onto.InstructorDetails()
                course.hasInstructorDetails = instr
            instr_data = data['instructor_details']
            instr.Instructors = instr_data.get('Instructor(s)', '')
            instr.ContactDetails = instr_data.get('Contact Details', '')
            instr.Office = instr_data.get('Office', 'NA')
            instr.ConsultationHours = instr_data.get('Consultation Hours', 'NA')
            instr.teachesCourse = course

        # --- Assessments ---
        for assessment_data in data.get('assessment', []):
            tool = assessment_data.get('Assessment Tool', '')
            percentage = assessment_data.get('Percentage', '0.0')
            description = assessment_data.get('Description', '')

            # Skip an assessment identical to one already on this course
            if any(a.AssessmentTool == tool and
                   a.Percentage == percentage and
                   a.AssessmentDescription == description
                   for a in course.hasAssessment):
                continue

            assessment = onto.Assessment()
            assessment.AssessmentTool = tool
            assessment.Percentage = percentage
            assessment.AssessmentDescription = description
            course.hasAssessment.append(assessment)
            if course not in assessment.assessmentOf:
                assessment.assessmentOf.append(course)

        # --- Session Plans ---
        for session_data in data.get('session_plan', []):
            session_no = session_data.get('Session', '')
            module = session_data.get('Module', '')
            topic = session_data.get('Topic', '')
            reading = session_data.get('Chapter No / Reading material / Cases', '')

            # Skip a session plan identical to one already on this course
            if any(existing.Session == session_no and
                   existing.Module == module and
                   existing.Topic == topic and
                   existing.ReadingMaterial == reading
                   for existing in course.hasSessionPlan):
                continue

            sp = onto.SessionPlan()
            sp.Session = session_no
            sp.Module = module
            sp.Topic = topic
            sp.ReadingMaterial = reading
            course.hasSessionPlan.append(sp)
            if course not in sp.sessionOf:
                sp.sessionOf.append(course)

        processed_files += 1

    # Replace any ontology file left by a previous run
    output_path = os.path.join(output_folder, "university_courses.owl")
    if os.path.exists(output_path):
        os.remove(output_path)

    onto.save(file=output_path, format="rdfxml")
    logger.info(f"Processed {processed_files} files. Ontology saved to {output_path}")

# Entry point for this cell: build the ontology from the JSON files in the
# Drive "Outputs" folder and write the OWL file into the Drive "Ontology" folder.
if __name__ == "__main__":
    input_dir = "/content/drive/MyDrive/Outputs"  # JSON files to read
    output_dir = "/content/drive/MyDrive/Ontology"  # destination for the OWL file
    create_ontology(input_dir, output_dir)