<a href="https://colab.research.google.com/github/armelida/MELIDA/blob/main/src/questions-scraper/MELIDA_PDF_Scraper_V1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Install required packages
!pip install pdfplumber pandas --quiet

In [7]:
# Install required packages
!pip install pdfplumber pandas --quiet

In [8]:
# Configure paths
import os

# Every path below lives under the Google Drive mount point, so Drive must be
# mounted before any directory is created. Without the mount, os.makedirs()
# would create plain local folders at these paths and the PDFs stored on
# Drive would not be visible to the notebook.
try:
    from google.colab import drive
except ImportError:
    # Not running inside Colab: BASE_PATH must already be reachable locally.
    print("google.colab is not available; skipping Google Drive mount.")
else:
    drive.mount("/content/drive")

# Update this path based on your Drive structure
BASE_PATH = "/content/drive/MyDrive/MELIDA"  # Adjust this path as needed

# Define specific paths
PDF_DIR = os.path.join(BASE_PATH, "data/raw/exams")       # input exam PDFs
OUTPUT_DIR = os.path.join(BASE_PATH, "data/questions")    # exported JSON/CSV

# Create directories if they don't exist
os.makedirs(PDF_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"PDF Directory: {PDF_DIR}")
print(f"Output Directory: {OUTPUT_DIR}")

PDF Directory: /content/drive/MyDrive/MELIDA/data/raw/exams
Output Directory: /content/drive/MyDrive/MELIDA/data/questions


In [9]:
import re
import json
import pdfplumber
import pandas as pd
from typing import Dict, List, Tuple, Optional


def get_exam_metadata(filename: str) -> Tuple[str, str]:
    """
    Derive (exam_type, year) from a PDF filename.

    Example: 'Cuaderno_2024_MEDICINA_0_C.pdf' -> ('MIR', '2024')

    year: the first run of four digits found in the name, or 'UNKNOWN' when
          the name contains none.
    exam_type: 'EIR' when the name contains 'ENFERMERIA' but not 'MEDICINA'
          (case-insensitive); 'MIR' in every other case.
    Extend the checks below to support additional exam types.
    """
    upper_name = filename.upper()

    found_year = re.search(r'(\d{4})', filename)
    year = found_year.group(1) if found_year else "UNKNOWN"

    # 'MEDICINA' takes precedence over 'ENFERMERIA'; 'MIR' is also the fallback.
    if "MEDICINA" not in upper_name and "ENFERMERIA" in upper_name:
        exam_type = "EIR"
    else:
        exam_type = "MIR"

    return exam_type, year


def format_question_id(exam_type: str, year: str, version: str = "v01",
                       question_type: str = "t01", question_num: int = 0) -> str:
    """
    Build the standardized question identifier.

    Layout: {exam_type}-{year}-{version}-{question_type}-Q{question_num}
    where the question number is zero-padded to at least three digits.
    Example: MIR-2024-v01-t01-Q026
    """
    id_template = "{}-{}-{}-{}-Q{:03d}"
    return id_template.format(exam_type, year, version, question_type, question_num)


def create_formatted_question(exam_type: str, year: str, qnum: int,
                             qtext: str, options: List[str],
                             source_file: str, page_num: int) -> Dict:
    """
    Assemble the export dictionary for one question.

    The question text is whitespace-normalized, words split as 'xx- yy' are
    re-joined, and square brackets are removed while their content is kept.
    Up to four options are mapped, in order, onto the keys A-D (extra options
    are ignored, missing ones become empty strings).

    Returns a dict with 'id', 'question_text', 'options' and a '_metadata'
    sub-dict (source file, page number, original question number).
    """
    # Text clean-up, applied in this exact order.
    question_text = re.sub(r'\s+', ' ', qtext).strip()
    question_text = re.sub(r'(\w)-\s+(\w)', r'\1\2', question_text)  # re-join hyphen-split words
    question_text = re.sub(r'\[(.*?)\]', r'\1', question_text)       # keep bracket content only

    # Start with every letter present and empty, then fill in what we have.
    letters = ("A", "B", "C", "D")
    labelled_options = dict.fromkeys(letters, "")
    for letter, raw_option in zip(letters, options):
        labelled_options[letter] = re.sub(r'\s+', ' ', raw_option).strip()

    # '_metadata' is kept for debugging and stripped from the JSON export.
    debug_info = {
        "source_file": source_file,
        "page_number": page_num,
        "original_number": qnum
    }

    return {
        "id": format_question_id(exam_type, year, question_num=qnum),
        "question_text": question_text,
        "options": labelled_options,
        "_metadata": debug_info
    }


