# Legal Document Preprocessing Pipeline

This notebook implements a comprehensive preprocessing pipeline for legal documents, specifically designed for processing Malaysian legal cases. The pipeline handles various text extraction methods, metadata extraction, and document preparation for RAG (Retrieval-Augmented Generation) systems.

## Pipeline Overview

The preprocessing pipeline consists of several key components:

1. **Text Extraction**: Multi-modal approach using PyMuPDF4LLM and OCR fallback
2. **Metadata Extraction**: Regex-based extraction of legal metadata (case numbers, courts, dates, etc.)
3. **Document Processing**: Batch processing with memory optimization and error handling
4. **Dataset Creation**: Automated creation of training and test datasets
5. **LLM-Based Analysis**: Advanced metadata extraction using Google Gemini AI

## Key Features

- **Hybrid Text Extraction**: Combines direct PDF text extraction with OCR for scanned documents
- **Memory Optimization**: Single GPU OCR instance with thread-safe processing
- **Robust Error Handling**: Graceful degradation and comprehensive error reporting  
- **Parallel Processing**: Multi-threaded processing for improved performance
- **Metadata Integration**: Seamless integration of extracted metadata with document content
- **Quality Validation**: AI-powered metadata validation and enhancement

---

In [None]:
# 📦 Import Required Libraries
# This section imports all necessary dependencies for the legal document preprocessing pipeline

# Core Python libraries
import re
import os
import json
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional
from collections import OrderedDict

# PDF processing libraries
import pymupdf              # Core PDF manipulation
import pymupdf4llm          # LlamaIndex integration for PDF processing
from pymupdf4llm import to_markdown

# Structured data and validation
from pydantic import BaseModel, Field
from llama_index.core.output_parsers import PydanticOutputParser

# OCR capabilities for scanned documents
import easyocr
from pdf2image import convert_from_path

# Parallel processing and threading
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import multiprocessing
import time
import gc
import psutil

# Image processing
from PIL import Image        

# Document handling for RAG pipeline
from llama_index.core import Document

## Directory Setup and Data Organization

This section configures the directory structure for processing legal documents. The pipeline is designed to handle large collections of legal cases and legislation documents from structured directory hierarchies.

In [None]:
# 📁 Base directories for legal document collections
legal_case_dir = "../../../../data/raw/legal_cases"     # Contains 16 folders with ~100 PDF files each
legislation_dir = "../../../../data/raw/legislation"    # Contains various categories of legal acts and regulations

In [7]:
import os
os.listdir(legal_case_dir)

['legal_case_folder1',
 'legal_case_folder10',
 'legal_case_folder11',
 'legal_case_folder12',
 'legal_case_folder13',
 'legal_case_folder14',
 'legal_case_folder15',
 'legal_case_folder16',
 'legal_case_folder2',
 'legal_case_folder3',
 'legal_case_folder4',
 'legal_case_folder5',
 'legal_case_folder6',
 'legal_case_folder7',
 'legal_case_folder8',
 'legal_case_folder9',
 'rag_legal_case_files',
 'test_files']

In [8]:
os.listdir(legislation_dir)

['act', 'federal_constitution', 'ordinance', 'subsidiary_legislation']

## Regex-Based Metadata Extraction

This section implements sophisticated regex patterns to extract structured metadata from legal documents. The extraction targets key legal information including case numbers, court details, decision dates, parties involved, and legal references.

### Extracted Metadata Fields:
- **Case Number**: Various formats (Guaman, Rayuan Sivil, Civil Suit, etc.)
- **Court Information**: Malaysian and English court formats
- **Decision Dates**: Multiple date formats and languages
- **Parties**: Petitioners, respondents, and legal representation
- **Legal References**: Related cases, legislation, and other sources
- **Appeal Information**: Appeal type classification

The regex patterns are designed to handle both Bahasa Malaysia and English legal documents, accommodating various formatting conventions used in Malaysian courts.

In [18]:
def extract_metadata_using_regex(page_content):
    metadata = {}

    # Case Number
    case_patterns = [
        r"GUAMAN\s*NO\s*:?\s*([\w\(\)\-\/\s]+)",  
        r"RAYUAN SIVIL NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",  
        r"GUAMAN SIVIL NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",  
        r"CIVIL SUIT NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",
        r"SUIT NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",                  
        r"CIVIL APPEAL NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",
        r"APPEAL NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",
        r"NO\.?\s*:?\s*([\w\(\)\-\/\s]+)",
    ]

    for pattern in case_patterns:
        match = re.search(pattern, page_content, re.IGNORECASE)
        if match:
            metadata["case_number"] = match.group(1).strip()
            break

    # Court
    court_patterns = [
        # Malaysian court format - capture court type before location
        r"DALAM\s+(MAHKAMAH\s+[A-Z\s]+?)(?:\s+DI\s+[A-Z\s]+|\s+DALAM\s+NEGERI|\n|$)",
        # English format - specific court keywords only  
        r"IN THE\s+((?:FEDERAL COURT|HIGH COURT|COURT OF APPEAL|SESSIONS COURT|MAGISTRATES?[\'\s]*COURT)[^\n]*?)(?:\s+AT\s+[A-Z\s]+)?(?:\n|$)",
        # Alternative English format
        r"IN THE\s+(.*?COURT.*?)(?:\s+AT\s+|\n|$)",
    ]
    
    metadata["court"] = None
    for pattern in court_patterns:
        court_match = re.search(pattern, page_content, re.IGNORECASE)
        if court_match:
            court_text = court_match.group(1).strip()
            # Additional validation - must contain "COURT" or "MAHKAMAH"
            if "COURT" in court_text.upper() or "MAHKAMAH" in court_text.upper():
                metadata["court"] = court_text
                break

    # Appeal Type, Regex: matches "Civil Appeal", "Appeal (Civil)", "Rayuan Sivil", any capitalization
    pattern = r"(Civil\s+Appeal|Appeal\s*\(Civil\)|Rayuan\s+Sivil)"

    if re.search(pattern, page_content, re.IGNORECASE):
        metadata["appeal_type"] = "Civil Appeal"
    else:
        metadata["appeal_type"] = "Unknown"

    # Decision Date
    date_match = re.search(r"(\d{2}/\d{2}/\d{4})", page_content)
    if date_match:
        # raw_date = date_match.group(1)
        # parsed_date = datetime.strptime(raw_date, "%d/%m/%Y")
        # metadata["decision_date"] = parsed_date.strftime("%Y-%m-%d")
        
        # For manual extraction purpose
        from dateutil import parser

        raw_date = "02/15/2166"
        parsed_date = parser.parse(raw_date, dayfirst=True)  # or dayfirst=False
        print(parsed_date.strftime("%Y-%m-%d"))
    else:
        metadata["decision_date"] = None
        
    # Serial Number
    # SIN is to handle wrongly ocr text
    serial_match = re.search(r"(?:SIN|S/N)\s+([a-zA-Z0-9]{15,25})", page_content)
    metadata["serial_number"] = serial_match.group(1) if serial_match else None
    
    # UNIVERSAL registration number pattern for all party types
    # Matches: [No. K/P:123], [No. Pendaftaran:456], [No. Syarikat:789], etc.
    registration_pattern = r"(?:\s+\[No\.\s+(?:K/P|Pendaftaran|Perniagaan|Syarikat):[^\]]+\])?"

    # Appellant patterns (for appeals)
    appellant_patterns = [
        r"BETWEEN\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+APPELLANT?\b",
        r"BETWEEN\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+APPELLANTS?\b",
        r"ANTARA\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+PERAYU(?:-PERAYU)?\b",
    ]

    # Respondent patterns (for appeals)
    respondent_patterns = [
        r"(?:^|\n)\s*(?:AND|DAN)\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+RESPONDENT?\b",
        r"(?:^|\n)\s*(?:AND|DAN)\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+RESPONDENTS?\b",
        r"(?:^|\n)\s*(?:AND|DAN)\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+RESPONDEN(?:-RESPONDEN)?\b",
    ]

    # Plaintiff patterns (for original suits)
    plaintiff_patterns = [
        r"BETWEEN\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+PLAINTIFF?\b",
        r"BETWEEN\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+PLAINTIFFS?\b",
        r"ANTARA\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+PLAINTIF(?:-PLAINTIF)?\b",
        # Simple pattern without dots: ANTARA [Name] [Registration] PLAINTIF
        r"ANTARA\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+PLAINTIF\b",
    ]

    # Defendant patterns (for original suits)
    defendant_patterns = [
        r"(?:^|\n)\s*(?:AND|DAN)\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+DEFENDANT?\b",
        r"(?:^|\n)\s*(?:AND|DAN)\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+DEFENDANTS?\b",
        r"(?:^|\n)\s*(?:AND|DAN)\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+[…\.]{3,}\s+DEFENDAN(?:-DEFENDAN)?\b",
        # Simple pattern without dots: DAN [Name] [Registration] DEFENDAN
        r"(?:^|\n)\s*DAN\s+([A-Z][A-Z\s]{10,50}?)" + registration_pattern + r"\s+DEFENDAN\b",
    ]

    # Extract parties with controlled matching
    first_parties = []
    second_parties = []
    
    # Try appellant patterns first (for appeals)
    for pattern in appellant_patterns:
        matches = re.findall(pattern, page_content, re.IGNORECASE | re.MULTILINE)
        if matches:
            first_parties.extend(matches)
            break
    
    # If no appellants, try plaintiff patterns (for original suits)
    if not first_parties:
        for pattern in plaintiff_patterns:
            matches = re.findall(pattern, page_content, re.IGNORECASE | re.MULTILINE)
            if matches:
                first_parties.extend(matches)
                break

    # Try respondent patterns first (for appeals)
    for pattern in respondent_patterns:
        matches = re.findall(pattern, page_content, re.IGNORECASE | re.MULTILINE)
        if matches:
            second_parties.extend(matches)
            break
    
    # If no respondents, try defendant patterns (for original suits)
    if not second_parties:
        for pattern in defendant_patterns:
            matches = re.findall(pattern, page_content, re.IGNORECASE | re.MULTILINE)
            if matches:
                second_parties.extend(matches)
                break

    # Enhanced name cleaning and validation
    def clean_and_validate_party_name(name):
        # Remove ALL types of registration numbers
        cleaned = re.sub(r'\[No\.\s+(?:K/P|Pendaftaran|Syarikat):[^\]]+\]', '', name)
        # Normalize whitespace
        cleaned = re.sub(r'\s+', ' ', cleaned.strip())
        
        # Validate: must be proper name (letters and spaces only, reasonable length)
        if (len(cleaned) > 5 and len(cleaned) < 80 and 
            re.match(r'^[A-Z][A-Z\s&.,()-]+$', cleaned) and
            not any(word in cleaned.lower() for word in ['seksyen', 'akta', 'plaintif', 'defendan', 'dan', 'yang', 'untuk', 'dalam', 'dengan', 'pada', 'atau'])):
            return cleaned
        return None

    # Store validated parties
    if first_parties:
        cleaned_first = [clean_and_validate_party_name(name) for name in first_parties]
        metadata["petitioners"] = [name for name in cleaned_first if name is not None]
    else:
        metadata["petitioners"] = []
    
    if second_parties:
        cleaned_second = [clean_and_validate_party_name(name) for name in second_parties]
        metadata["respondents"] = [name for name in cleaned_second if name is not None]
    else:
        metadata["respondents"] = []

    # Judges (Coram)
    coram_match = re.search(r"CORAM:\s*(.*?)\n\n", page_content, re.IGNORECASE | re.DOTALL)
    if coram_match:
        judges_raw = coram_match.group(1).split('\n')
        metadata["coram"] = [judge.strip() for judge in judges_raw if judge.strip()]
    else:
        metadata["coram"] = []
        
    def extract_references_by_keywords(page_content):
        """Extract references based on specific keywords and sections"""
        
        # Initialize lists
        cases_referred = []
        legislation_referred = []
        other_sources_referred = []
        
        # Define keyword patterns for each category
        cases_keywords = [
            "Cases referred:",
            "Cases cited",
            "Cases referred to:",
        ]
        
        legislation_keywords = [
            "Legislation referred:",
            "Legislation referred to:",
            "Legislation cited",
            "Acts/Laws referred:",
            "Acts/Laws referred to:",
            "Acts/Laws cited",
            # to handle wrongly ocr text
            "ActslLaws referred:",
            "ActslLaws referred to:",
            "ActslLaws cited"
        ]
        
        other_sources_keywords = [
            "Other sources referred to:",
            "Other sources referred:",
            "Other sources cited",
            "Articles referred to:",
            "Articles referred:",
            "Articles cited",
        ]
        
        # Create combined patterns with case-insensitive matching
        all_keywords = cases_keywords + legislation_keywords + other_sources_keywords
        
        # Find all keyword positions
        keyword_positions = []
        for keyword in all_keywords:
            for match in re.finditer(re.escape(keyword), page_content, re.IGNORECASE):
                keyword_positions.append((match.start(), match.end(), keyword.lower()))
        
        # Sort by position
        keyword_positions.sort()
        
        # Extract content between keywords
        for i, (start, end, keyword) in enumerate(keyword_positions):
            # Find the end position (next keyword or end of content)
            if i + 1 < len(keyword_positions):
                content_end = keyword_positions[i + 1][0]
            else:
                content_end = len(page_content)
            
            # Extract the content
            content = page_content[end:content_end].strip()
            
            # Split into lines and clean up
            lines = [line.strip() for line in content.split('\n') if line.strip()]
            
            # 🚨 NEW: Filter out lines containing serial numbers
            filtered_lines = []
            for line in lines:
                # Check if line contains serial number pattern
                if not re.search(r"(?:SIN|S/N)\s+[a-zA-Z0-9]{15,25}", line, re.IGNORECASE):
                    filtered_lines.append(line)
            
            # Categorize based on keyword
            if any(k in keyword for k in ["cases referred", "cases cited"]):
                cases_referred.extend(filtered_lines)
            elif any(k in keyword for k in ["legislation", "acts/laws", "actsllaws"]):
                legislation_referred.extend(filtered_lines)
            elif any(k in keyword for k in ["other sources", "articles"]):
                other_sources_referred.extend(filtered_lines)
        
        return cases_referred, legislation_referred, other_sources_referred

    # Use the new extraction function
    cases_referred, legislation_referred, other_sources_referred = extract_references_by_keywords(page_content)
    
    # Clean up references - remove duplicates and empty entries
    def clean_references(ref_list):
        """Remove duplicates and clean up references"""
        cleaned = []
        seen = set()
        for ref in ref_list:
            ref_clean = ref.strip()
            # Remove bullet points and list markers
            ref_clean = re.sub(r'^[>➤▶•\-\d+\)\.]?\s*', '', ref_clean)
            
            if ref_clean and ref_clean not in seen and len(ref_clean) > 5:
                cleaned.append(ref_clean)
                seen.add(ref_clean)
        return cleaned

    # Set the metadata with proper field names
    metadata["related_cases"] = clean_references(cases_referred)
    metadata["legislation_referred"] = clean_references(legislation_referred)
    metadata["other_sources_referred"] = clean_references(other_sources_referred)

    return metadata

