In [None]:
import json
import os
from typing import List

import pymupdf

from parser.elements.rectangle import Rectangle

In [None]:
# Run the packaged extractor from `parser.elements.pymupdf_integ` on one exam.
# NOTE: a later cell in this notebook defines a function with the same name,
# which replaces this imported one in the notebook namespace once that cell runs.
from parser.elements.pymupdf_integ import load_exam_and_extract_text_words

load_exam_and_extract_text_words("../fe_files/exams/FE-Jan24.pdf")

In [None]:
import json  # noqa: F811
import re
from typing import Dict, Tuple  # noqa: F811

import pymupdf  # type: ignore # noqa: F811
import regex  # type: ignore

from parser.dataset.dataloader import DataLoader
from parser.dataset.exam import get_semester_and_year
from parser.elements.fill_in_the_blank import separate_underscores_from_text
from parser.elements.page import (
    ExamResult,
    MacroDefinition,
    PageBlocks,
    PageResult,
    Question,
    StructDefinition,
    extract_macro_definitions,
    extract_struct_definitions,
    is_code_page,
)
from parser.elements.pymupdf_integ import PyMuPDFRect, mark_rect, rect_to_bbox
from parser.elements.rectangle import merge_rectangles
from parser.model.page_model import PageType, SectionType, Semester
from parser.page_processing import (
    extract_date_from_page,
    get_page_type,
    get_section_type,
)
from parser.question_extraction import apply_header_filter


def remove_page_mentions(text: str) -> str:
    """Return ``text`` with every "Page x of y" marker deleted.

    The match is case-sensitive and only the marker itself is removed;
    whitespace around it is left untouched.
    """
    page_marker = re.compile(r"Page \d+ of \d+")
    return page_marker.sub("", text)