def extract_lines_with_x0(page, bbox):
    """
    Crop `page` to `bbox` and return its text lines as (text, x0) tuples.

    A line ending in '-' is treated as the first half of a hyphen-split word:
    the hyphen is dropped and the text is prepended to the following line.
    The x0 reported for a merged line is that of the line completing it.

    If the final line(s) of the region end in '-', there is no following line
    to merge with. They are emitted with the trailing hyphen restored instead
    of being silently discarded, so no text is lost at the end of a column.

    Args:
        page: object exposing crop(bbox); the cropped result must expose
              extract_text_lines() returning dicts with 'text' and 'x0'.
        bbox: bounding box forwarded unchanged to page.crop().

    Returns:
        List of (text, x0) tuples in extraction order.
    """
    lines = []
    pending = ""        # text carried over from line(s) that ended in '-'
    pending_x0 = None   # x0 of the most recent line held in `pending`

    for ln in page.crop(bbox).extract_text_lines():
        text = pending + ln["text"].strip()
        x0_val = ln["x0"]
        pending = ""

        if re.search(r'-\s*$', text):
            # Hold the line back (without its hyphen) until the next one arrives.
            pending = re.sub(r'-\s*$', '', text)
            pending_x0 = x0_val
            continue

        lines.append((text, x0_val))

    # Flush a hyphenated tail that never found a continuation line.
    if pending:
        lines.append((pending + "-", pending_x0))

    return lines


def split_markers(line_text):
    """
    Split a line on every numeric marker of the form '<digits>. '.

    A marker is a run of digits followed by a dot and a whitespace character.
    The digits must NOT be immediately preceded by a letter, digit,
    underscore, '.' or ',' so that tokens such as 'B12. ', 'T4. ' or '3,5. '
    inside running text are not mistaken for question/option numbers.

    Returns:
        List of (num_str_or_None, snippet) tuples in reading order. Text that
        precedes the first marker is returned as (None, text) when non-empty;
        each marker yields (digits, text_following_it). Snippets are stripped.
    """
    pattern = re.compile(r'(?<![\w.,])(\d+)\.\s')
    tokens = pattern.split(line_text)

    results = []
    leading_text = tokens[0].strip()
    if leading_text:
        results.append((None, leading_text))

    # With one capture group, split() yields: text, (number, text)*.
    for num_str, snippet in zip(tokens[1::2], tokens[2::2]):
        results.append((num_str, snippet.strip()))
    return results