## Memory-Optimized OCR Processing

This section implements a thread-safe OCR manager designed for memory-constrained environments. The system uses a single GPU instance shared across all processing threads to prevent memory overflow while maintaining parallel processing capabilities.

### Key Features:
- **Single GPU Instance**: Prevents memory duplication across threads
- **Thread-Safe Processing**: Uses locks to serialize GPU access while allowing parallel PDF-to-image conversion
- **Memory Management**: Aggressive cleanup and garbage collection
- **Fallback Support**: Automatic CPU fallback if GPU initialization fails
- **Progress Tracking**: Detailed logging for monitoring processing status

This approach is essential when processing large batches of scanned legal documents that require OCR processing.

In [None]:
class SingleOCRManager:
    """
    Thread-safe OCR manager with single GPU instance for memory-constrained systems.
    
    This class ensures that only one OCR model instance is loaded into GPU memory,
    preventing memory overflow when processing multiple documents in parallel.
    The OCR processing is serialized while PDF-to-image conversion can run in parallel.
    
    Attributes:
        ocr_reader: Single EasyOCR instance shared across all threads
        lock: Threading lock for serializing GPU access
        initialized: Flag to track initialization status
        processing_queue: Queue for managing processing requests
    """
    
    def __init__(self):
        self.ocr_reader = None
        self.lock = threading.Lock()
        self.initialized = False
        self.processing_queue = Queue()
        
    def initialize_ocr_once(self):
        """
        Initialize OCR reader once and reuse for all threads.
        
        Uses double-checked locking pattern to ensure thread-safe initialization.
        Supports both English and Malay languages commonly found in Malaysian legal documents.
        """
        with self.lock:
            if not self.initialized:
                print("🔧 Initializing single OCR reader (GPU memory optimized)...")
                try:
                    # Initialize with English and Malay language support
                    self.ocr_reader = easyocr.Reader(['en', 'ms'], gpu=True, verbose=False)
                    self.initialized = True
                    print("✅ OCR reader initialized successfully")
                except Exception as e:
                    print(f"❌ OCR initialization failed: {e}")
                    # Fallback to CPU if GPU fails
                    self.ocr_reader = easyocr.Reader(['en', 'ms'], gpu=False, verbose=False)
                    self.initialized = True
                    print("⚠️ Fallback to CPU OCR")
    
    def perform_ocr_safe(self, pdf_path):
        """
        Thread-safe OCR processing with single model instance.
        
        This method converts PDF pages to images in parallel, then processes
        them through OCR sequentially to avoid GPU memory conflicts.
        
        Args:
            pdf_path (str): Path to PDF file for OCR processing
            
        Returns:
            list: List of OCR results for each page, or None if processing fails
        """
        if not self.initialized:
            self.initialize_ocr_once()
        
        pdf_filename = os.path.basename(pdf_path)
        
        # 📄 STEP 1: Convert PDF to images (can be parallelized - no GPU needed)
        print(f"📄 Converting PDF to images: {pdf_filename}")
        try:
            pages = convert_from_path(
                pdf_path, 
                dpi=200,                    # Balance between quality and processing speed
                fmt="jpeg",                 # Efficient format for OCR
                output_folder="./temp",     # Temporary storage for images
                paths_only=True            # Return paths instead of loading images
            )
        except Exception as e:
            print(f"❌ PDF conversion failed for {pdf_filename}: {e}")
            return None
        
        # Validate that pages were extracted
        if not pages:
            print(f"⚠️ No pages extracted from {pdf_filename}")
            return []
        
        results = []
        
        # 🔍 STEP 2: OCR processing (MUST be serialized due to single GPU model)
        with self.lock:
            print(f"🔍 OCR processing (GPU): {pdf_filename}")
            start_time = time.time()
            
            try:
                for i, page_path in enumerate(pages):
                    with Image.open(page_path) as page:
                        print(f"   📄 Processing page {i+1}/{len(pages)}")
                        page_np = np.array(page)
                        
                        # Validate page content
                        if page_np.size == 0:
                            print(f"   ⚠️ Empty page {i+1}, skipping...")
                            results.append([])
                            continue
                        
                        # Process with the single OCR reader instance
                        result = self.ocr_reader.readtext(
                            page_np, 
                            batch_size=8,      # Small batch to save memory
                            detail=0,          # Return only text, not coordinates
                            paragraph=True,    # Group text into paragraphs
                        )
                        results.append(result)
                        
                        # 🧹 Immediate memory cleanup
                        del page_np
                        gc.collect()
                        
                processing_time = time.time() - start_time
                print(f"⏱️ OCR completed in {processing_time:.2f}s ({len(pages)} pages)")
                
            except Exception as e:
                print(f"❌ OCR processing failed for {pdf_filename}: {e}")
                return None
            finally:
                # Always clean up memory
                try:
                    del pages
                except:
                    pass
        
        return results

# 🌍 Create global single OCR manager instance
# This ensures only one OCR model is loaded regardless of how many threads are processing
single_ocr_manager = SingleOCRManager()

In [12]:
results = single_ocr_manager.perform_ocr_safe("../../../../experiment/test2.pdf")

🔧 Initializing single OCR reader (GPU memory optimized)...
✅ OCR reader initialized successfully
📄 Converting PDF to images: test2.pdf
🔍 OCR processing (GPU): test2.pdf
   📄 Processing page 1/35
   📄 Processing page 2/35
   📄 Processing page 3/35
   📄 Processing page 4/35
   📄 Processing page 5/35
   📄 Processing page 6/35
   📄 Processing page 7/35
   📄 Processing page 8/35
   📄 Processing page 9/35
   📄 Processing page 10/35
   📄 Processing page 11/35
   📄 Processing page 12/35
   📄 Processing page 13/35
   📄 Processing page 14/35
   📄 Processing page 15/35
   📄 Processing page 16/35
   📄 Processing page 17/35
   📄 Processing page 18/35
   📄 Processing page 19/35
   📄 Processing page 20/35
   📄 Processing page 21/35
   📄 Processing page 22/35
   📄 Processing page 23/35
   📄 Processing page 24/35
   📄 Processing page 25/35
   📄 Processing page 26/35
   📄 Processing page 27/35
   📄 Processing page 28/35
   📄 Processing page 29/35
   📄 Processing page 30/35
   📄 Processing page 31/35
   

In [13]:
all_pages_text = "\n\n".join(["\n".join(page_text_list) for page_text_list in results])
metadata = extract_metadata_using_regex(all_pages_text)
metadata