def load_exam_and_extract_text_words(
    filepath: str, write_files: bool = False
) -> ExamResult:
    """Parse an exam PDF page by page and collect the recognised structure.

    Each page's word boxes are read with ``page.get_text("words")`` and the
    page is classified with ``get_page_type``. Scanning stops at the first
    page without a valid page type, or at a section page without a valid
    section type. Section pages are recorded as-is. On every other page the
    underscore runs, code areas, struct definitions and macro definitions are
    located, passed to ``mark_rect`` and stored in a ``PageResult``. Finally
    the questions are extracted with ``parse_into_questions``, passed to
    ``mark_rect`` as well, and grouped by section.

    Args:
        filepath: Path of the exam PDF to open.
        write_files: When true, write ``page-<i>-code_areas.json`` for every
            page that has code areas and save the document as
            ``marked-<basename of filepath>``; both land in the current
            working directory.

    Returns:
        An ``ExamResult`` holding the semester and year read from the first
        page, the per-page results, and the questions as
        ``{section_type: {question_number: question}}``.

    Raises:
        AssertionError: If no date can be extracted from the first page, or
            a page is flagged as code but no code areas were returned.
        ValueError: If a non-section page is reached before any section page
            has established a section type.
    """
    doc: pymupdf.Document = pymupdf.open(filepath)

    semester: Semester | None = None
    year: int | None = None
    page_results: List[PageResult] = []

    previous_section_type: SectionType | None = None
    for i, page in enumerate(doc):  # scan through the pages
        blocks: List[PyMuPDFRect] = page.get_text("words")
        page_blocks = PageBlocks.from_page_bboxes(blocks, page)

        # No section seen yet: this must be the first page, which carries the date.
        if previous_section_type is None:
            assert i == 0
            date = extract_date_from_page(page_blocks.merged_text)
            assert date is not None
            semester, year = get_semester_and_year(date)

        page_type = get_page_type(page_blocks.merged_text)
        if page_type is None:
            print(f"Breaking on page {i} because it is not a valid PageType")
            break

        section_type: SectionType | None = None
        if page_type == PageType.SECTION:
            section_type = get_section_type(page_blocks.merged_text)
            if section_type is None:
                print(f"Breaking on page {i} because it is not a valid SectionType")
                print(page_blocks.merged_text)
                break
        else:
            # Non-section pages inherit the section of the preceding page.
            section_type = previous_section_type

        if section_type is None:
            raise ValueError("section_type is None")

        if page_type == PageType.SECTION:
            page_result = PageResult(
                page_number=i,
                section_type=section_type,
                page_type=page_type,
                raw_text=page_blocks.merged_text,
                filtered_text=remove_page_mentions(page_blocks.merged_text),
                code_areas=[],
                fill_in_the_blank_areas=[],
                struct_definitions=[],
                macro_definitions=[],
            )
            page_results.append(page_result)
            previous_section_type = section_type

            continue

        # Underscore runs found on this page that are not yet attributed to a code area.
        underscore_marking_queue: List[Rectangle] = []

        for block in page_blocks.blocks:
            rectangle = Rectangle.from_points(*block[0:4])
            rect_text = block[4]
            separated = separate_underscores_from_text(page, rectangle, rect_text)
            if separated.underscores is not None:
                underscore_marking_queue.append(separated.underscores)

        is_code, code_areas = is_code_page(page, page_blocks)
        print(f"is_code={is_code}")

        if is_code:
            assert code_areas is not None
            for code_area in code_areas:
                # assert len(code_area.sub_areas) > 0
                print(f"underscore_marking_queue={underscore_marking_queue}")

                mark_rect(page, code_area.rect, "entire_function_area")

                # Partition into a fresh list rather than calling
                # list.remove() during iteration: removing while iterating
                # skips the element that follows each removed entry.
                still_queued: List[Rectangle] = []
                for underscore in underscore_marking_queue:
                    underscore_rect = Rectangle.from_points(*rect_to_bbox(underscore))
                    if underscore_rect.is_within(code_area.rect):
                        # the underscore belongs to this code area: drop it from the queue
                        mark_rect(page, underscore_rect, "code_underscores")
                        print(f"Removed underscore from queue: {underscore}")
                    else:
                        still_queued.append(underscore)
                underscore_marking_queue = still_queued

                for area in code_area.sub_areas:
                    mark_rect(page, area.code_textarea, "code_textarea")

            if write_files:
                with open(f"page-{i}-code_areas.json", "w") as code_file:
                    json.dump(
                        [code_area.model_dump() for code_area in code_areas],
                        code_file,
                        indent=4,
                    )

        # Whatever is left in the queue lies outside every code area.
        for underscore in underscore_marking_queue:
            mark_rect(
                page,
                Rectangle.from_points(*rect_to_bbox(underscore)),
                "code_underscores",
            )

        struct_definitions: List[StructDefinition] = extract_struct_definitions(
            page, page_blocks
        )

        for struct_definition in struct_definitions:
            mark_rect(page, struct_definition.rect, "struct_definition")

        macro_definitions: List[MacroDefinition] = extract_macro_definitions(
            page, page_blocks
        )

        for macro_definition in macro_definitions:
            mark_rect(page, macro_definition.rect, "macro_definition")

        page_result = PageResult(
            page_number=i,
            section_type=section_type,
            page_type=page_type,
            code_areas=code_areas if code_areas is not None else [],
            filtered_text=remove_page_mentions(page_blocks.merged_text),
            fill_in_the_blank_areas=[
                Rectangle.from_points(*rect_to_bbox(x))
                for x in underscore_marking_queue
                if x is not None
            ],
            raw_text=page_blocks.merged_text,
            struct_definitions=struct_definitions,
            macro_definitions=macro_definitions,
        )

        previous_section_type = section_type
        page_results.append(page_result)

    questions: Dict[Tuple[SectionType, int], Question] = parse_into_questions(
        page_results, doc
    )

    def transform_questions(
        questions: Dict[Tuple[SectionType, int], Question],
    ) -> Dict[SectionType, Dict[int, Question]]:
        """Regroup ``{(section, number): question}`` as ``{section: {number: question}}``."""
        # Accumulate into the per-section dict; a plain dict comprehension
        # keyed on the section would keep only the last question of each section.
        grouped: Dict[SectionType, Dict[int, Question]] = {}
        for (section, number), question in questions.items():
            grouped.setdefault(section, {})[number] = question
        return grouped

    transformed_questions: Dict[SectionType, Dict[int, Question]] = transform_questions(
        questions
    )

    for transformed_question in transformed_questions.values():
        for question in transformed_question.values():
            for page_number, rect in question.rects.items():
                page = doc.load_page(page_number)
                print(f"Marking question on page {page_number}, rect: {rect}")
                mark_rect(page, rect, "question")

    if write_files:
        doc.save("marked-" + os.path.basename(filepath))

    return ExamResult(
        semester=semester,
        year=year,
        page_results=page_results,
        questions=transformed_questions,
    )