def parse_lines(
    lines, page_idx, exam_type, year, source_file,
    questions,
    current_qnum, current_qtext, current_opts
):
    """
    Feed one column's lines through the question/option state machine.

    Every line is split with split_markers() and each (number, text) segment
    is classified as:
      * option     - number in 1..4 while a question is open and it has fewer
                     than four options;
      * leftover   - number above MAX_QNUM: glued back (as ' N. text') onto
                     the last option, or onto the question text;
      * question   - any other number: the open question, if any, is appended
                     to `questions` and a new one is started;
      * plain text - no number: appended to the last option, or to the
                     question text when no option exists yet; ignored when no
                     question is open.

    Finished questions are appended to `questions` in place, stamped with
    page number page_idx + 1.

    Returns:
        (current_qnum, current_qtext, current_opts) describing the question
        still being accumulated, to be passed into the next call.
    """
    MAX_QNUM = 210

    def close_open_question():
        # Push the question being accumulated (if there is one) to the results.
        if current_qnum is None:
            return
        questions.append(create_formatted_question(
            exam_type,
            year,
            current_qnum,
            current_qtext,
            current_opts,
            source_file,
            page_idx + 1
        ))

    for raw_line, _ in lines:
        if not raw_line.strip():
            continue

        for marker, fragment in split_markers(raw_line):
            fragment = fragment.strip()

            # --- Unnumbered text: continuation of whatever is open ---
            if marker is None:
                if current_qnum is None:
                    continue
                if current_opts:
                    current_opts[-1] += " " + fragment
                else:
                    current_qtext += " " + fragment
                continue

            number = int(marker)

            # --- Numbered segment ---
            if 1 <= number <= 4 and current_qnum is not None and len(current_opts) < 4:
                # Next answer option of the open question.
                current_opts.append(fragment)
            elif number > MAX_QNUM:
                # Too large to be a question number: treat as ordinary text.
                tail = f" {number}. {fragment}"
                if current_opts:
                    current_opts[-1] += tail
                elif current_qnum is not None:
                    current_qtext += tail
            else:
                # A new question begins; store the previous one first.
                close_open_question()
                current_qnum = number
                current_qtext = fragment
                current_opts = []

    return current_qnum, current_qtext, current_opts


def process_pdf(pdf_path: str, start_page: int = 2) -> List[Dict]:
    """
    Extract all questions (with their options) from a two-column exam PDF.

    Pages are visited from index `start_page` (zero-based) to the end. Each
    page is split down the middle into a left and a right column, which are
    parsed in that order; parser state is carried across columns and pages so
    a question may span them.

    The column boxes are derived from the page's own bounding box instead of
    assuming the page origin is (0, 0), so pages whose bbox is offset can
    still be cropped.

    Args:
        pdf_path: path of the PDF to read; its basename supplies the exam
                  type and year via get_exam_metadata().
        start_page: zero-based index of the first page to parse.

    Returns:
        List of question dicts as built by create_formatted_question().
    """
    questions = []
    current_qnum = None
    current_qtext = ""
    current_opts = []

    source_file = os.path.basename(pdf_path)
    exam_type, year = get_exam_metadata(source_file)

    # Index of the last page actually parsed; used to stamp the final question.
    last_page_idx = start_page

    with pdfplumber.open(pdf_path) as pdf:
        for page_idx in range(start_page, len(pdf.pages)):
            page = pdf.pages[page_idx]
            last_page_idx = page_idx

            # Absolute coordinates of the page, halved vertically.
            x0, top, x1, bottom = page.bbox
            mid_x = (x0 + x1) / 2.0
            column_boxes = (
                (x0, top, mid_x, bottom),   # left column
                (mid_x, top, x1, bottom),   # right column
            )

            for column_bbox in column_boxes:
                column_lines = extract_lines_with_x0(page, column_bbox)
                current_qnum, current_qtext, current_opts = parse_lines(
                    column_lines, page_idx, exam_type, year, source_file,
                    questions,
                    current_qnum, current_qtext, current_opts
                )

    # The last question is never followed by a new marker, so store it here.
    if current_qnum is not None:
        questions.append(create_formatted_question(
            exam_type,
            year,
            current_qnum,
            current_qtext,
            current_opts,
            source_file,
            last_page_idx + 1
        ))

    return questions


def clean_output_for_export(questions: List[Dict]) -> List[Dict]:
    """
    Return shallow copies of the questions without their private fields.

    Any top-level key starting with an underscore (e.g. '_metadata') is left
    out; the input dictionaries are not modified.
    """
    return [
        {field: value for field, value in question.items() if not field.startswith('_')}
        for question in questions
    ]