{'case_number': 'WA-I2BNCVC-112-08/2023\nANTARA\n1  NG CHIN TUAN (No',
 'court': 'MAHKAMAH TINGGI MALAYA',
 'appeal_type': 'Civil Appeal',
 'decision_date': '2024-10-24',
 'serial_number': '7FguAEsSOin53tcRgIWTA',
 'petitioners': [],
 'respondents': [],
 'coram': [],
 'related_cases': ['Gan Yook Chin (P) & Anor v Lee Ing Chin CLJ 309 FC',
  'Lee Teck Seng & Ors (2004) 4',
  "Ong Leong Chiou & Anor v Keller (M) Sdn Bhd & Ors (2021) 3 MLJ 622 FC Tengku Dato' Ibrahim Petra bin Tengku Indra Petra v Petra Perdana Bhd (2018) 2 MLJ 177 FC Ng Hoo Kui & Anor v Wendy Tan Lee Peng (2020) 12 MLJ 67 FC Sarmiina Sdn Bhd v Gerry Ho & Ors (2023) 5 MLRA 159 FC Sejati Education Sdn Bhd V S3M Development (Sabah) Sdn Bhd (2016) 6 CLJ 710 CA",
  'FKJV (M) Sdn Bhd v Mode Circle Bhd (2012) MLJU 751 HC',
  'Tieh Boon Tuck v Evonne Lee Pei Chen & Anor (2014) 1 MLJ 882 CA',
  'Cipta Cermat Sdn Bhd v Perbandaran Kemajuan Negeri Kedah (2007) 2 MLJ 746 CA',
  'Lai Hee Sang v Pun Hai Chin (2016) 6 MLJ 434 CA',
  'B

In [None]:
# 🦙 PyMuPDF4LLM LlamaMarkdownReader for RAG pipeline integration
def extract_text_and_metadata_llamaindex(pdf_path):
    """
    Extract text using LlamaMarkdownReader optimized for LlamaIndex RAG pipeline.
    
    This function creates LlamaIndex Document objects that are ready for vector embedding
    and retrieval. The markdown format preserves document structure while making the
    content suitable for semantic chunking.
    
    Args:
        pdf_path (str): Path to the PDF file to process
        
    Returns:
        tuple: (documents, metadata) where documents is a list of LlamaIndex Document objects
               and metadata is a dictionary of extracted legal metadata
    """
    # Initialize the LlamaIndex-compatible reader
    llama_reader = pymupdf4llm.LlamaMarkdownReader()
    
    # Load document - returns list of LlamaIndex Document objects with markdown formatting
    documents = llama_reader.load_data(pdf_path)
    
    # Combine all document text for comprehensive metadata extraction
    full_text = ""
    for doc in documents:
        full_text += doc.text + "\n\n"
    
    # Extract legal metadata using regex patterns
    metadata = extract_metadata_using_regex(full_text)
    
    # Add extracted metadata to each document for RAG pipeline
    for doc in documents:
        # Clear any existing metadata
        doc.metadata = {}
        
        # Add comprehensive metadata including file information
        doc.metadata.update(metadata)
        doc.metadata["source_file"] = os.path.basename(pdf_path)
        doc.metadata["extraction_method"] = "PyMuPDF4LLM_LlamaMarkdownReader"
        doc.metadata["document_type"] = "legal_case"
    
    return documents, metadata

## LlamaIndex Integration for RAG Pipeline

This section integrates PyMuPDF4LLM with LlamaIndex to create documents optimized for RAG (Retrieval-Augmented Generation) systems. The integration ensures that extracted text maintains proper formatting and metadata for downstream AI applications.

In [None]:
# Test the LlamaIndex method with your existing test file
documents, metadata = extract_text_and_metadata_llamaindex("../../../../experiment/test1.pdf")

print("=== DOCUMENT INFO ===")
print(f"Number of documents: {len(documents)}")
print(f"Document type: {type(documents[0])}")

print("\n=== EXTRACTED METADATA ===")
for key, value in metadata.items():
    print(f"{key}: {value}")

print("\n=== FIRST DOCUMENT SAMPLE ===")
print(f"Text length: {len(documents[0].text)} characters")
print(f"Metadata keys: {list(documents[0].metadata.keys())}")
print("\nFirst 500 characters:")
print(documents[0].text[:500] + "...")

Successfully imported LlamaIndex
=== DOCUMENT INFO ===
Number of documents: 95
Document type: <class 'llama_index.core.schema.Document'>

=== EXTRACTED METADATA ===
case_number: 22-121-2008


Between


Teoh Kiang Hong
court: FEDERAL COURT OF MALAYSIA AT PUTRAJAYA**
appeal_type: Civil Appeal
decision_date: 2025-01-16
serial_number: 9FZtw7kygkCReUiJH3PmuQ
petitioners: []
respondents: []
coram: ['**']
related_cases: ['by the Court of Appeal. Gary argued on this basis, that', 'post trial recantation is an appellate question whereas in-trial recantation is', 'an assessment question for the trial judge. Hence, it was argued that by', 'relying on the post-trial recantation cases, the Court of Appeal had made a', 'fundamental error, which, on that material flaw alone, the decision of the', 'Court of Appeal on recantation cannot stand, and ought to be set aside.', '(58) In this regard, we agree that recantation or retraction of evidence', 'occurring during the trial is a matter for assessment o

In [14]:
documents[0].metadata

{'case_number': '22-121-2008\n\n\nBetween\n\n\nTeoh Kiang Hong',
 'court': 'FEDERAL COURT OF MALAYSIA AT PUTRAJAYA**',
 'appeal_type': 'Civil Appeal',
 'decision_date': '2025-01-16',
 'serial_number': '9FZtw7kygkCReUiJH3PmuQ',
 'petitioners': [],
 'respondents': [],
 'coram': ['**'],
 'related_cases': ['by the Court of Appeal. Gary argued on this basis, that',
  'post trial recantation is an appellate question whereas in-trial recantation is',
  'an assessment question for the trial judge. Hence, it was argued that by',
  'relying on the post-trial recantation cases, the Court of Appeal had made a',
  'fundamental error, which, on that material flaw alone, the decision of the',
  'Court of Appeal on recantation cannot stand, and ought to be set aside.',
  '(58) In this regard, we agree that recantation or retraction of evidence',
  'occurring during the trial is a matter for assessment of evidence of the',
  '**Note : Serial number will be used to verify the originality of this documen

In [None]:
# 💾 Document serialization utilities
import json
import os
from llama_index.core import Document

# Define output directory for processed documents
save_dir = "../../../../data/processed/legal_cases/processed_rag_legal_case_files"

def save_document_json(doc: Document, save_path: str):
    """
    Save a LlamaIndex Document object to JSON format for persistent storage.
    
    This function serializes Document objects so they can be loaded later by the
    ingestion pipeline without requiring reprocessing. The JSON format preserves
    both text content and metadata.
    
    Args:
        doc (Document): LlamaIndex Document object to serialize
        save_path (str): Target file path for JSON storage
    """
    # Create structured data dictionary
    data = {
        "text": doc.text,           # Document content (markdown formatted)
        "metadata": doc.metadata  # Extracted legal metadata
    }
    
    # Ensure directory exists
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    
    # Save with proper encoding for international characters
    with open(save_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

## Document Serialization and Storage

This section handles the conversion of LlamaIndex Document objects to JSON format for persistent storage. This enables the preprocessing pipeline to save processed documents that can be later loaded by the ingestion pipeline without reprocessing.

In [6]:
def attempt_ocr_extraction(pdf_path):
    """
    Attempt OCR extraction with proper error handling
    """
    try:
        ocr_results = single_ocr_manager.perform_ocr_safe(pdf_path)
        
        if ocr_results is None:
            print("❌ OCR processing returned None")
            raise Exception("OCR processing failed")
        
        # Convert OCR results to text
        ocr_text = "\n\n".join(["\n".join(page_text_list) for page_text_list in ocr_results if page_text_list])
        
        # Check if OCR produced meaningful content
        if len(ocr_text.strip()) < 50:
            print(f"⚠️ OCR produced minimal text ({len(ocr_text)} chars)")
            
        # Extract metadata from OCR text
        ocr_metadata = extract_metadata_using_regex(ocr_text)
        merged_metadata = {**ocr_metadata, "extraction_method": "OCR"}
        
        print("✅ OCR extraction successful")
        return ocr_text, merged_metadata
        
    except Exception as ocr_error:
        print(f"❌ OCR extraction also failed: {ocr_error}")
        
        # Return empty document with comprehensive error info
        error_document = Document(
            text="",
            metadata={
                "extraction_method": "Failed (both PDF and OCR failed)",
                "ocr_error": str(ocr_error)
            }
        )
        return error_document, {"extraction_method": "Failed"}

In [7]:
def hybrid_extraction_approach(pdf_path, try_ocr=True):
    """
    Hybrid approach: Try PyMuPDF4LLM first, fall back to OCR for scanned documents
    """
    try:
        # Try direct PDF text extraction
        documents, metadata = extract_text_and_metadata_llamaindex(pdf_path)
        
        # Check if we got meaningful text content
        total_text_length = sum(len(doc.text.strip()) for doc in documents)
        
        # If we got very little text, the PDF might be scanned/image-based
        if total_text_length < 100:
            print(f"⚠️ Low text content ({total_text_length} chars), trying OCR...")
            if try_ocr:
                return attempt_ocr_extraction(pdf_path)
            else:
                print("⚠️ OCR disabled, returning minimal content")
        
        # Combine all doc texts into one
        combined_text = "\n\n".join(
            doc.text.strip() for doc in documents if doc.text.strip()
        )

        # Merge metadata (or just reuse original metadata dict)
        combined_metadata = {
            **metadata,
            "extraction_method": "PyMuPDF4LLM with LlamaMarkdownReader"
        }

        print("✅ Direct PDF extraction successful")
        return combined_text, combined_metadata

    except Exception as e:
        print(f"❌ Direct PDF extraction failed: {e}")
        
        if try_ocr:
            print("🔄 Falling back to OCR...")
            return attempt_ocr_extraction(pdf_path)
        else:
            # Return empty document with error info instead of raising
            print("❌ OCR disabled and PDF extraction failed")
            error_document = Document(
                text="",
                metadata={
                    "extraction_method": "Failed (PDF extraction error, OCR disabled)",
                    "error": str(e)
                }
            )
            return error_document, {"extraction_method": "Failed"}

In [8]:
def update_json_with_pdf_metadata(file_path, pdf_extracted_metadata, save_file=True):
    """
    Update a JSON file with metadata extracted from PDF
    
    Args:
        file_path (str): Path to the JSON file to update
        pdf_extracted_metadata (dict): Metadata extracted from PDF using extract_metadata_using_regex()
        save_file (bool): Whether to save the file or just return the updated data
    
    Returns:
        dict: Updated JSON data with merged metadata
    """
    
    # Key mapping: old → new
    key_mapping = {
        "nombor_kes": "case_number",
        "tarikh_keputusan": "decision_date",
        "pihak_pihak": "petitioners_and_respondents",
        "hakim_majistret": "coram"
    }

    def rename_keys(obj):
        """Recursively rename keys in dicts/lists."""
        if isinstance(obj, dict):
            new_obj = {}
            for k, v in obj.items():
                new_key = key_mapping.get(k, k)
                new_obj[new_key] = rename_keys(v)
            return new_obj
        elif isinstance(obj, list):
            return [rename_keys(i) for i in obj]
        else:
            return obj

    # Fields we need to check for emptiness
    check_fields = {
        "petitioners_and_respondents": ["petitioners", "respondents"],
        "decision_date": "decision_date",
        "coram": "coram"
    }

    exclude_keys = {"case_number", "decision_date", "petitioners", "respondents"}

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        print(f"❌ Invalid JSON: {file_path}")
        return None
    except FileNotFoundError:
        print(f"❌ File not found: {file_path}")
        return None

    # Step 1: Rename keys
    data = rename_keys(data)

    # Step 2: Extract "court" from case_number
    case_number = data.get("case_number", "").strip()
    court_match = re.search(r"\(([^)]+)\)\s*$", case_number)

    court_name = None
    if court_match:
        court_name = court_match.group(1)
        case_number = case_number[:court_match.start()].strip()
        data["case_number"] = case_number

    # Step 3: Ensure "court" right after "case_number"
    ordered_data = OrderedDict()
    for k, v in data.items():
        ordered_data[k] = v
        if k == "case_number" and court_name:
            ordered_data["court"] = court_name

    # Step 4: Fill empty fields from PDF metadata
    # To merge petitioners and respondents into one string
    if not ordered_data.get("petitioners_and_respondents"):
        petitioners = pdf_extracted_metadata.get("petitioners")
        respondents = pdf_extracted_metadata.get("respondents")

        if petitioners or respondents:
            parts = []
            if petitioners:
                parts.append(f"petitioners: {petitioners}")
            if respondents:
                parts.append(f"respondents: {respondents}")
            ordered_data["petitioners_and_respondents"] = " and ".join(parts)
        else:
            # Explicitly set as empty string for consistency
            ordered_data["petitioners_and_respondents"] = ""
        
        # Remove separate petitioners/respondents keys to avoid duplication
        if "petitioners" in ordered_data:
            del ordered_data["petitioners"]
        if "respondents" in ordered_data:
            del ordered_data["respondents"]

    # Step 5: Add decision_date and coram if missing
    if not ordered_data.get("decision_date") and pdf_extracted_metadata.get("decision_date"):
        ordered_data["decision_date"] = pdf_extracted_metadata["decision_date"]

    if not ordered_data.get("coram") and pdf_extracted_metadata.get("coram"):
        ordered_data["coram"] = pdf_extracted_metadata["coram"]

    # Step 6: Append extra metadata
    for k, v in pdf_extracted_metadata.items():
        if k not in exclude_keys and k not in ordered_data:
            ordered_data[k] = v

    # Save updated JSON if requested
    if save_file:
        try:
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(ordered_data, f, ensure_ascii=False, indent=4)
            print(f"✅ Updated {file_path} | Court: {court_name}")
        except Exception as e:
            print(f"❌ Error saving file: {e}")
            return None

    return ordered_data

In [11]:
def batch_update_json_files_with_pdf_metadata(pdf_extracted_metadata, 
                                           json_dirs=None, 
                                           specific_file=None):
    """
    Batch update multiple JSON files or a specific file with PDF metadata
    
    Args:
        pdf_extracted_metadata (dict): Metadata extracted from PDF
        json_dirs (list): List of directories containing JSON files to update
        specific_file (str): Path to a specific JSON file to update
    
    Returns:
        dict: Summary of updates performed
    """
    
    update_summary = {
        "success_count": 0,
        "error_count": 0,
        "files_processed": [],
        "errors": []
    }
    
    if specific_file:
        # Update single file
        try:
            result = update_json_with_pdf_metadata(specific_file, pdf_extracted_metadata)
            if result:
                update_summary["success_count"] += 1
                update_summary["files_processed"].append(specific_file)
            else:
                update_summary["error_count"] += 1
                update_summary["errors"].append(f"Failed to update {specific_file}")
        except Exception as e:
            update_summary["error_count"] += 1
            update_summary["errors"].append(f"Error with {specific_file}: {e}")
    
    elif json_dirs:
        # Update multiple directories
        for json_dir in json_dirs:
            if not os.path.exists(json_dir):
                print(f"⚠️ Skipping {json_dir}, does not exist.")
                update_summary["errors"].append(f"Directory not found: {json_dir}")
                continue

            for filename in os.listdir(json_dir):
                if filename.endswith(".json"):
                    file_path = os.path.join(json_dir, filename)
                    
                    try:
                        result = update_json_with_pdf_metadata(file_path, pdf_extracted_metadata)
                        if result:
                            update_summary["success_count"] += 1
                            update_summary["files_processed"].append(file_path)
                        else:
                            update_summary["error_count"] += 1
                            update_summary["errors"].append(f"Failed to update {file_path}")
                    except Exception as e:
                        update_summary["error_count"] += 1
                        update_summary["errors"].append(f"Error with {file_path}: {e}")
    
    # Print summary
    print(f"\n=== UPDATE SUMMARY ===")
    print(f"✅ Successfully updated: {update_summary['success_count']} files")
    print(f"❌ Errors: {update_summary['error_count']} files")
    
    if update_summary["errors"]:
        print(f"\n⚠️ Errors encountered:")
        for error in update_summary["errors"][:5]:  # Show first 5 errors
            print(f"  - {error}")
        if len(update_summary["errors"]) > 5:
            print(f"  ... and {len(update_summary['errors']) - 5} more errors")
    
    return update_summary

In [19]:
# Quick test - just check what files exist
print("\n=== DEBUG: Check available files ===")

base_dir = "../../../../data/raw/legal_cases"
print(f"Base directory: {base_dir}")

if os.path.exists(base_dir):
    folders = [f for f in os.listdir(base_dir) if f.startswith('legal_case_folder')]
    print(f"Found folders: {folders}")
    
    for folder in folders[:3]:  # Check first 3 folders
        folder_path = os.path.join(base_dir, folder)
        pdf_files = [f for f in os.listdir(folder_path) if f.endswith('.pdf')]
        print(f"  {folder}: {len(pdf_files)} PDFs")
        if pdf_files:
            print(f"    First PDF: {pdf_files[0]}")
else:
    print(f"❌ Base directory not found: {base_dir}")


=== DEBUG: Check available files ===
Base directory: ../../../../data/raw/legal_cases
Found folders: ['legal_case_folder1', 'legal_case_folder10', 'legal_case_folder11', 'legal_case_folder12', 'legal_case_folder13', 'legal_case_folder14', 'legal_case_folder15', 'legal_case_folder16', 'legal_case_folder2', 'legal_case_folder3', 'legal_case_folder4', 'legal_case_folder5', 'legal_case_folder6', 'legal_case_folder7', 'legal_case_folder8', 'legal_case_folder9']
  legal_case_folder1: 1000 PDFs
    First PDF: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
  legal_case_folder10: 1000 PDFs
    First PDF: PA-22NCvC-22-02_2023_(Mahkamah_Tinggi).pdf
  legal_case_folder11: 1000 PDFs
    First PDF: Saman_Pemula_No._24-43-11_2014_(Mahkamah_Tinggi).pdf


In [9]:
def extract_base_filename(filename):
    """
    Extract base filename for matching PDF and JSON files
    
    Args:
        filename (str): Full filename with extension
    
    Returns:
        str: Base filename without extension
    
    Examples:
        "(M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf" 
        -> "(M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi)"
        
        "(M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi)_metadata.json"
        -> "(M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi)"
    """
    # Remove file extension
    base_name = os.path.splitext(filename)[0]
    
    # Remove "_metadata" suffix if present (for JSON files)
    if base_name.endswith("_metadata"):
        base_name = base_name[:-9]  # Remove "_metadata"
    
    return base_name

In [10]:
def merge_metadata(extracted_metadata, curated_metadata):
    """
    Merge extracted and curated metadata.
    - Use extracted keys as the baseline.
    - If curated has the same key and the value is non-empty (not None, not "", not []), use curated.
    """
    merged = extracted_metadata.copy()

    for key, value in curated_metadata.items():
        if value not in (None, "", [], {}):  # check non-empty
            merged[key] = value

    return merged

In [11]:
def process_pdf_with_matching_json(pdf_path, json_dir):
    """
    Process a single PDF and update its corresponding JSON file based on filename matching
    
    Args:
        pdf_path (str): Full path to PDF file
        json_dir (str): Directory containing JSON metadata files
    
    Returns:
        dict: Processing result with success/error information
    """
    result = {
        "pdf_file": os.path.basename(pdf_path),
        "success": False,
        "json_updated": False,
        "error": None,
        "metadata_extracted": {},
        "extraction_method": None
    }
    
    try:
        # Extract base filename from PDF
        pdf_filename = os.path.basename(pdf_path)
        base_name = extract_base_filename(pdf_filename)
        
        print(f"   🔍 Processing PDF: {pdf_filename}")
        print(f"   📝 Base name: {base_name}")
        
        # Extract metadata from PDF
        extracted_text, extracted_metadata = hybrid_extraction_approach(pdf_path)
        
        # Add source information to metadata
        extracted_metadata["source_filename"] = pdf_filename
        
        result["metadata_extracted"] = extracted_metadata
        result["success"] = True
        result["extraction_method"] = extracted_metadata.get("extraction_method", "Unknown")
        
        print(f"   ✅ Metadata extracted:")
        print(f"      📋 Case: {extracted_metadata.get('case_number', 'N/A')}")
        print(f"      🏛️ Court: {extracted_metadata.get('court', 'N/A')}")
        print(f"      👥 Parties: {len(extracted_metadata.get('petitioners', []))} vs {len(extracted_metadata.get('respondents', []))}")
        
        # Find matching JSON file
        curated_metadata = {}
        if os.path.exists(json_dir):
            matching_json_file = None
            
            # Look for JSON file with matching base name
            for json_filename in os.listdir(json_dir):
                if json_filename.endswith(".json") and extract_base_filename(json_filename) == base_name:
                        matching_json_file = json_filename
                        break
            
            if matching_json_file:
                json_file_path = os.path.join(json_dir, matching_json_file)
                print(f"   🎯 Found matching JSON: {matching_json_file}")
                
                # Update the specific JSON file
                update_result = update_json_with_pdf_metadata(
                    json_file_path, 
                    extracted_metadata, 
                    save_file=True
                )
                
                # Reload curated metadata from the updated file
                with open(json_file_path, "r", encoding="utf-8") as f:
                    curated_metadata = json.load(f)
                
                if update_result:
                    result["json_updated"] = True
                    print(f"   ✅ Updated JSON file successfully")
                else:
                    result["error"] = f"Failed to update JSON file: {matching_json_file}"
                    print(f"   ❌ Failed to update JSON file")
            else:
                result["error"] = f"No matching JSON file found for base name: {base_name}"
                print(f"   ⚠️ No matching JSON file found")
        else:
            result["error"] = f"JSON directory not found: {json_dir}"
            print(f"   ❌ JSON directory not found: {json_dir}")
        
        # Step 3: Merge extracted + curated metadata (curated wins)
        final_metadata = merge_metadata(extracted_metadata, curated_metadata)

        # Step 4: Create final Document
        final_doc = Document(text=extracted_text, metadata=final_metadata)

        # Step 5: Save Document JSON in save_dir
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, pdf_filename + ".json")
        save_document_json(final_doc, save_path)
        result["save_path"] = save_path
        print(f"💾 Saved Document JSON: {save_path}")
            
    except Exception as e:
        result["error"] = f"Error processing {pdf_filename}: {str(e)}"
        print(f"   ❌ {result['error']}")
    
    return result

In [14]:
# Test processing a single directory on a single PDF file
# Use the name-based matching function instead
def process_single_directory_safely(pdf_dir, json_dir):
    """
    Process PDFs and update ONLY their matching JSON files
    """
    if not os.path.exists(pdf_dir):
        print(f"❌ PDF directory not found: {pdf_dir}")
        return
    
    pdf_files = [f for f in os.listdir(pdf_dir) if f.endswith('.pdf')]
    
    if not pdf_files:
        print(f"⚠️ No PDF files found")
        return
    
    print(f"📄 Found {len(pdf_files)} PDF files")
    
    # Process each PDF individually with its matching JSON
    for pdf_file in pdf_files:
        pdf_path = os.path.join(pdf_dir, pdf_file)
        
        # This function finds and updates ONLY the matching JSON file
        result = process_pdf_with_matching_json(pdf_path, json_dir)
        
        if result["success"] and result["json_updated"]:
            print(f"✅ {pdf_file} → Updated matching JSON")
            break
        elif result["success"] and not result["json_updated"]:
            print(f"⚠️ {pdf_file} → No matching JSON found")
            break
        else:
            print(f"❌ {pdf_file} → Error: {result['error']}")
            break
            
# SAFE TEST - Process single directory correctly
print("\n=== Single Directory Processing ===")

test_pdf_dir = "../../../../data/raw/legal_cases/legal_case_folder1/"
test_json_dir = "../../../../data/raw/legal_cases/legal_case_folder1/metadata_folder"

process_single_directory_safely(test_pdf_dir, test_json_dir)


=== Single Directory Processing ===
📄 Found 1000 PDF files
   🔍 Processing PDF: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📝 Base name: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi)
Successfully imported LlamaIndex


KeyboardInterrupt: 

In [12]:
def parallel_processing_metadata_extraction(pdf_dir, json_dir, max_workers=2):
    """Parallel processing with single GPU OCR"""
    
    if not os.path.exists(pdf_dir):
        print(f"❌ PDF directory not found: {pdf_dir}")
        return []
    
    pdf_files = [f for f in os.listdir(pdf_dir) if f.endswith('.pdf')]
    if not pdf_files:
        print(f"⚠️ No PDF files found")
        return []
    
    print(f"📄 Found {len(pdf_files)} PDF files")
    print(f"🚀 Processing with {max_workers} workers (Single GPU OCR)...")
    
    # Initialize single OCR manager
    try:
        single_ocr_manager.initialize_ocr_once()
    except Exception as e:
        print(f"❌ Failed to initialize OCR manager: {e}")
        return []
    
    def process_single_pdf(pdf_file):
        """Process single PDF with memory optimization"""
        try:
            pdf_path = os.path.join(pdf_dir, pdf_file)
            result = process_pdf_with_matching_json(pdf_path, json_dir)
            # Force garbage collection after each file
            import gc
            gc.collect()
            return result
        except Exception as e:
            return {
                "pdf_file": pdf_file,
                "success": False,
                "json_updated": False,
                "error": f"Unexpected error: {str(e)}",
                "metadata_extracted": {},
                "extraction_method": None
            }
    
    # Process files in parallel (limited workers to prevent memory issues)
    results = []
    start_time = time.time()
    
    # Use fewer workers to prevent GPU memory conflicts
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_pdf = {
                executor.submit(process_single_pdf, pdf_file): pdf_file 
                for pdf_file in pdf_files
            }
            
            # Collect results as they complete
            completed = 0
            for future in as_completed(future_to_pdf):
                try:
                    result = future.result(timeout=300)  # 5 minute timeout per PDF
                    results.append(result)
                    completed += 1
                    
                    # Progress indicator
                    status = "✅" if result["success"] and result["json_updated"] else "⚠️" if result["success"] else "❌"
                    method = result.get("extraction_method", "N/A")
                    print(f"[{completed}/{len(pdf_files)}] {status} {result['pdf_file']} ({method})")
                except Exception as e:
                    pdf_file = future_to_pdf[future]
                    error_result = {
                        "pdf_file": pdf_file,
                        "success": False,
                        "json_updated": False,
                        "error": f"Future execution failed: {str(e)}",
                        "metadata_extracted": {},
                        "extraction_method": None
                    }
                    results.append(error_result)
                    completed += 1
                    print(f"[{completed}/{len(pdf_files)}] ❌ {pdf_file} (Error)")
    
    except Exception as e:
        print(f"❌ ThreadPoolExecutor failed: {e}")
        return results
    
    # Summary
    total_time = time.time() - start_time
    success_count = sum(1 for r in results if r["success"] and r["json_updated"])
    ocr_count = sum(1 for r in results if r.get("extraction_method") == "OCR")
    
    print(f"\n📊 Processing Complete:")
    print(f"   ⏱️ Total time: {total_time:.2f}s")
    print(f"   ✅ Successful: {success_count}/{len(results)}")
    print(f"   🔍 OCR used: {ocr_count} files")
    print(f"   📄 Direct extraction: {len(results) - ocr_count} files")
    print(f"   💾 Memory: Single GPU OCR instance")
    
    return results

In [13]:
# Quick dataset creation: Top 100 PDFs from each folder + random 20 for testing
pdf_directories = [f"../../../../data/raw/legal_cases/legal_case_folder{i}/" for i in range(1, 17)]
json_directories = [f"../../../../data/raw/legal_cases/legal_case_folder{i}/metadata_folder" for i in range(1, 17)]

rag_legal_cases_dir = "../../../../data/raw/legal_cases/rag_legal_case_files"
test_files_dir = "../../../../data/raw/legal_cases/test_files"
rag_legal_cases_metadata_dir = os.path.join(rag_legal_cases_dir, "metadata")
test_files_metadata_dir = os.path.join(test_files_dir, "metadata")

In [3]:
import os
import shutil
import random
from pathlib import Path

# Create directories
for dir_path in [rag_legal_cases_dir, test_files_dir, rag_legal_cases_metadata_dir, test_files_metadata_dir]:
    os.makedirs(dir_path, exist_ok=True)

def get_unique_pdf_name(pdf_path):
    """Remove '_1', '_2', etc. suffix to get unique name"""
    name = Path(pdf_path).stem
    # Remove trailing _1, _2, etc.
    if name.endswith(('_1', '_2', '_3', '_4', '_5', '_6', '_7', '_8', '_9')):
        name = name.rsplit('_', 1)[0]
    return name

def extract_top_pdfs_from_folder(pdf_dir, json_dir, target_pdf_dir, target_json_dir, max_files=100):
    """Extract top 100 largest PDFs with unique names from a folder"""
    if not os.path.exists(pdf_dir):
        print(f"Directory not found: {pdf_dir}")
        return []
    
    # Get all PDF files with their sizes
    pdf_files = []
    for file in os.listdir(pdf_dir):
        if file.endswith('.pdf'):
            file_path = os.path.join(pdf_dir, file)
            size = os.path.getsize(file_path)
            unique_name = get_unique_pdf_name(file_path)
            pdf_files.append((file, file_path, size, unique_name))
    
    # Group by unique name and keep only the largest file for each unique name
    unique_pdfs = {}
    for file, file_path, size, unique_name in pdf_files:
        if unique_name not in unique_pdfs or size > unique_pdfs[unique_name][2]:
            unique_pdfs[unique_name] = (file, file_path, size, unique_name)
    
    # Sort by size (largest first) and take top 100
    sorted_pdfs = sorted(unique_pdfs.values(), key=lambda x: x[2], reverse=True)[:max_files]
    
    copied_files = []
    for file, file_path, size, unique_name in sorted_pdfs:
        try:
            # Copy PDF file
            target_pdf_path = os.path.join(target_pdf_dir, file)
            shutil.copy2(file_path, target_pdf_path)
            
            # Copy corresponding JSON metadata file
            json_file = file.replace('.pdf', '_metadata.json')
            json_source = os.path.join(json_dir, json_file)
            json_target = os.path.join(target_json_dir, json_file)
            
            if os.path.exists(json_source):
                shutil.copy2(json_source, json_target)
                copied_files.append((file, json_file))
                print(f"Copied: {file} ({size/1024/1024:.1f}MB) + {json_file}")
            else:
                print(f"Warning: JSON not found for {file}")
                
        except Exception as e:
            print(f"Error copying {file}: {e}")
    
    return copied_files

# Extract top 100 PDFs from each of the 16 folders
print("Extracting top 100 PDFs from each folder...")
all_copied_files = []

for i, (pdf_dir, json_dir) in enumerate(zip(pdf_directories, json_directories), 1):
    print(f"\nProcessing folder {i}/16: {pdf_dir}")
    copied_files = extract_top_pdfs_from_folder(
        pdf_dir, json_dir, rag_legal_cases_dir, rag_legal_cases_metadata_dir, max_files=100
    )
    all_copied_files.extend(copied_files)
    print(f"Folder {i}: {len(copied_files)} files copied")

print(f"\nTotal files in RAG dataset: {len(all_copied_files)} PDF files + JSON files")

# Randomly select 20 files for testing
if len(all_copied_files) >= 20:
    print("\nSelecting 20 random files for testing...")
    test_files = random.sample(all_copied_files, 20)
    
    for pdf_file, json_file in test_files:
        try:
            # Move PDF file
            src_pdf = os.path.join(rag_legal_cases_dir, pdf_file)
            dst_pdf = os.path.join(test_files_dir, pdf_file)
            shutil.move(src_pdf, dst_pdf)
            
            # Move JSON file
            src_json = os.path.join(rag_legal_cases_metadata_dir, json_file)
            dst_json = os.path.join(test_files_metadata_dir, json_file)
            shutil.move(src_json, dst_json)
            
            print(f"Moved to test: {pdf_file} + {json_file}")
            
        except Exception as e:
            print(f"Error moving test file {pdf_file}: {e}")
    
    print(f"\nTest dataset: {len(test_files)} files moved to {test_files_dir}")
    print(f"RAG dataset: {len(all_copied_files) - len(test_files)} files remaining in {rag_legal_cases_dir}")
else:
    print(f"Not enough files for testing. Only {len(all_copied_files)} files available.")

print(f"\nCompleted! RAG dataset ready in: {rag_legal_cases_dir}")
print(f"Test dataset ready in: {test_files_dir}")

Extracting top 100 PDFs from each folder...

Processing folder 1/16: ../../../../data/raw/legal_cases/legal_case_folder1/
Copied: 22IP-17-05_2015_(Mahkamah_Tinggi).pdf (35.6MB) + 22IP-17-05_2015_(Mahkamah_Tinggi)_metadata.json
Copied: 22NCVC-127-03_2015_(Mahkamah_Tinggi)_1.pdf (17.8MB) + 22NCVC-127-03_2015_(Mahkamah_Tinggi)_1_metadata.json
Copied: 22NCVC-22-02_2014_(Mahkamah_Tinggi).pdf (15.0MB) + 22NCVC-22-02_2014_(Mahkamah_Tinggi)_metadata.json
Copied: 22NCVC-30-03_2014_(Mahkamah_Tinggi).pdf (13.6MB) + 22NCVC-30-03_2014_(Mahkamah_Tinggi)_metadata.json
Copied: 22_–_1433_–_2010_(Mahkamah_Tinggi)_1.pdf (13.2MB) + 22_–_1433_–_2010_(Mahkamah_Tinggi)_1_metadata.json
Copied: 22NCVC-376-08_2014_(Mahkamah_Tinggi).pdf (12.7MB) + 22NCVC-376-08_2014_(Mahkamah_Tinggi)_metadata.json
Copied: 02(f)-12-03_2018(W)_(Mahkamah_Persekutuan).pdf (11.5MB) + 02(f)-12-03_2018(W)_(Mahkamah_Persekutuan)_metadata.json
Copied: 22NCVC-610-12_2014_(Mahkamah_Tinggi).pdf (11.4MB) + 22NCVC-610-12_2014_(Mahkamah_Tinggi

## 📊 Dataset Creation and Organization

This section creates the RAG (Retrieval-Augmented Generation) dataset by extracting the top 100 legal documents from each of the 16 case category folders. The process involves:

1. **File Size Analysis**: Documents are sorted by file size to prioritize larger, more comprehensive legal documents
2. **Selective Extraction**: Top 100 PDFs from each category to ensure balanced representation across legal domains
3. **Test Set Creation**: 20 randomly selected files are moved to a separate test directory for evaluation
4. **Metadata Preservation**: Both PDF files and their corresponding JSON metadata are moved together

The test dataset is kept separate to prevent data leakage during RAG system evaluation - the LLM should not have access to test answers during training or inference.

In [28]:
import json

# Clean specific keys from all JSON metadata files
keys_to_remove = [
    'related_cases',
    'appeal_type', 
    'serial_number',
    'legislation_referred',
    'other_sources_referred'
]

def clean_json_metadata(json_file_path, keys_to_remove):
    """Remove specific keys from JSON metadata file"""
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        # Remove unwanted keys
        removed_keys = []
        for key in keys_to_remove:
            if key in data:
                del data[key]
                removed_keys.append(key)
        
        # Write back cleaned data
        with open(json_file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        
        return removed_keys
    except Exception as e:
        print(f"Error cleaning {json_file_path}: {e}")
        return []

def clean_all_json_in_directory(metadata_dir, keys_to_remove):
    """Clean all JSON files in a directory"""
    if not os.path.exists(metadata_dir):
        print(f"Directory not found: {metadata_dir}")
        return
    
    cleaned_count = 0
    total_removed = 0
    
    for file in os.listdir(metadata_dir):
        if file.endswith('_metadata.json'):
            file_path = os.path.join(metadata_dir, file)
            removed_keys = clean_json_metadata(file_path, keys_to_remove)
            if removed_keys:
                cleaned_count += 1
                total_removed += len(removed_keys)
                print(f"Cleaned {file}: removed {removed_keys}")
    
    print(f"Cleaned {cleaned_count} files, removed {total_removed} total keys")

# Clean JSON files in RAG dataset
print("=== Cleaning JSON metadata in RAG dataset ===")
clean_all_json_in_directory(rag_legal_cases_metadata_dir, keys_to_remove)

# Clean JSON files in test dataset  
print("\n=== Cleaning JSON metadata in test dataset ===")
clean_all_json_in_directory(test_files_metadata_dir, keys_to_remove)

print("\n✅ All JSON metadata files cleaned!")

=== Cleaning JSON metadata in RAG dataset ===
Cleaned (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi)_metadata.json: removed ['related_cases', 'appeal_type', 'serial_number', 'legislation_referred', 'other_sources_referred']


KeyboardInterrupt: 

## 🧹 Metadata Cleaning and Optimization

This section removes unnecessary metadata fields to optimize storage and focus on essential legal information. The cleaning process targets specific fields that may be sparse or not directly relevant to the RAG pipeline:

- `related_cases`: Often empty or inconsistent
- `appeal_type`: Not always applicable to all case types
- `serial_number`: Internal court references with limited search value
- `legislation_referred`: Can be extracted from case content when needed
- `other_sources_referred`: Often incomplete or inconsistent

This optimization reduces file sizes and ensures the metadata focuses on the most reliable and useful fields for legal document retrieval.

In [14]:
# Process only RAG and test directories
print("\n=== Processing RAG and Test directories only ===")

# Define only the directories we want to process
directories_to_process = [
    {
        "name": "RAG Dataset",
        "pdf_dir": rag_legal_cases_dir,
        "json_dir": rag_legal_cases_metadata_dir
    },
    # DO NOT PROCESS TEST DATASET OR ELSE LLM WILL SEE THE ANSWERS!
    # {
    #     "name": "Test Dataset", 
    #     "pdf_dir": test_files_dir,
    #     "json_dir": test_files_metadata_dir
    # }
]

# Overall statistics with timing
overall_stats = {
    "folders_processed": 0,
    "total_pdfs_found": 0,
    "total_pdfs_processed": 0,
    "total_json_updated": 0,
    "total_ocr_used": 0,
    "total_direct_extraction": 0,
    "errors": [],
    "successful_matches": [],
    "failed_matches": [],
    "processing_times": [],
    "start_time": time.time()
}

# Initialize single OCR manager at the beginning
print("🔧 Initializing single OCR manager...")
single_ocr_manager.initialize_ocr_once()

# Process each directory with optimizations
for i, dir_info in enumerate(directories_to_process):
    dir_name = dir_info["name"]
    pdf_dir = dir_info["pdf_dir"]
    json_dir = dir_info["json_dir"]
    
    print(f"\n{'='*70}")
    print(f"🔄 Processing {i+1}/2: {dir_name}")
    print(f"   📄 PDF dir: {pdf_dir}")
    print(f"   📝 JSON dir: {json_dir}")
    print(f"{'='*70}")
    
    if not os.path.exists(pdf_dir):
        error_msg = f"PDF directory not found: {pdf_dir}"
        print(f"⚠️ {error_msg}")
        overall_stats["errors"].append(error_msg)
        continue
    
    if not os.path.exists(json_dir):
        error_msg = f"JSON directory not found: {json_dir}"
        print(f"⚠️ {error_msg}")
        overall_stats["errors"].append(error_msg)
        continue
    
    # Get all PDF files in this directory
    pdf_files = [f for f in os.listdir(pdf_dir) if f.endswith('.pdf')]
    overall_stats["total_pdfs_found"] += len(pdf_files)
    
    if not pdf_files:
        print(f"⚠️ No PDF files found in {dir_name}")
        continue
    
    print(f"📄 Found {len(pdf_files)} PDF files")
    
    # Use optimized parallel processing for this folder
    folder_start_time = time.time()
    
    # Process with memory optimization (max 2 workers for single GPU)
    folder_results = parallel_processing_metadata_extraction(pdf_dir, json_dir, max_workers=2)
    
    folder_processing_time = time.time() - folder_start_time
    overall_stats["processing_times"].append({
        "folder": dir_name,
        "time": folder_processing_time,
        "pdf_count": len(pdf_files)
    })
    
    # Aggregate folder results
    folder_processed = sum(1 for r in folder_results if r["success"])
    folder_updated = sum(1 for r in folder_results if r["success"] and r["json_updated"])
    folder_ocr_used = sum(1 for r in folder_results if r.get("extraction_method") == "OCR")
    folder_direct = len(folder_results) - folder_ocr_used
    
    overall_stats["total_pdfs_processed"] += folder_processed
    overall_stats["total_json_updated"] += folder_updated
    overall_stats["total_ocr_used"] += folder_ocr_used
    overall_stats["total_direct_extraction"] += folder_direct
    
    # Collect successful matches and failures
    for result in folder_results:
        if result["success"] and result["json_updated"]:
            overall_stats["successful_matches"].append({
                "folder": dir_name,
                "pdf": result["pdf_file"],
                "base_name": result["metadata_extracted"].get("base_filename", ""),
                "method": result.get("extraction_method", "Unknown")
            })
        elif result["success"] and not result["json_updated"]:
            overall_stats["failed_matches"].append({
                "folder": dir_name,
                "pdf": result["pdf_file"], 
                "reason": result["error"]
            })
        
        if result["error"]:
            overall_stats["errors"].append(f"{dir_name}: {result['error']}")
    
    print(f"\n📊 {dir_name} Summary:")
    print(f"   📄 PDFs processed: {folder_processed}/{len(pdf_files)}")
    print(f"   📝 JSON files updated: {folder_updated}")
    print(f"   🔍 OCR used: {folder_ocr_used} files")
    print(f"   📄 Direct extraction: {folder_direct} files")
    print(f"   ⏱️ Processing time: {folder_processing_time:.2f}s")
    
    overall_stats["folders_processed"] += 1

# Final comprehensive summary
total_processing_time = time.time() - overall_stats["start_time"]

print(f"\n{'='*70}")
print(f"🎉 RAG & TEST PROCESSING COMPLETE")
print(f"{'='*70}")
print(f"📁 Directories processed: {overall_stats['folders_processed']}/2")
print(f"📄 Total PDFs found: {overall_stats['total_pdfs_found']}")
print(f"📄 Total PDFs processed: {overall_stats['total_pdfs_processed']}")
print(f"📝 Total JSON files updated: {overall_stats['total_json_updated']}")
print(f"🔍 Total OCR operations: {overall_stats['total_ocr_used']}")
print(f"📄 Total direct extractions: {overall_stats['total_direct_extraction']}")
print(f"✅ Successful matches: {len(overall_stats['successful_matches'])}")
print(f"❌ Failed matches: {len(overall_stats['failed_matches'])}")
print(f"❌ Total errors: {len(overall_stats['errors'])}")
print(f"⏱️ Total processing time: {total_processing_time:.2f}s")

# Performance statistics
if overall_stats["processing_times"]:
    avg_time_per_folder = sum(t["time"] for t in overall_stats["processing_times"]) / len(overall_stats["processing_times"])
    avg_time_per_pdf = total_processing_time / max(1, overall_stats["total_pdfs_processed"])
    print(f"📈 Average time per directory: {avg_time_per_folder:.2f}s")
    print(f"📈 Average time per PDF: {avg_time_per_pdf:.2f}s")

# Show extraction method breakdown
if overall_stats['total_pdfs_processed'] > 0:
    ocr_percentage = (overall_stats['total_ocr_used'] / overall_stats['total_pdfs_processed']) * 100
    direct_percentage = (overall_stats['total_direct_extraction'] / overall_stats['total_pdfs_processed']) * 100
    print(f"📊 Extraction methods:")
    print(f"   🔍 OCR: {ocr_percentage:.1f}% ({overall_stats['total_ocr_used']} files)")
    print(f"   📄 Direct: {direct_percentage:.1f}% ({overall_stats['total_direct_extraction']} files)")

# Show sample successful matches with methods
if overall_stats["successful_matches"]:
    print(f"\n✅ Sample successful matches:")
    for match in overall_stats["successful_matches"][:5]:
        print(f"   📁 {match['folder']} -> {match['pdf']} ({match['method']})")

# Show failed matches for debugging
if overall_stats["failed_matches"]:
    print(f"\n⚠️ Failed matches (for debugging):")
    for fail in overall_stats["failed_matches"][:5]:
        print(f"   📁 {fail['folder']} -> {fail['pdf']}")
        print(f"      Reason: {fail['reason']}")

# Show errors
if overall_stats["errors"]:
    print(f"\n❌ Errors encountered:")
    for error in overall_stats["errors"][:5]:
        print(f"   - {error}")
    if len(overall_stats["errors"]) > 5:
        print(f"   ... and {len(overall_stats['errors']) - 5} more errors")

print(f"\n✅ Processing complete for RAG and test directories only!")


=== Processing RAG and Test directories only ===
🔧 Initializing single OCR manager...
🔧 Initializing single OCR reader (GPU memory optimized)...
✅ OCR reader initialized successfully

🔄 Processing 1/2: RAG Dataset
   📄 PDF dir: ../../../../data/raw/legal_cases/rag_legal_case_files
   📝 JSON dir: ../../../../data/raw/legal_cases/rag_legal_case_files\metadata
📄 Found 1580 PDF files
📄 Found 1580 PDF files
🚀 Processing with 2 workers (Single GPU OCR)...
   🔍 Processing PDF: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📝 Base name: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi)
   🔍 Processing PDF: (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📝 Base name: (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi)
Successfully imported LlamaIndex
⚠️ Low text content (0 chars), trying OCR...
📄 Converting PDF to images: (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
🔍 OCR processing (GPU): (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📄 Processing p

## 🔄 Batch Processing Pipeline

This section implements the main processing pipeline that applies hybrid text extraction to all documents in the RAG dataset. The pipeline includes:

### Key Features:
- **Memory-Optimized Processing**: Uses a single OCR manager instance to prevent GPU memory issues
- **Parallel Processing**: Multi-threaded processing with controlled worker limits
- **Comprehensive Statistics**: Tracks processing times, success rates, and error handling
- **Progress Monitoring**: Real-time feedback on processing status

### Processing Strategy:
1. **RAG Dataset Only**: Processes only the curated RAG dataset, avoiding test files to prevent data leakage
2. **Hybrid Extraction**: Combines PyMuPDF4LLM and OCR for optimal text quality
3. **Error Recovery**: Graceful handling of corrupted or unprocessable files
4. **Performance Tracking**: Detailed timing and success metrics

**Note**: Test dataset is intentionally excluded from processing to maintain evaluation integrity.

In [15]:
# Perform ocr and extract metadata manually on the error file

# Path to the error file and its JSON
error_file = "../../../../data/raw/legal_cases/rag_legal_case_files/22NCVC-640-10_2016_(Mahkamah_Tinggi).pdf"
error_file_json = "../../../../data/raw/legal_cases/rag_legal_case_files/metadata/22NCVC-640-10_2016_(Mahkamah_Tinggi)_metadata.json"

# Perform OCR
text = single_ocr_manager.perform_ocr_safe(error_file)
ocr_text = "\n\n".join(["\n".join(page_text_list) for page_text_list in text if page_text_list])
print(ocr_text)

📄 Converting PDF to images: 22NCVC-640-10_2016_(Mahkamah_Tinggi).pdf
🔍 OCR processing (GPU): 22NCVC-640-10_2016_(Mahkamah_Tinggi).pdf
   📄 Processing page 1/54
   📄 Processing page 2/54
   📄 Processing page 3/54
   📄 Processing page 4/54
   📄 Processing page 5/54
   📄 Processing page 6/54
   📄 Processing page 7/54
   📄 Processing page 8/54
   📄 Processing page 9/54
   📄 Processing page 10/54
   📄 Processing page 11/54
   📄 Processing page 12/54
   📄 Processing page 13/54
   📄 Processing page 14/54
   📄 Processing page 15/54
   📄 Processing page 16/54
   📄 Processing page 17/54
   📄 Processing page 18/54
   📄 Processing page 19/54
   📄 Processing page 20/54
   📄 Processing page 21/54
   📄 Processing page 22/54
   📄 Processing page 23/54
   📄 Processing page 24/54
   📄 Processing page 25/54
   📄 Processing page 26/54
   📄 Processing page 27/54
   📄 Processing page 28/54
   📄 Processing page 29/54
   📄 Processing page 30/54
   📄 Processing page 31/54
   📄 Processing page 32/54
   📄 Proces

In [20]:
# Perform metadata extraction
import pprint

extracted_metadata = extract_metadata_using_regex(ocr_text)
pprint.pp(extracted_metadata)

# Metadata extracted result wasn't satisfactory, need to extract manually 

2166-02-15
{'case_number': '_WA-22NCVC-640-10/201\n'
                'ANTARA\n'
                'EASY REGION ENGINEERING SDN BHD (No',
 'court': 'MAHKAMAH TINGGI',
 'appeal_type': 'Unknown',
 'serial_number': None,
 'petitioners': [],
 'respondents': [],
 'coram': [],
 'related_cases': [],
 'legislation_referred': [],
 'other_sources_referred': []}


In [16]:
# Load JSON from file
with open(error_file_json, "r", encoding="utf-8") as f:
    data = json.load(f)
    
data["text"] = ocr_text

# Save back to the file
with open(error_file_json, "w", encoding="utf-8") as f:
    json.dump(data, f, indent=4, ensure_ascii=False)
    
print("JSON updated successfully!")

JSON updated successfully!


In [None]:
# Archived - as too much files to process in one go
# Minimized to only 1600 files to processed

# # Batch update multiple directories
# print("\n=== Batch process all PDF folders ===")

# pdf_directories = [
#     f"../../../../data/raw/legal_cases/legal_case_folder{i}/" 
#     for i in range(1, 17)
# ]

# json_directories = [
#     f"../../../../data/raw/legal_cases/legal_case_folder{i}/metadata_folder" 
#     for i in range(1, 17)
# ]

# # Overall statistics with timing
# overall_stats = {
#     "folders_processed": 0,
#     "total_pdfs_found": 0,
#     "total_pdfs_processed": 0,
#     "total_json_updated": 0,
#     "total_ocr_used": 0,
#     "total_direct_extraction": 0,
#     "errors": [],
#     "successful_matches": [],
#     "failed_matches": [],
#     "processing_times": [],
#     "start_time": time.time()
# }

# # Initialize single OCR manager at the beginning
# print("🔧 Initializing single OCR manager for all folders...")
# single_ocr_manager.initialize_ocr_once()

# # Process each directory with optimizations
# for i, (pdf_dir, json_dir) in enumerate(zip(pdf_directories, json_directories)):
#     folder_num = i + 1
#     print(f"\n{'='*70}")
#     print(f"🔄 Processing folder {folder_num}/16: {os.path.basename(pdf_dir)}")
#     print(f"{'='*70}")
    
#     if not os.path.exists(pdf_dir):
#         error_msg = f"PDF directory not found: {pdf_dir}"
#         print(f"⚠️ {error_msg}")
#         overall_stats["errors"].append(error_msg)
#         continue
    
#     if not os.path.exists(json_dir):
#         error_msg = f"JSON directory not found: {json_dir}"
#         print(f"⚠️ {error_msg}")
#         overall_stats["errors"].append(error_msg)
#         continue
    
#     # Get all PDF files in this directory
#     pdf_files = [f for f in os.listdir(pdf_dir) if f.endswith('.pdf')]
#     overall_stats["total_pdfs_found"] += len(pdf_files)
    
#     if not pdf_files:
#         print(f"⚠️ No PDF files found in {os.path.basename(pdf_dir)}")
#         continue
    
#     print(f"📄 Found {len(pdf_files)} PDF files")
#     print(f"📁 JSON directory: {os.path.basename(json_dir)}")
    
#     # Use optimized parallel processing for this folder
#     folder_start_time = time.time()
    
#     # Process with memory optimization (max 2 workers for single GPU)
#     folder_results = parallel_processing_metadata_extraction(pdf_dir, json_dir, max_workers=2)
    
#     folder_processing_time = time.time() - folder_start_time
#     overall_stats["processing_times"].append({
#         "folder": os.path.basename(pdf_dir),
#         "time": folder_processing_time,
#         "pdf_count": len(pdf_files)
#     })
    
#     # Aggregate folder results
#     folder_processed = sum(1 for r in folder_results if r["success"])
#     folder_updated = sum(1 for r in folder_results if r["success"] and r["json_updated"])
#     folder_ocr_used = sum(1 for r in folder_results if r.get("extraction_method") == "OCR")
#     folder_direct = len(folder_results) - folder_ocr_used
    
#     overall_stats["total_pdfs_processed"] += folder_processed
#     overall_stats["total_json_updated"] += folder_updated
#     overall_stats["total_ocr_used"] += folder_ocr_used
#     overall_stats["total_direct_extraction"] += folder_direct
    
#     # Collect successful matches and failures
#     for result in folder_results:
#         if result["success"] and result["json_updated"]:
#             overall_stats["successful_matches"].append({
#                 "folder": os.path.basename(pdf_dir),
#                 "pdf": result["pdf_file"],
#                 "base_name": result["metadata_extracted"].get("base_filename", ""),
#                 "method": result.get("extraction_method", "Unknown")
#             })
#         elif result["success"] and not result["json_updated"]:
#             overall_stats["failed_matches"].append({
#                 "folder": os.path.basename(pdf_dir),
#                 "pdf": result["pdf_file"], 
#                 "reason": result["error"]
#             })
        
#         if result["error"]:
#             overall_stats["errors"].append(f"Folder {folder_num}: {result['error']}")
    
#     print(f"\n📊 Folder {folder_num} Summary:")
#     print(f"   📄 PDFs processed: {folder_processed}/{len(pdf_files)}")
#     print(f"   📝 JSON files updated: {folder_updated}")
#     print(f"   🔍 OCR used: {folder_ocr_used} files")
#     print(f"   📄 Direct extraction: {folder_direct} files")
#     print(f"   ⏱️ Processing time: {folder_processing_time:.2f}s")
    
#     overall_stats["folders_processed"] += 1

# # Final comprehensive summary
# total_processing_time = time.time() - overall_stats["start_time"]

# print(f"\n{'='*70}")
# print(f"🎉 OPTIMIZED BATCH PROCESSING COMPLETE")
# print(f"{'='*70}")
# print(f"📁 Folders processed: {overall_stats['folders_processed']}/16")
# print(f"📄 Total PDFs found: {overall_stats['total_pdfs_found']}")
# print(f"📄 Total PDFs processed: {overall_stats['total_pdfs_processed']}")
# print(f"📝 Total JSON files updated: {overall_stats['total_json_updated']}")
# print(f"🔍 Total OCR operations: {overall_stats['total_ocr_used']}")
# print(f"📄 Total direct extractions: {overall_stats['total_direct_extraction']}")
# print(f"✅ Successful matches: {len(overall_stats['successful_matches'])}")
# print(f"❌ Failed matches: {len(overall_stats['failed_matches'])}")
# print(f"❌ Total errors: {len(overall_stats['errors'])}")
# print(f"⏱️ Total processing time: {total_processing_time:.2f}s")

# # Performance statistics
# if overall_stats["processing_times"]:
#     avg_time_per_folder = sum(t["time"] for t in overall_stats["processing_times"]) / len(overall_stats["processing_times"])
#     avg_time_per_pdf = total_processing_time / max(1, overall_stats["total_pdfs_processed"])
#     print(f"📈 Average time per folder: {avg_time_per_folder:.2f}s")
#     print(f"📈 Average time per PDF: {avg_time_per_pdf:.2f}s")

# # Show extraction method breakdown
# ocr_percentage = (overall_stats['total_ocr_used'] / max(1, overall_stats['total_pdfs_processed'])) * 100
# direct_percentage = (overall_stats['total_direct_extraction'] / max(1, overall_stats['total_pdfs_processed'])) * 100
# print(f"📊 Extraction methods:")
# print(f"   🔍 OCR: {ocr_percentage:.1f}% ({overall_stats['total_ocr_used']} files)")
# print(f"   📄 Direct: {direct_percentage:.1f}% ({overall_stats['total_direct_extraction']} files)")

# # Show sample successful matches with methods
# if overall_stats["successful_matches"]:
#     print(f"\n✅ Sample successful matches:")
#     for match in overall_stats["successful_matches"][:5]:
#         print(f"   📁 {match['folder']} -> {match['pdf']} ({match['method']})")

# # Show failed matches for debugging
# if overall_stats["failed_matches"]:
#     print(f"\n⚠️ Failed matches (for debugging):")
#     for fail in overall_stats["failed_matches"][:5]:
#         print(f"   📁 {fail['folder']} -> {fail['pdf']}")
#         print(f"      Reason: {fail['reason']}")

# # Show errors
# if overall_stats["errors"]:
#     print(f"\n❌ Errors encountered:")
#     for error in overall_stats["errors"][:5]:
#         print(f"   - {error}")
#     if len(overall_stats["errors"]) > 5:
#         print(f"   ... and {len(overall_stats['errors']) - 5} more errors")

# # Memory optimization summary
# print(f"\n💾 Memory Optimization Summary:")
# print(f"   🔧 Single GPU OCR instance used")
# print(f"   🔄 Max 2 parallel workers per folder")
# print(f"   📈 Thread-safe processing with locks")
# print(f"   🧹 Automatic memory cleanup after each PDF")


=== Batch process all PDF folders ===
🔧 Initializing single OCR manager for all folders...
🔧 Initializing single OCR reader (GPU memory optimized)...
✅ OCR reader initialized successfully

🔄 Processing folder 1/16: 
📄 Found 1000 PDF files
📁 JSON directory: metadata_folder
📄 Found 1000 PDF files
🚀 Processing with 2 workers (Single GPU OCR)...
   🔍 Processing PDF: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📝 Base name: (M1)_22-203-2006,_(M3)_22-45-2008_(Mahkamah_Tinggi)
   🔍 Processing PDF: (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📝 Base name: (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi)
⚠️ Low text content (0 chars), trying OCR...
📄 Converting PDF to images: (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
🔍 OCR processing (GPU): (M1)_22-203-2006_(M3)_22-45-2008_(Mahkamah_Tinggi).pdf
   📄 Processing page 1/10
✅ Direct PDF extraction successful
   ✅ Metadata extracted:
      📋 Case: (M1) 22-203-2006
      🏛️ Court: HIGH COURT IN MALAYA AT IPO

In [None]:
# 🤖 Advanced Metadata Extraction with Google Gemini AI
# This section implements intelligent legal document analysis using Google's Gemini 2.5 Flash

import dotenv
from google import genai
from google.genai import types
import pathlib
from typing import List, Literal, Optional
from pydantic import BaseModel
import os
import json
from json_repair import repair_json

# Load environment variables for API access
dotenv.load_dotenv()

# 📁 Configure paths for test document processing
test_files_dir = "../../../../data/raw/legal_cases/test_files"
test_files_metadata_dir = os.path.join(test_files_dir, "metadata")

# 🔑 Initialize Google Gemini API client
api_key = os.getenv("GOOGLE_API_KEY")

# 📊 PYDANTIC SCHEMA DEFINITIONS FOR LEGAL METADATA
# These models ensure consistent, structured extraction of legal information

class DamagesAwarded(BaseModel):
    """
    Structured representation of monetary damages in legal cases.
    
    Attributes:
        type: Category of damages awarded (None, Nominal, Compensatory, etc.)
        amount: Monetary value if applicable
        currency: Currency code (default: Malaysian Ringgit)
    """
    type: Literal["None", "Nominal", "Compensatory", "Punitive", "Aggravated", "Exemplary"]
    amount: Optional[float] = None
    currency: Optional[str] = "MYR"

class Remedy(BaseModel):
    """
    Legal remedies and relief granted by the court.
    
    Attributes:
        damages_awarded: Monetary compensation details
        injunction: Whether injunctive relief was granted
        declaratory_relief: Whether declaratory relief was provided
        specific_performance: Whether specific performance was ordered
        costs_awarded: Who bears the legal costs
    """
    damages_awarded: DamagesAwarded
    injunction: bool
    declaratory_relief: bool
    specific_performance: bool
    costs_awarded: Optional[Literal["Plaintiff", "Defendant", "Each party bears own costs"]] = None

class Outcome(BaseModel):
    """
    Final case outcome and judgment details.
    
    Attributes:
        disposition: Who prevailed in the case
        judgment_type: Type of legal judgment rendered
        remedy: Specific remedies granted
        appeal_possibility: Whether the case can be appealed
    """
    disposition: Literal[
        "Plaintiff wins",
        "Defendant wins", 
        "Partially in favour of Plaintiff",
        "Partially in favour of Defendant",
        "Case dismissed",
        "Withdrawn",
        "Settled out of court",
        "Struck out"
    ]
    judgment_type: Literal[
        "Trial Judgment",
        "Summary Judgment",
        "Consent Judgment",
        "Default Judgment",
        "Appeal Allowed",
        "Appeal Dismissed"
    ]
    remedy: Remedy
    appeal_possibility: Literal["Yes", "No"]

class CaseSummary(BaseModel):
    """
    Complete structured summary of a legal case.
    
    Attributes:
        summarized_documents: Concise summary of legal proceedings
        key_points: List of important legal takeaways
        outcome: Structured judgment and remedy information
    """
    summarized_documents: str
    key_points: List[str]
    outcome: Outcome

def extract_json(text: str):
    """
    Robust JSON extraction from AI model responses.
    
    Handles common formatting issues like code blocks and attempts
    automatic repair of malformed JSON using json_repair library.
    
    Args:
        text: Raw text response from AI model
        
    Returns:
        dict: Parsed JSON object
        
    Raises:
        json.JSONDecodeError: If JSON cannot be parsed or repaired
    """
    # Remove markdown code block formatting if present
    if text.startswith("```"):
        text = text.strip().strip("`")
        if text.lower().startswith("json"):
            text = text[4:].strip()
    
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Attempt automatic JSON repair for common formatting issues
        fixed = repair_json(text)
        return json.loads(fixed)

# 🔗 Initialize Google Gemini client with API key
client = genai.Client(api_key=api_key)

# 📝 COMPREHENSIVE LEGAL ANALYSIS PROMPT
# This prompt guides the AI to extract structured legal information following our schema
prompt = """
You are an expert legal assistant. Read the provided legal documents carefully and return a JSON object that follows exactly the schema and label constraints below.

Schema Requirements:

- summarized_documents: A concise summary of the key legal arguments, facts, and proceedings.

- key_points: A bulleted list of the most important takeaways, including legal precedents, significant dates, and parties involved.

- outcome: A structured representation of the final judgment, using only the allowed labels below.

Allowed Labels:
- disposition (choose one):
    - "Plaintiff wins"
    - "Defendant wins"
    - "Partially in favour of Plaintiff"
    - "Partially in favour of Defendant"
    - "Case dismissed"
    - "Withdrawn"
    - "Settled out of court"
    - "Struck out"
- judgment_type (choose one):
    - "Trial Judgment"
    - "Summary Judgment"
    - "Consent Judgment"
    - "Default Judgment"
    - "Appeal Allowed"
    - "Appeal Dismissed"
- remedy.damages_awarded.type (choose one):
    - "None"
    - "Nominal"
    - "Compensatory"
    - "Punitive"
    - "Aggravated"
    - "Exemplary"
- remedy.injunction: true or false
- remedy.declaratory_relief: true or false
- remedy.specific_performance: true or false
- remedy.costs_awarded (choose one):
    - "Plaintiff"
    - "Defendant"
    - "Each party bears own costs"
- appeal_possibility (choose one):
    - "Yes"
    - "No"

Output strictly in JSON format with no additional commentary.

Example JSON format:
{
  "summarized_documents": "This case concerns a contract dispute between ABC Sdn Bhd and XYZ Holdings. The plaintiff argued breach of payment terms, while the defendant raised a counterclaim for defective goods. The court reviewed documentary evidence and witness testimony.",
  "key_points": [
    "Contract dispute regarding payment obligations.",
    "Plaintiff: ABC Sdn Bhd; Defendant: XYZ Holdings.",
    "Court found sufficient evidence of breach by defendant."
  ],
  "outcome": {
    "disposition": "Plaintiff wins",
    "judgment_type": "Trial Judgment",
    "remedy": {
      "damages_awarded": {
        "type": "Compensatory",
        "amount": 250000.00,
        "currency": "MYR"
      },
      "injunction": false,
      "declaratory_relief": true,
      "specific_performance": false,
      "costs_awarded": "Plaintiff"
    },
    "appeal_possibility": "Yes"
  }
}
"""

## Metadata Extraction with Google Gemini

This section implements advanced legal document analysis using Google's Gemini 2.5 Flash model. The AI system processes the 20 randomly selected test files to extract structured legal metadata for evaluation purposes.

### Key Components:

#### 🔍 **Pydantic Schema Models**
- **`DamagesAwarded`**: Structured representation of monetary damages and compensation
- **`Remedy`**: Legal remedies including injunctions, declaratory relief, and cost awards
- **`Outcome`**: Complete case disposition with judgment types and appeal possibilities
- **`CaseSummary`**: Comprehensive case analysis with summaries, key points, and outcomes

#### 🎯 **AI Processing Pipeline**
1. **Test File Selection**: Processes the 20 randomly extracted test files for evaluation
2. **Structured Extraction**: Uses Pydantic models to ensure consistent metadata format
3. **JSON Validation**: Robust parsing and repair of AI-generated JSON responses
4. **Legal Intelligence**: Advanced prompting for accurate legal domain understanding

#### 🛡️ **Quality Assurance**
- **Schema Enforcement**: Strict typing ensures data consistency
- **Error Recovery**: Automatic JSON repair for malformed responses
- **Domain Expertise**: Legal-specific prompting for accurate case analysis

**Purpose**: The extracted metadata serves as ground truth for evaluating the RAG system's ability to accurately retrieve and understand legal case information.

---

## Advanced Metadata Extraction with LLM Analysis

This section implements advanced metadata extraction using Google's Gemini 2.5 Flash model for intelligent analysis of legal documents. The system combines traditional text extraction with AI-powered understanding to generate structured legal metadata.

### Key Features:

- **Structured Data Extraction**: Uses Pydantic models to ensure consistent metadata format
- **Legal-Specific Schema**: Tailored for legal case analysis with proper categorization
- **Robust Error Handling**: JSON repair and validation to handle parsing issues
- **Batch Processing**: Efficient processing of multiple documents with progress tracking
- **Quality Validation**: Schema validation ensures data integrity

### Workflow:

1. **Document Analysis**: AI model reads PDF content and extracts legal information
2. **Structured Output**: Generates JSON following predefined legal schema
3. **Validation**: Pydantic models validate and clean the extracted data
4. **Storage**: Saves metadata alongside original documents for ingestion pipeline

This approach significantly improves the quality and consistency of metadata extraction compared to regex-based methods.

In [None]:
# 🔄 Process Test Files with Gemini AI Analysis
# Process all 20 randomly selected test files to extract structured legal metadata

# Loop through all PDFs in test_files_dir
for pdf_file in os.listdir(test_files_dir):
    if not pdf_file.endswith(".pdf"):
        continue

    pdf_path = pathlib.Path(os.path.join(test_files_dir, pdf_file))
    json_filename = pdf_file.replace(".pdf", "_metadata.json")
    json_path = os.path.join(test_files_metadata_dir, json_filename)

    print(f"🔍 Processing {pdf_file}")

    try:
        # 🤖 Send PDF to Gemini AI for intelligent analysis
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[
                types.Part.from_bytes(
                    data=pdf_path.read_bytes(),
                    mime_type="application/pdf",
                ),
                prompt
            ],
            config=types.GenerateContentConfig(
                temperature=0.0  # Deterministic output for consistent results
            )
        )

        # 📊 Extract and validate JSON response
        new_metadata = extract_json(response.text)
        
        # ✅ Validate against Pydantic schema for data integrity
        validated = CaseSummary.model_validate(new_metadata)
        new_metadata = validated.model_dump()

        # 📁 Load existing metadata if present
        if os.path.exists(json_path):
            with open(json_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
        else:
            metadata = {}

        # 🎯 Store AI-generated metadata as ground truth for evaluation
        metadata["ground_truth_judgment"] = new_metadata

        # 💾 Save updated metadata
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        print(f"✅ Updated {json_filename}")

    except Exception as e:
        print(f"❌ Failed {pdf_file}: {e}")

🔍 Processing 12A-983-2010_(Mahkamah_Tinggi).pdf
✅ Updated 12A-983-2010_(Mahkamah_Tinggi)_metadata.json
🔍 Processing AA-A52NCvC-210-12_2019_(Mahkamah_Sesyen).pdf
✅ Updated AA-A52NCvC-210-12_2019_(Mahkamah_Sesyen)_metadata.json
🔍 Processing AA-A53KJ-499-12_2019_(Mahkamah_Sesyen).pdf
✅ Updated AA-A53KJ-499-12_2019_(Mahkamah_Sesyen)_metadata.json
🔍 Processing BA-12B-125-07_2017_(Mahkamah_Tinggi).pdf
✅ Updated BA-12B-125-07_2017_(Mahkamah_Tinggi)_metadata.json
🔍 Processing BA-22NCVC-290-07_2019_(Mahkamah_Tinggi).pdf
✅ Updated BA-22NCVC-290-07_2019_(Mahkamah_Tinggi)_metadata.json
🔍 Processing BA-22NCvC-384-06_2017_(Mahkamah_Tinggi).pdf
✅ Updated BA-22NCvC-384-06_2017_(Mahkamah_Tinggi)_metadata.json
🔍 Processing BA-22NCvC-649-11_2017_(Mahkamah_Tinggi).pdf
✅ Updated BA-22NCvC-649-11_2017_(Mahkamah_Tinggi)_metadata.json
🔍 Processing CA-12B-2-01_2021_(Mahkamah_Tinggi).pdf
✅ Updated CA-12B-2-01_2021_(Mahkamah_Tinggi)_metadata.json
🔍 Processing CB-24NCvC-79-09_2021_(Mahkamah_Tinggi).pdf
❌ Failed C

In [None]:
# rerun as there are Requests per minute (RPM) and Tokens per minute (input) (TPM)
failed_files = [
    #"CB-24NCvC-79-09_2021_(Mahkamah_Tinggi).pdf",
    #"PA-12BNCvC-14-03_2020_(Mahkamah_Tinggi).pdf",
    "WA-22IP-57-08_2019_(Mahkamah_Tinggi).pdf",
    "WA-22NCvC-126-03_2022_(Mahkamah_Tinggi).pdf",
    "WA-22NCvC-143-03_2023_(Mahkamah_Tinggi)_12.pdf",
    "wa-24c-218-10_2019_&_wa-24c-241-11_2019_(Mahkamah_Tinggi).pdf",
    "WA-24NCvC-4216-09_2023_(Mahkamah_Tinggi).pdf"
]

for pdf_file in failed_files:
    pdf_path = pathlib.Path(os.path.join(test_files_dir, pdf_file))
    json_filename = pdf_file.replace(".pdf", "_metadata.json")
    json_path = os.path.join(test_files_metadata_dir, json_filename)

    print(f"🔁 Retrying {pdf_file}")

    try:
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[
                types.Part.from_bytes(
                    data=pdf_path.read_bytes(),
                    mime_type="application/pdf",
                ),
                prompt
            ],
            config=types.GenerateContentConfig(
                temperature=0.0
            )
        )

        new_metadata = extract_json(response.text)
        validated = CaseSummary.model_validate(new_metadata)
        new_metadata = validated.model_dump()

        if os.path.exists(json_path):
            with open(json_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
        else:
            metadata = {}

        metadata["ground_truth_judgment"] = new_metadata

        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        print(f"✅ Updated {json_filename}")

    except Exception as e:
        print(f"❌ Still failed {pdf_file}: {e}")

🔁 Retrying WA-22IP-57-08_2019_(Mahkamah_Tinggi).pdf
✅ Updated WA-22IP-57-08_2019_(Mahkamah_Tinggi)_metadata.json
🔁 Retrying WA-22NCvC-126-03_2022_(Mahkamah_Tinggi).pdf
✅ Updated WA-22NCvC-126-03_2022_(Mahkamah_Tinggi)_metadata.json
🔁 Retrying WA-22NCvC-143-03_2023_(Mahkamah_Tinggi)_12.pdf
✅ Updated WA-22NCvC-143-03_2023_(Mahkamah_Tinggi)_12_metadata.json
🔁 Retrying wa-24c-218-10_2019_&_wa-24c-241-11_2019_(Mahkamah_Tinggi).pdf
❌ Still failed wa-24c-218-10_2019_&_wa-24c-241-11_2019_(Mahkamah_Tinggi).pdf: Expecting ',' delimiter: line 2 column 1042 (char 1043)
🔁 Retrying WA-24NCvC-4216-09_2023_(Mahkamah_Tinggi).pdf
✅ Updated WA-24NCvC-4216-09_2023_(Mahkamah_Tinggi)_metadata.json


In [None]:
# try with validation again first
# if doesn't work, try without validation
files_without_validation = [
    "CB-24NCvC-79-09_2021_(Mahkamah_Tinggi).pdf",
    #"PA-12BNCvC-14-03_2020_(Mahkamah_Tinggi).pdf",
    #"wa-24c-218-10_2019_&_wa-24c-241-11_2019_(Mahkamah_Tinggi).pdf"
]

for pdf_file in files_without_validation:
    pdf_path = pathlib.Path(os.path.join(test_files_dir, pdf_file))
    json_filename = pdf_file.replace(".pdf", "_metadata.json")
    json_path = os.path.join(test_files_metadata_dir, json_filename)

    print(f"🔁 Retrying {pdf_file}")

    try:
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[
                types.Part.from_bytes(
                    data=pdf_path.read_bytes(),
                    mime_type="application/pdf",
                ),
                prompt
            ],
            config=types.GenerateContentConfig(
                temperature=0.0
            )
        )

        new_metadata = extract_json(response.text)
        # validated = CaseSummary.model_validate(new_metadata)
        # new_metadata = validated.model_dump()

        if os.path.exists(json_path):
            with open(json_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
        else:
            metadata = {}

        metadata["ground_truth_judgment"] = new_metadata

        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        print(f"✅ Updated {json_filename}")

    except Exception as e:
        print(f"❌ Still failed {pdf_file}: {e}")

🔁 Retrying CB-24NCvC-79-09_2021_(Mahkamah_Tinggi).pdf
✅ Updated CB-24NCvC-79-09_2021_(Mahkamah_Tinggi)_metadata.json


In [None]:
# 🎯 Final Processing Pipeline Summary
# Generate comprehensive statistics and finalize the preprocessing pipeline

print("="*70)
print("🎉 PREPROCESSING PIPELINE COMPLETE")
print("="*70)

# 📊 Summary of completed tasks
completed_tasks = [
    "✅ Dataset Creation: Extracted top 100 files from 16 legal case categories",
    "✅ Test Set Separation: 20 random files isolated for evaluation",
    "✅ Metadata Cleaning: Removed unnecessary fields for optimization", 
    "✅ Hybrid Text Extraction: Applied PyMuPDF4LLM + OCR to RAG dataset",
    "✅ AI Analysis: Gemini-powered structured metadata extraction for test cases",
    "✅ Document Serialization: LlamaIndex Document format ready for ingestion",
    "✅ Quality Validation: Pydantic schema enforcement for data consistency"
]

for task in completed_tasks:
    print(task)

print(f"\n📁 Output Directories:")
print(f"   🗂️ RAG Dataset: {rag_legal_cases_dir}")
print(f"   🗂️ RAG Metadata: {rag_legal_cases_metadata_dir}")
print(f"   🧪 Test Files: {test_files_dir}")
print(f"   📝 Test Metadata: {test_files_metadata_dir}")
print(f"   💾 Processed Documents: {save_dir}")

print(f"\n🔍 Next Steps:")
print(f"   1. Run ingestion_pipeline.ipynb to create vector embeddings")
print(f"   2. Initialize RAG system with processed documents")
print(f"   3. Evaluate system performance using test cases with ground truth")
print(f"   4. Deploy legal assistant with optimized retrieval")

print(f"\n🏁 Preprocessing pipeline ready for production deployment!")
print("="*70)