def extract_questions(text: str) -> List[Question]:
    """Split ``text`` into questions using their "N) (P pts) CATEGORY (SUB)" headers.

    A header is a question number from 1 to 5, a points value, a category and
    a parenthesised sub-category. The body runs up to the next header or the
    end of the text. Only the question number and the body are kept; the
    points, category and sub-category captures are discarded.

    Returns:
        One ``Question`` per header found, in order of appearance, each with
        empty ``rects`` and ``sub_questions``.
    """
    header_and_body = re.compile(
        r"(?s)\s*([1-5])\)\s*\((\d+)\s*pts\)\s*([\w/ ]+)\s*\(\s*([^)]+?)\s*\)\s*(.*?)(?=\s*[1-5]\)\s*\(\d+\s*pts\)\s*[\w/ ]+\s*\([^)]+\)|\s*\Z)",
        re.DOTALL,
    )

    return [
        Question(
            rects={},
            raw=body,
            sub_questions=[],
            question_number=int(number),
        )
        for number, _points, _category, _sub_category, body in header_and_body.findall(
            text
        )
    ]


def parse_into_questions(
    page_results: List[PageResult], pymupdf_doc: pymupdf.Document
) -> Dict[Tuple[SectionType, int], Question]:
    """Collect the questions of all pages, keyed by ``(section_type, question_number)``.

    Pages rejected by ``should_skip_page`` are ignored. For every other page
    the questions are extracted twice: once from the page's own filtered text
    and once from that text joined with the next page's text. The first pass
    registers the questions; the second pass attaches rectangles to them and
    extends questions that continue on the next page.
    """
    pages = load_pymupdf_pages(pymupdf_doc, len(page_results))
    collected: Dict[Tuple[SectionType, int], Question] = {}

    for index, result in enumerate(page_results):
        if not should_skip_page(result, index, page_results):
            page_text = result.filtered_text
            single_page_questions = extract_questions_from_page(page_text)
            combined_questions = extract_questions_from_combined_text(
                page_text, get_next_page_text(index, page_results)
            )

            process_current_page_questions(collected, result, single_page_questions)
            process_combined_page_questions(
                collected, pages, index, result, combined_questions
            )

    return collected


def load_pymupdf_pages(
    pymupdf_doc: pymupdf.Document, num_pages: int
) -> List[pymupdf.Page]:
    """Load pages ``0 .. num_pages - 1`` of ``pymupdf_doc`` and return them in order."""
    loaded: List[pymupdf.Page] = []
    for page_index in range(num_pages):
        loaded.append(pymupdf_doc.load_page(page_index))
    return loaded