def export_questions(questions: List[Dict], output_dir: str,
                    exam_type: str = "MIR", year: str = "2024",
                    version: str = "v01", question_type: str = "t01") -> Tuple[Optional[str], Optional[str]]:
    """
    Write the questions to '<exam_type>-<year>-<version>-<question_type>'
    .json and .csv files inside `output_dir`.

    JSON: the questions with underscore-prefixed fields removed.
    CSV:  one row per question; the 'options' dict is flattened into
          option_A..option_D columns and the '_metadata' dict is expanded
          into one column per metadata key.

    Returns:
        (json_path, csv_path), or (None, None) when `questions` is empty and
        nothing was written.
    """
    if not questions:
        return None, None

    # Create the output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Format the filename according to requirements
    filename_base = f"{exam_type}-{year}-{version}-{question_type}"

    # JSON export (metadata removed). The local name deliberately differs
    # from this function's own name to avoid shadowing it.
    public_questions = clean_output_for_export(questions)
    json_path = os.path.join(output_dir, f"{filename_base}.json")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(public_questions, f, ensure_ascii=False, indent=2)

    # CSV export starts from the full records, metadata included.
    df = pd.DataFrame(questions)

    # Flatten the nested options dictionary into one column per letter.
    if not df.empty and 'options' in df.columns:
        for key in ['A', 'B', 'C', 'D']:
            df[f'option_{key}'] = df['options'].apply(lambda opts, key=key: opts.get(key, ''))
        df = df.drop(columns=['options'])

    # Expand the metadata dictionary into regular columns (it is kept in the
    # CSV, unlike in the JSON).
    if '_metadata' in df.columns:
        metadata_df = pd.json_normalize(df['_metadata'])
        df = pd.concat([df.drop(columns=['_metadata']), metadata_df], axis=1)

    # 'utf-8-sig' writes a BOM at the start of the file.
    csv_path = os.path.join(output_dir, f"{filename_base}.csv")
    df.to_csv(csv_path, index=False, encoding='utf-8-sig')

    return json_path, csv_path


def process_directory(pdf_dir: str, output_dir: str, start_page: int = 2) -> None:
    """
    Parse every '.pdf' file in `pdf_dir` and export the questions grouped by
    exam type and year.

    Files are processed in sorted name order. A PDF that raises during
    parsing is reported and skipped so the remaining files are still handled.
    Questions are grouped by the first two '-'-separated parts of their id
    (exam type and year), and each group is written with export_questions().

    Args:
        pdf_dir: directory containing the exam PDFs.
        output_dir: directory receiving the JSON/CSV exports.
        start_page: zero-based index of the first page parsed in each PDF.
    """
    all_questions = []

    # Process all PDFs
    for filename in sorted(os.listdir(pdf_dir)):
        if not filename.lower().endswith(".pdf"):
            continue

        pdf_path = os.path.join(pdf_dir, filename)
        exam_type, year = get_exam_metadata(filename)

        try:
            questions = process_pdf(pdf_path, start_page=start_page)
        except Exception as e:
            # Best effort: one unreadable PDF must not abort the whole batch.
            print(f"Error processing {filename}: {str(e)}")
            continue

        all_questions.extend(questions)
        print(f"Extracted {len(questions)} questions from {filename} ({exam_type} {year})")

    # Group questions by (exam type, year), taken from the question ID.
    # Tuple keys avoid re-splitting a joined string later on.
    grouped_questions = {}
    for q in all_questions:
        id_parts = q['id'].split('-')
        if len(id_parts) < 2:
            continue
        grouped_questions.setdefault((id_parts[0], id_parts[1]), []).append(q)

    # Export each group separately
    for (exam_type, year), questions in grouped_questions.items():
        json_path, csv_path = export_questions(
            questions,
            output_dir,
            exam_type=exam_type,
            year=year
        )

        if json_path:
            print(f"Exported {len(questions)} {exam_type} {year} questions to:")
            print(f"  - JSON: {os.path.basename(json_path)}")
            print(f"  - CSV: {os.path.basename(csv_path)}")

In [10]:
# Option to upload PDFs directly to Colab (if not already in Drive)
from google.colab import files
import shutil