def should_skip_page(
    page_result: PageResult, i: int, page_results: List[PageResult]
) -> bool:
    """Tell whether page ``i`` is excluded from question extraction.

    A page is skipped when it is a section page itself, or when the page
    right after it is a section page. The reason is printed in either case.
    """
    if page_result.page_type == PageType.SECTION:
        print(f"Skipping page {i} because it is a section")
        return True

    following_index = i + 1
    next_is_section = (
        following_index < len(page_results)
        and page_results[following_index].page_type == PageType.SECTION
    )
    if next_is_section:
        print(f"Skipping page {i} because next page is a section")
    return next_is_section


def extract_questions_from_page(current_page_text: str) -> List[Question]:
    """Run ``apply_header_filter`` on one page's text, then extract its questions."""
    header_filtered_text = apply_header_filter(current_page_text)
    return extract_questions(header_filtered_text)


def get_next_page_text(i: int, page_results: List[PageResult]) -> str:
    """Return the filtered text of the page after index ``i``, or ``""`` if there is none."""
    following_index = i + 1
    if following_index >= len(page_results):
        return ""
    return page_results[following_index].filtered_text


def extract_questions_from_combined_text(
    current_page_text: str, next_page_text: str
) -> List[Question]:
    """Extract questions from the current page's text followed by the next page's text.

    Each text is passed through ``apply_header_filter`` separately and the two
    are joined with a newline. When ``next_page_text`` is empty only the
    current page's text is used.
    """
    combined_text = apply_header_filter(current_page_text)
    if next_page_text != "":
        combined_text = combined_text + ("\n" + apply_header_filter(next_page_text))
    return extract_questions(combined_text)


def process_current_page_questions(
    questions: Dict[Tuple[SectionType, int], Question],
    page_result: PageResult,
    current_page_questions: List[Question],
):
    """Register the page's questions in ``questions`` (mutated in place).

    Each question is stored under ``(page_result.section_type,
    question.question_number)``; an existing entry with the same key is
    overwritten.
    """
    questions.update(
        {
            (page_result.section_type, found.question_number): found
            for found in current_page_questions
        }
    )


def process_combined_page_questions(
    questions: Dict[Tuple[SectionType, int], Question],
    pymupdf_pages: List[pymupdf.Page],
    i: int,
    page_result: PageResult,
    current_plus_next_page_questions: List[Question],
):
    """Attach rectangles to already-registered questions using the two-page extraction.

    Questions from the combined text that are not yet present in
    ``questions`` are ignored. For a registered question, an identical raw
    text means it fits on page ``i`` alone; a different raw text means it
    continues on the next page and is updated as a multi-page question.
    """
    for candidate in current_plus_next_page_questions:
        key = (page_result.section_type, candidate.question_number)
        if key not in questions:
            continue

        registered = questions[key]
        if registered.raw == candidate.raw:
            update_question_for_single_page(
                questions, pymupdf_pages, i, key, registered
            )
        else:
            update_question_for_multi_page(
                questions, pymupdf_pages, i, key, registered, candidate
            )


def update_question_for_multi_page(
    questions: Dict[Tuple[SectionType, int], Question],
    pymupdf_pages: List[pymupdf.Page],
    i: int,
    key: Tuple[SectionType, int],
    existing_question: Question,
    question: Question,
):
    """Record a question that starts on page ``i`` and continues on page ``i + 1``.

    The rectangle on page ``i`` is searched with the text already stored for
    the question. The rectangle on page ``i + 1`` is searched with the part
    of ``question.raw`` that lies beyond the length of the stored text.
    NOTE(review): this assumes the stored text is a prefix of
    ``question.raw`` - confirm against the caller.
    The stored entry then receives the full raw text and both rectangles.
    """
    first_page_rect = find_rectangles(pymupdf_pages[i], existing_question.raw)

    # Text that only appears in the combined (two-page) extraction.
    overflow_text = question.raw[len(existing_question.raw) :]
    second_page_rect = find_rectangles(pymupdf_pages[i + 1], overflow_text)

    stored = questions[key]
    stored.raw = question.raw
    stored.rects = {i: first_page_rect, i + 1: second_page_rect}


def update_question_for_single_page(
    questions: Dict[Tuple[SectionType, int], Question],
    pymupdf_pages: List[pymupdf.Page],
    i: int,
    key: Tuple[SectionType, int],
    existing_question: Question,
):
    """Record the rectangle of a question that lies entirely on page ``i``.

    The rectangle is looked up with ``find_rectangles_with_fallback`` and
    replaces whatever ``rects`` the stored question had before.
    """
    page_rect = find_rectangles_with_fallback(
        pymupdf_pages[i], existing_question.raw
    )
    questions[key].rects = {i: page_rect}


def find_rectangles(page: pymupdf.Page, text: str) -> Rectangle:
    """Search ``page`` for ``text`` and merge all hits into one ``Rectangle``.

    Every hit returned by ``page.search_for`` is converted from its first
    four coordinates and the conversions are combined with
    ``merge_rectangles``.
    """
    hit_rectangles = []
    for hit in page.search_for(text):
        hit_rectangles.append(Rectangle.from_points(*hit[0:4]))
    return merge_rectangles(hit_rectangles)


def find_rectangles_with_fallback(page: pymupdf.Page, text: str) -> Rectangle:
    """Locate ``text`` on ``page``, falling back to its first and last lines.

    The full text is tried first via ``find_rectangles``. If that raises any
    exception, the stripped first line and the stripped last non-blank line
    are searched separately and their hits are merged into one rectangle.
    An exception from the fallback search is printed and re-raised.
    """
    try:
        return find_rectangles(page, text)
    except Exception:
        print(f"Error finding rectangles for text: {text}")
        print("Attempting alternative approach")

        lines = text.split("\n")
        first_line = lines[0].strip()
        non_blank_lines = [line for line in lines if line.strip()]
        last_line = non_blank_lines[-1].strip()

        try:
            anchor_rectangles = []
            for needle in (first_line, last_line):
                anchor_rectangles.extend(
                    [Rectangle.from_points(*hit[0:4]) for hit in page.search_for(needle)]
                )
            return merge_rectangles(anchor_rectangles)
        except Exception as e:
            print(f"Error finding rectangles with fallback: {e}")
            raise

In [None]:
# When True, the driver at the end of this cell processes every exam returned by
# the DataLoader; when False it processes the single hard-coded exam instead.
LOAD_ALL = True


def serialize_questions(questions: Dict[int, Question], filepath: str):
    """Dump ``questions`` as indented JSON to ``test_data/<exam name>-questions.json``.

    The exam name is the base name of ``filepath`` with ".pdf" removed. The
    ``test_data`` directory is created in the current working directory if
    it does not exist yet. ``questions`` is handed to ``json.dump`` as-is, so
    it must already be JSON-serialisable.
    """
    exam_name = os.path.basename(filepath.replace(".pdf", ""))
    output_prefix = os.path.join("test_data", exam_name)
    os.makedirs("test_data", exist_ok=True)

    target_path = f"{output_prefix}-questions.json"
    with open(target_path, "w") as questions_file:
        json.dump(questions, questions_file, indent=4)


# Driver: run the extractor either over every exam the DataLoader lists or over
# one fixed exam. With write_files=True each run also writes the per-page
# code-area JSON files and the "marked-" PDF into the working directory.
if LOAD_ALL:
    loader = DataLoader("../fe_files/exams/", "../fe_files/solutions/")
    exam_paths = loader.get_exam_paths()

    for exam_path in exam_paths:
        print(f"Processing {exam_path}")
        # Only the result of the last exam stays bound to `exam_results` after the loop.
        exam_results = load_exam_and_extract_text_words(exam_path, write_files=True)

        # Disabled: separate question parsing and JSON serialisation per exam.
        # pymupdf_doc = pymupdf.open(exam_path)
        # questions: Dict[Tuple[SectionType, int], Question] = parse_into_questions(
        #    page_results, pymupdf_doc
        # )

        # serialize_questions(exam_results)
        # serialize_questions(
        #    {f"{k[0]}-{k[1]}": v.model_dump() for k, v in questions.items()}, exam_path
        # )