def upload_pdfs_to_drive():
    """
    Ask for files through files.upload() and store the PDFs in PDF_DIR.

    The upload result is iterated as a filename -> content mapping. Entries
    whose name does not end in '.pdf' (case-insensitive) are ignored; each
    saved file is reported with its destination path.
    """
    for name, payload in files.upload().items():
        if not name.lower().endswith('.pdf'):
            continue

        dest_path = os.path.join(PDF_DIR, name)
        with open(dest_path, 'wb') as out_file:
            out_file.write(payload)
        print(f"Saved {name} to {dest_path}")

# Uncomment the line below to upload PDF files
# upload_pdfs_to_drive()

In [11]:
# Show which PDF files are waiting in PDF_DIR (directory listing order).
pdf_files = list(filter(lambda name: name.lower().endswith('.pdf'), os.listdir(PDF_DIR)))
print(f"Found {len(pdf_files)} PDF files in {PDF_DIR}:")
for position, pdf_name in enumerate(pdf_files, start=1):
    print(f"{position}. {pdf_name}")

Found 0 PDF files in /content/drive/MyDrive/MELIDA/data/raw/exams:


In [12]:
# Process all PDFs in the directory and export the extracted questions.
# start_page is a zero-based page index: process_pdf iterates
# range(start_page, num_pages), so start_page=2 skips the first two pages
# of every PDF.
process_directory(PDF_DIR, OUTPUT_DIR, start_page=2)

In [13]:
# Show the JSON exports currently present in OUTPUT_DIR (directory listing order).
json_files = list(filter(lambda name: name.lower().endswith('.json'), os.listdir(OUTPUT_DIR)))
print(f"Found {len(json_files)} processed JSON files in {OUTPUT_DIR}:")
for position, json_name in enumerate(json_files, start=1):
    print(f"{position}. {json_name}")

Found 0 processed JSON files in /content/drive/MyDrive/MELIDA/data/questions:


In [14]:
# Preview the first two questions of the first JSON export, if there is one.
if json_files:
    sample_file = os.path.join(OUTPUT_DIR, json_files[0])
    with open(sample_file, 'r', encoding='utf-8') as sample_handle:
        data = json.load(sample_handle)

    print(f"Sample file: {json_files[0]}")
    print(f"Total questions: {len(data)}")
    print("\nSample questions (first 2):")

    preview = data[:2]
    for position, question in enumerate(preview, start=1):
        print(f"\nQuestion {position}:")
        print(f"ID: {question['id']}")
        print(f"Text: {question['question_text']}")
        print("Options:")
        for letter, option_text in question['options'].items():
            print(f"  {letter}: {option_text}")

In [15]:
# Helper: hand a single file to files.download()
def download_file(file_path):
    """
    Start a download of `file_path` through files.download().

    Any exception raised while doing so is caught and printed instead of
    being propagated; on success the file's base name is reported.
    """
    try:
        files.download(file_path)
        base_name = os.path.basename(file_path)
        print(f"Started download of {base_name}")
    except Exception as exc:
        print(f"Error downloading file: {str(exc)}")

# Example usage - uncomment to download a specific file
# if json_files:
#     download_file(os.path.join(OUTPUT_DIR, json_files[0]))

# Create a zip file with all processed files for easier download
import zipfile

def create_and_download_zip():
    """
    Bundle every regular file in OUTPUT_DIR into
    BASE_PATH/processed_questions.zip and start its download.

    Entries are stored under their bare file names and compressed with
    DEFLATE (the ZipFile default would store them uncompressed). The archive
    location is reported as soon as the archive is written, before the
    download is attempted, so the path is shown even if the download fails.
    """
    zip_path = os.path.join(BASE_PATH, "processed_questions.zip")

    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        for file in os.listdir(OUTPUT_DIR):
            file_path = os.path.join(OUTPUT_DIR, file)
            # Sub-directories are skipped; only plain files are archived.
            if os.path.isfile(file_path):
                zipf.write(file_path, arcname=file)

    print(f"All processed files zipped to {zip_path}")
    download_file(zip_path)

# Uncomment to create and download a zip of all processed files
# create_and_download_zip()