else:
    # Single-exam run.
    load_exam_and_extract_text_words("../fe_files/exams/FE-Jan20.pdf", write_files=True)

In [None]:
from typing import Dict

from parser.elements.code import CodeArea


def get_all_exam_results() -> Dict[str, ExamResult]:
    """Run the extractor on every exam listed by the ``DataLoader``.

    Each exam is processed with ``write_files=True``.

    Returns:
        A dict mapping each exam path to its ``ExamResult``.
    """
    loader = DataLoader("../fe_files/exams/", "../fe_files/solutions/")
    return {
        path: load_exam_and_extract_text_words(path, write_files=True)
        for path in loader.get_exam_paths()
    }


def serialize_all_code_areas(
    exam_results: Dict[str, ExamResult], exam_path: str
) -> None:
    """Write two JSON dumps for the exam stored under ``exam_path``.

    Both files go to ``test_data/`` (created if missing) and are prefixed
    with the exam's base name without ".pdf":

    * ``<name>-code_areas.json``: page number -> dumped code areas.
    * ``<name>-raw.json``: page number -> raw text, its lines, and the dumped
      struct definitions, code areas and macro definitions.
    """
    pages = exam_results[exam_path].page_results

    areas_by_page: Dict[int, List[CodeArea]] = {
        page.page_number: page.code_areas for page in pages
    }

    exam_name = os.path.basename(exam_path.replace(".pdf", ""))
    output_prefix = os.path.join("test_data", exam_name)
    os.makedirs("test_data", exist_ok=True)

    dumped_areas = {
        page_number: [area.model_dump() for area in areas]
        for page_number, areas in areas_by_page.items()
    }
    with open(f"{output_prefix}-code_areas.json", "w") as code_file:
        json.dump(dumped_areas, code_file, indent=4)

    def describe_page(page):
        """Build the raw-dump entry of a single page."""
        return {
            "raw": page.raw_text,
            "split": page.raw_text.split("\n"),
            "struct_definitions": [
                definition.model_dump() for definition in page.struct_definitions
            ],
            "code_areas": [area.model_dump() for area in page.code_areas],
            "macro_definitions": [
                definition.model_dump() for definition in page.macro_definitions
            ],
        }

    with open(f"{output_prefix}-raw.json", "w") as code_file:
        json.dump(
            {page.page_number: describe_page(page) for page in pages},
            code_file,
            indent=4,
        )


# Process every exam once, then write the code-area and raw-text dumps for each.
# This rebinds the notebook-level `exam_results` to a dict keyed by exam path.
exam_results = get_all_exam_results()
for exam_path in exam_results.keys():
    serialize_all_code_areas(exam_results, exam_path)

In [None]:
# Scratch check: a brace pair whose body holds only whitespace, `//` comments
# and empty for/while loops.
pattern = regex.compile(
    r"""
        \{                          # Opening brace
        (?:
            \s*                     # Optional leading whitespace
            (?:
                //.*(?:\n|$)        | # Single-line comment
                for\s*\([^)]*\)\s*\{\s*\}\s*(?:\n|$)  | # Empty for loop
                while\s*\([^)]*\)\s*\{\s*\}\s*(?:\n|$) | # Empty while loop
            )
        )*                           # Repeat for multiple lines
        \}                          # Closing brace
        """,
    regex.VERBOSE | regex.MULTILINE,
)

inputs = [
    "{ //complete this function \n}",
    "{ //complete this function \n\n}",
    "{ //complete this function \n\n\n}",
    "{ //complete this function \n\n\n\n}",
    "{ }",
    "int kClosePerm(int* perm, int* used, int n, int maxgap, int k) {\nif (n == k)  \n ______________________ ; \n int res = 0; \n for (int i=0; i<n; i++) { \n} \n return res; \n}",  # should match ONLY the text within the empty for loop
]

# `sample` rather than `input`, so the builtin `input` is not shadowed for the
# rest of the notebook session.
for sample in inputs:
    # Materialise the matches: finditer returns a one-shot iterator whose repr
    # says nothing about what was found.
    matches = list(pattern.finditer(sample))
    print(f"matches={matches}")
    for match in matches:
        print(match)

In [None]:
# Scratch check: C function definitions (return type, name, arguments and a
# brace-balanced body) in text extracted from exam pages.
inputs = [
    """struct tv_show * delete_show_list (struct tv_show *show_list, int length) { 
} 
Spring 2020 
Data Structures Exam, Part A 
 
Page 3 of 4""",
    """node *deleteMe(node* head, node* me) { \n}""",
]


my_pattern = regex.compile(
    r"""
    ^\s*                                   # Ensure the function definition starts at a line (after optional indentation)
    (?P<return_type>
        [A-Za-z_*\&\[\]]+                   # The first part of the return type (e.g., int, void, etc.), excluding digits
        (?:
            (?:\s+|/\*.*?\*/|//.*?$)+        # Allow whitespace or comments between return type parts
            [A-Za-z_*\&\[\]]+               # Additional part of the return type (e.g., static, inline)
        )*
    )
    \s*                                    # Some whitespace after the return type before the function name
    (?P<function_name>\w+)                 # The function name
    \s*                                    # Optional whitespace before the arguments
    \(
        (?P<arguments>
            (?:[^()]*|\((?:[^()]|\([^()]*\))*\))*
        )
    \)\s*
    (?P<body>
        \{
            (?:
                [^{}]+                     # Non-brace characters
                |
                (?&body)                   # Recursively match nested braces
            )*
        \}
    )
    """,
    regex.MULTILINE | regex.DOTALL | regex.VERBOSE,
)

# `sample` rather than `input`, so the builtin `input` is not shadowed for the
# rest of the notebook session.
for sample in inputs:
    # Materialise the matches: finditer returns a one-shot iterator whose repr
    # says nothing about what was found.
    matches = list(my_pattern.finditer(sample))
    print(f"matches={matches}")
    for match in matches:
        print(match)

In [None]:
# Scratch check of the question-header regex (the same expression that
# extract_questions compiles) against one extracted question.
pattern = re.compile(
    r"(?s)\s*([1-5])\)\s*\((\d+)\s*pts\)\s*([\w/ ]+)\s*\(\s*([^)]+?)\s*\)\s*(.*?)(?=\s*[1-5]\)\s*\(\d+\s*pts\)\s*[\w/ ]+\s*\([^)]+\)|\s*\Z)",
    re.DOTALL,
)

inputs = [
    "2) (10 pts) ALG/DSN (Sorting) \n \n(a) (5 pts) Consider running a Bubble Sort on the array shown below. How many swaps will execute for\nthe duration of the algorithm running on the array shown below? Explain how you got your answer. \n97 \n16 \n45 \n63 \n13 \n22 \n7 \n58 \n72 \nReasoning: \nNumber of Swaps: _________ \n(b) (5 pts) List the best case run time of each of the following sorting algorithms, in terms of n, the \nnumber of items being sorted. Assume all items being sorted are distinct. \n(i) Insertion Sort \n \n \n \n____________ \n(ii) Selection Sort \n \n \n \n____________ \n(iii) Heap Sort  \n \n \n \n____________ \n(iv) Merge Sort \n \n \n \n____________ \n(v) Quick Sort  \n \n \n \n____________ \n"
]

# `sample` rather than `input`, so the builtin `input` is not shadowed for the
# rest of the notebook session.
for sample in inputs:
    matches = pattern.findall(sample)
    print(f"matches={matches}")
    for match in matches:
        print(match)