
"""
# 📝 Information Extraction - Vehicle Document Intelligence
## Advanced OCR Pipeline for Text Detection and Recognition

**System Architecture**:
Existing Classification Model (93% accuracy) → Document Type
         ↓
YOLO v9 Detection → [license_plate, odometer, text_regions, damage]
         ↓  
Specialized OCR → Text extraction per detected region
         ↓
Post-processing → Validation, formatting, confidence scoring

### Extraction Capabilities:
1. **License Plate Recognition**: Alphanumeric characters from plates
2. **Odometer Reading**: Digital/analog odometer values
3. **Document Text**: Key information from papers/documents
4. **Damage Detection**: Visual anomalies and damage indicators
5. **Quality Scoring**: Confidence and reliability metrics

### Technical Stack:
- **Detection**: YOLOv9/YOLOv8 for object localization
- **OCR**: EasyOCR + PaddleOCR for robust text recognition
- **Validation**: RegEx patterns + business rules
- **Integration**: Seamless pipeline with the existing classification model
"""


In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import json
import re
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
import warnings
warnings.filterwarnings('ignore')

# OCR and Detection libraries
try:
    import easyocr
    EASYOCR_AVAILABLE = True
    print("✅ EasyOCR available")
except ImportError:
    EASYOCR_AVAILABLE = False
    print("⚠️ EasyOCR not available - install with: pip install easyocr")

try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
    print("✅ YOLO available")
except ImportError:
    YOLO_AVAILABLE = False
    print("⚠️ YOLO not available - install with: pip install ultralytics")

try:
    import paddleocr
    PADDLE_AVAILABLE = True
    print("✅ PaddleOCR available")
except ImportError:
    PADDLE_AVAILABLE = False
    print("⚠️ PaddleOCR not available - install with: pip install paddlepaddle paddleocr")

# Image processing
from sklearn.metrics import accuracy_score
import pytesseract
from scipy import ndimage
from skimage import measure, morphology

# Plot defaults shared by every figure produced in this notebook
plt.style.use('default')
sns.set_palette("Set1")
plt.rcParams.update({
    'figure.figsize': (15, 10),
    'font.size': 11,
})

# Signal that the imports and plotting defaults above ran to completion
print("🔧 Information Extraction Environment Setup Complete")

2025-07-23 15:43:56.641047: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-07-23 15:43:56.641107: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-07-23 15:43:56.642512: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-07-23 15:43:56.648786: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


✅ EasyOCR available
Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/home/edwlearn/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
✅ YOLO available
✅ PaddleOCR available
🔧 Information Extraction Environment Setup Complete


In [2]:
# Project paths (relative to the notebook's working directory)
PROJECT_ROOT = Path("..")
DATA_DIR = PROJECT_ROOT / "data" / "processed" / "car_plates"
MODELS_DIR = PROJECT_ROOT / "models"
EXTRACTION_DIR = MODELS_DIR / "extraction"
ANNOTATIONS_DIR = DATA_DIR / "annotations"

# Create extraction directory. parents=True also creates MODELS_DIR when it
# does not exist yet; without it mkdir raises FileNotFoundError in that case.
EXTRACTION_DIR.mkdir(parents=True, exist_ok=True)

# Classification model configuration (from previous work).
# Both lookup tables are derived from CLASS_NAMES so they cannot drift apart.
CLASS_NAMES = ['document', 'licence', 'odometer']
CLASS_TO_INT = {name: index for index, name in enumerate(CLASS_NAMES)}
INT_TO_CLASS = dict(enumerate(CLASS_NAMES))

# OCR and Detection configuration
DETECTION_CONFIDENCE = 0.5       # confidence passed to the YOLO call
OCR_CONFIDENCE_THRESHOLD = 0.6   # OCR results below this confidence are dropped
IMG_SIZE = 224                   # side length images are resized to before classification

# Text patterns for validation
TEXT_PATTERNS = {
    'license_plate': {
        'patterns': [
            r'^[A-Z]{3}-\d{3}$',  # ABC-123 format
            r'^[A-Z]{2}\d{4}$',   # AB1234 format
            r'^\d{3}[A-Z]{3}$',   # 123ABC format
            r'^[A-Z]{1,3}\d{1,4}[A-Z]?$'  # General format
        ],
        'min_length': 5,
        'max_length': 10
    },
    'odometer': {
        'patterns': [
            r'^\d{1,6}$',         # Pure numbers
            r'^\d{1,3},\d{3}$',   # With comma separator
            r'^\d{1,3}\.\d{3}$',  # With dot separator
            r'^\d{1,6}\s*(km|mi|KM|MI)?$'  # With units
        ],
        'min_value': 0,
        'max_value': 999999
    },
    'document_text': {
        'min_length': 2,
        'max_length': 100,
        'allowed_chars': r'[A-Za-z0-9\s\-\.,:]'
    }
}

print("📁 Information Extraction Configuration:")
print(f"Extraction Directory: {EXTRACTION_DIR}")
print(f"Detection Confidence: {DETECTION_CONFIDENCE}")
print(f"OCR Confidence: {OCR_CONFIDENCE_THRESHOLD}")
print(f"Available OCR engines: {sum([EASYOCR_AVAILABLE, PADDLE_AVAILABLE])}")

📁 Information Extraction Configuration:
Extraction Directory: ../models/extraction
Detection Confidence: 0.5
OCR Confidence: 0.6
Available OCR engines: 2


In [3]:
# Progress banner for the classification-model loading cell
print("🔄 Loading YOUR trained classification model...")

# Load YOUR existing model (the one with 93% accuracy)
def load_your_classification_model():
    """Return the first Keras model that loads from the known file names.

    Candidate files are looked up under ``MODELS_DIR`` in a fixed order. A
    candidate that exists but cannot be loaded is reported and skipped.

    Returns:
        The loaded model, or ``None`` when no candidate could be loaded (the
        expected locations are printed in that case).
    """
    candidate_names = (
        "best_model.h5",
        "vehicle_document_classifier.h5",
        "final_model.h5",
        "model_checkpoint.h5",
    )
    model_paths = [MODELS_DIR / file_name for file_name in candidate_names]

    for candidate in model_paths:
        if not candidate.exists():
            continue

        try:
            loaded = tf.keras.models.load_model(candidate)
            print(f"✅ YOUR classification model loaded from: {candidate}")
            print(f"   Model name: {loaded.name}")
            print(f"   Parameters: {loaded.count_params():,}")
            return loaded
        except Exception as load_error:
            # Report the failure and fall through to the next candidate
            print(f"⚠️ Failed to load {candidate}: {load_error}")

    print("❌ YOUR trained model not found. Please ensure your model is saved.")
    print("💡 Expected locations:")
    for candidate in model_paths:
        print(f"   - {candidate}")
    return None

# Try to load the classifier; None means no usable model file was found
your_classification_model = load_your_classification_model()

if your_classification_model is not None:
    print("🎯 YOUR classification model ready for integration with OCR pipeline")
else:
    print("⚠️ Warning: YOUR classification model not loaded.")
    print("🔧 The OCR pipeline will work independently without classification.")

🔄 Loading YOUR trained classification model...
❌ YOUR trained model not found. Please ensure your model is saved.
💡 Expected locations:
   - ../models/best_model.h5
   - ../models/vehicle_document_classifier.h5
   - ../models/final_model.h5
   - ../models/model_checkpoint.h5
🔧 The OCR pipeline will work independently without classification.


In [None]:
# Progress banner for the OCR engine setup cell
print("🔧 Initializing OCR engines...")

class OCREngine:
    """Multi-engine OCR system for robust text recognition.

    Wraps EasyOCR, PaddleOCR and Tesseract behind one result format: every
    extraction is a dict with the keys ``text``, ``confidence``, ``bbox`` and
    ``engine``. Engines that are not installed or fail to initialize are left
    out of ``self.engines``; their extract methods then return ``[]``.
    """

    def __init__(self):
        # Engine name -> initialized engine object. Tesseract is called via
        # the pytesseract module, so it only stores a marker string.
        self.engines = {}
        self.initialize_engines()

    def initialize_engines(self):
        """Initialize available OCR engines.

        Initialization failures are printed, never raised, so the pipeline
        keeps working with whichever engines could be set up.
        """

        # EasyOCR - Good for general text and multiple languages
        if EASYOCR_AVAILABLE:
            try:
                self.engines['easyocr'] = easyocr.Reader(['en'])
                print("✅ EasyOCR initialized")
            except Exception as e:
                print(f"❌ EasyOCR initialization failed: {e}")

        # PaddleOCR - Excellent for complex layouts
        if PADDLE_AVAILABLE:
            try:
                self.engines['paddle'] = paddleocr.PaddleOCR(use_angle_cls=True, lang='en', show_log=False)
                print("✅ PaddleOCR initialized")
            except Exception as e:
                print(f"❌ PaddleOCR initialization failed: {e}")

        # Tesseract - Fallback option
        try:
            # Probe for the tesseract binary; this raises when it is missing
            pytesseract.get_tesseract_version()
            self.engines['tesseract'] = 'available'
            print("✅ Tesseract available")
        except Exception:
            # Not a bare ``except:`` so KeyboardInterrupt/SystemExit still propagate
            print("⚠️ Tesseract not available")

        print(f"🎯 OCR engines initialized: {list(self.engines.keys())}")

    def extract_text_easyocr(self, image_region):
        """Extract text using EasyOCR.

        Returns:
            A list of result dicts whose confidence is at least
            ``OCR_CONFIDENCE_THRESHOLD``; ``[]`` when EasyOCR is not
            initialized or the call fails.
        """
        if 'easyocr' not in self.engines:
            return []

        try:
            results = self.engines['easyocr'].readtext(image_region)
            return [
                {
                    'text': text.strip(),
                    'confidence': float(confidence),
                    'bbox': bbox,
                    'engine': 'easyocr'
                }
                for (bbox, text, confidence) in results
                if confidence >= OCR_CONFIDENCE_THRESHOLD
            ]
        except Exception as e:
            print(f"EasyOCR error: {e}")
            return []

    def extract_text_paddle(self, image_region):
        """Extract text using PaddleOCR.

        Returns:
            A list of result dicts whose confidence is at least
            ``OCR_CONFIDENCE_THRESHOLD``; ``[]`` when PaddleOCR is not
            initialized or the call fails.
        """
        if 'paddle' not in self.engines:
            return []

        try:
            results = self.engines['paddle'].ocr(image_region, cls=True)
            extracted_texts = []

            # Only the first element of the result is read; it is empty/None
            # when nothing was recognized.
            if results and results[0]:
                for line in results[0]:
                    if not line or len(line) < 2:
                        continue
                    bbox, (text, confidence) = line
                    if confidence >= OCR_CONFIDENCE_THRESHOLD:
                        extracted_texts.append({
                            'text': text.strip(),
                            'confidence': float(confidence),
                            'bbox': bbox,
                            'engine': 'paddle'
                        })

            return extracted_texts
        except Exception as e:
            print(f"PaddleOCR error: {e}")
            return []

    def extract_text_tesseract(self, image_region):
        """Extract text using Tesseract.

        Returns:
            A single-element list with the whole recognized text (fixed
            confidence 0.8, no bounding box), or ``[]`` when Tesseract is
            unavailable, recognizes nothing, or fails.
        """
        if 'tesseract' not in self.engines:
            return []

        try:
            # Configure Tesseract for different document types
            config = '--oem 3 --psm 6'  # Assume uniform block of text

            text = pytesseract.image_to_string(image_region, config=config).strip()
            if not text:
                return []

            return [{
                'text': text,
                'confidence': 0.8,  # Tesseract doesn't provide confidence easily
                'bbox': None,
                'engine': 'tesseract'
            }]
        except Exception as e:
            print(f"Tesseract error: {e}")
            return []

    def extract_text_multi_engine(self, image_region, engines=('easyocr', 'paddle')):
        """Extract text using multiple engines and combine results.

        Args:
            image_region: Image passed unchanged to each engine.
            engines: Iterable of engine names to run, in order. Supported
                names are ``'easyocr'``, ``'paddle'`` and ``'tesseract'``;
                any other name is ignored. The default is an immutable tuple
                so it cannot be modified between calls.

        Returns:
            The de-duplicated results (see ``combine_ocr_results``).
        """
        extractors = {
            'easyocr': self.extract_text_easyocr,
            'paddle': self.extract_text_paddle,
            'tesseract': self.extract_text_tesseract,
        }

        all_results = []
        for engine in engines:
            extract = extractors.get(engine)
            if extract is None:
                continue  # unknown engine name
            all_results.extend(extract(image_region))

        # Combine and deduplicate results
        return self.combine_ocr_results(all_results)

    def combine_ocr_results(self, results):
        """Combine results from multiple OCR engines.

        Results whose upper-cased, stripped text is identical are treated as
        duplicates; the one with the strictly higher confidence is kept (the
        first one wins ties). Output order follows first occurrence.
        """
        if not results:
            return []

        combined = {}
        for result in results:
            key = result['text'].upper().strip()
            best = combined.get(key)
            if best is None or result['confidence'] > best['confidence']:
                combined[key] = result

        return list(combined.values())

# Initialize OCR system (the constructor sets up every available engine)
ocr_engine = OCREngine()

In [None]:
# Progress banner for the detection setup cell
print("🎯 Setting up detection system...")

class VehicleDocumentDetector:
    """Vehicle document element detection system.

    Detections are dicts with ``bbox`` (``[x1, y1, x2, y2]`` in pixels),
    ``confidence``, ``class`` and ``class_id``. YOLO is tried first; when it
    is unavailable or finds nothing, an edge/contour heuristic proposes
    regions instead. Image arrays are treated as RGB throughout this class.
    """

    # Boxes must be strictly larger than this in both dimensions to be kept
    MIN_BOX_SIDE = 20

    def __init__(self):
        self.yolo_model = None
        self.detection_classes = ['license_plate', 'odometer', 'text_region', 'damage']
        self.initialize_detector()

    def initialize_detector(self):
        """Initialize YOLO detection model.

        Leaves ``self.yolo_model`` as ``None`` when ultralytics is not
        installed or the weights cannot be loaded.
        """

        if not YOLO_AVAILABLE:
            print("⚠️ YOLO not available, using fallback detection")
            return

        try:
            # Try to load pre-trained YOLO model
            # Note: You might need to train a custom model for vehicle documents
            self.yolo_model = YOLO('yolov8n.pt')  # Using nano version for speed
            print("✅ YOLO detector initialized")
            print(f"   Model: YOLOv8 nano")
            print(f"   Classes: {len(self.yolo_model.names)} default classes")
        except Exception as e:
            print(f"❌ YOLO initialization failed: {e}")
            self.yolo_model = None

    def detect_with_yolo(self, image):
        """Detect objects using YOLO.

        Args:
            image: RGB image array (or any other source YOLO accepts, which
                is passed through unchanged).

        Returns:
            A list of detection dicts; ``[]`` when no model is loaded or
            inference fails.
        """
        if self.yolo_model is None:
            return []

        try:
            # Ultralytics interprets numpy input as BGR, while this class
            # works with RGB arrays (see COLOR_RGB2GRAY in the fallback), so
            # swap the channel order before inference.
            yolo_input = image
            if isinstance(image, np.ndarray) and image.ndim == 3 and image.shape[2] == 3:
                yolo_input = np.ascontiguousarray(image[:, :, ::-1])

            results = self.yolo_model(yolo_input, conf=DETECTION_CONFIDENCE)
            detections = []

            for r in results:
                boxes = r.boxes
                if boxes is None:
                    continue
                for box in boxes:
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    confidence = box.conf[0].cpu().numpy()
                    class_id = int(box.cls[0].cpu().numpy())

                    detections.append({
                        'bbox': [int(x1), int(y1), int(x2), int(y2)],
                        'confidence': float(confidence),
                        'class': self.yolo_model.names[class_id],
                        'class_id': class_id
                    })

            return detections
        except Exception as e:
            print(f"YOLO detection error: {e}")
            return []

    def detect_fallback_regions(self, image):
        """Fallback detection using traditional CV methods.

        Runs Canny edge detection, takes the external contours and labels
        each sufficiently large bounding box by aspect ratio and vertical
        position. Every proposal gets a fixed confidence of 0.7.
        """

        detections = []
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image
        height, width = gray.shape

        # 1. Text regions using edge detection
        edges = cv2.Canny(gray, 50, 150)
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        for contour in contours:
            if cv2.contourArea(contour) <= 100:  # Filter small regions
                continue

            x, y, w, h = cv2.boundingRect(contour)

            # Classify region based on aspect ratio and position
            aspect_ratio = w / h
            relative_y = y / height

            if not (0.2 < aspect_ratio < 8 and w > 50 and h > 20):
                continue  # Unlikely to be text

            region_type = 'text_region'

            # Specific heuristics for license plates (rectangular, specific ratio)
            if 2 < aspect_ratio < 6 and 0.3 < relative_y < 0.8:
                region_type = 'license_plate'

            # Odometer regions (usually in upper portion, squarish)
            elif 0.5 < aspect_ratio < 2 and relative_y < 0.5:
                region_type = 'odometer'

            detections.append({
                'bbox': [x, y, x+w, y+h],
                'confidence': 0.7,  # Default confidence for fallback
                'class': region_type,
                'class_id': 0
            })

        return detections

    def detect_elements(self, image):
        """Main detection method.

        Returns:
            YOLO detections, or the fallback proposals when YOLO returned
            nothing, restricted to boxes wider and taller than
            ``MIN_BOX_SIDE`` pixels.
        """

        # Try YOLO first, fallback to traditional methods
        detections = self.detect_with_yolo(image)

        if not detections:
            detections = self.detect_fallback_regions(image)

        # Filter and clean detections
        return [d for d in detections if self._has_usable_size(d['bbox'])]

    @classmethod
    def _has_usable_size(cls, bbox):
        """Return True when the box is wider and taller than MIN_BOX_SIDE."""
        x1, y1, x2, y2 = bbox
        return (x2 - x1) > cls.MIN_BOX_SIDE and (y2 - y1) > cls.MIN_BOX_SIDE

# Initialize detector (the constructor attempts to load the YOLO model)
detector = VehicleDocumentDetector()


In [None]:
class TextValidator:
    """Text validation and post-processing.

    Validators return an ``(is_valid, confidence)`` tuple; formatters return
    a display string. The rules are read from ``self.patterns``, which has
    the layout of the module-level ``TEXT_PATTERNS`` dict.
    """

    def __init__(self):
        self.patterns = TEXT_PATTERNS

    @staticmethod
    def _without_separator_hyphens(pattern):
        """Drop literal hyphens from a regex but keep character classes intact.

        A plain ``pattern.replace('-', '')`` would also turn ``[A-Z]`` into
        ``[AZ]``, so bracketed classes are matched first and copied verbatim.
        """
        return re.sub(
            r'(\[[^\]]*\])|\\?-',
            lambda match: match.group(1) or '',
            pattern,
        )

    def validate_license_plate(self, text):
        """Validate license plate format.

        The text is upper-cased and stripped of spaces and hyphens before
        matching, so the patterns are matched without separator hyphens too.

        Returns:
            ``(True, 0.9)`` on a pattern match, ``(True, 0.6)`` when more
            than 80% of the characters are alphanumeric, else
            ``(False, 0.0)`` (also for out-of-range lengths).
        """
        text = text.upper().strip().replace(' ', '').replace('-', '')
        rules = self.patterns['license_plate']

        # Check length
        if not (rules['min_length'] <= len(text) <= rules['max_length']):
            return False, 0.0

        # Check patterns
        for pattern in rules['patterns']:
            if re.match(self._without_separator_hyphens(pattern), text):
                return True, 0.9

        # Partial match scoring (len(text) > 0 is guaranteed by the length check
        # as long as min_length is positive)
        alphanumeric_ratio = sum(c.isalnum() for c in text) / len(text)
        if alphanumeric_ratio > 0.8:
            return True, 0.6

        return False, 0.0

    def validate_odometer(self, text):
        """Validate odometer reading.

        Everything except digits, dots and commas is discarded; commas are
        treated as thousands separators and the rest is parsed as a float.

        Returns:
            ``(True, 0.9)`` when the value lies within the configured range,
            ``(False, 0.3)`` when it parses but is out of range, and
            ``(False, 0.0)`` when nothing numeric can be parsed.
        """
        # Clean text
        cleaned = re.sub(r'[^\d\.,]', '', text.strip())

        if not cleaned:
            return False, 0.0

        try:
            # Removing commas is a no-op when there are none, so one
            # conversion covers every input format.
            value = float(cleaned.replace(',', ''))
        except ValueError:
            return False, 0.0

        # Validate range
        rules = self.patterns['odometer']
        if rules['min_value'] <= value <= rules['max_value']:
            return True, 0.9
        return False, 0.3

    def validate_document_text(self, text):
        """Validate general document text.

        Returns:
            ``(True, 0.8)`` when every character is allowed, ``(True, 0.6)``
            when more than 70% are, else ``(False, 0.0)`` (also for
            out-of-range lengths).
        """
        text = text.strip()
        rules = self.patterns['document_text']

        # Check length
        if not (rules['min_length'] <= len(text) <= rules['max_length']):
            return False, 0.0

        # Check allowed characters
        allowed_chars = rules['allowed_chars']
        if re.match(f'^{allowed_chars}+$', text):
            return True, 0.8

        # Partial validation
        valid_char_ratio = len(re.findall(allowed_chars, text)) / len(text)
        if valid_char_ratio > 0.7:
            return True, 0.6

        return False, 0.0

    def format_license_plate(self, text):
        """Format license plate text.

        Upper-cases the text and removes spaces; a six-character plate made
        of three letters followed by three digits gets a hyphen inserted
        (``ABC123`` -> ``ABC-123``). Everything else is returned unchanged.
        """
        text = text.upper().strip().replace(' ', '')

        if len(text) == 6 and text[:3].isalpha() and text[3:].isdigit():
            return f"{text[:3]}-{text[3:]}"

        return text

    def format_odometer(self, text):
        """Format odometer reading.

        Keeps only the digits and adds thousands separators when there are
        more than three of them.
        """
        cleaned = re.sub(r'[^\d]', '', text.strip())

        if len(cleaned) > 3:
            # Add comma separator for readability
            return f"{int(cleaned):,}"

        return cleaned

    def post_process_text(self, text, text_type):
        """Post-process extracted text based on type.

        Args:
            text: Raw OCR text.
            text_type: ``'license_plate'``, ``'odometer'`` or
                ``'document_text'``; any other value is accepted as valid
                with confidence 0.5.

        Returns:
            A dict with ``original_text``, ``formatted_text``, ``is_valid``
            and ``validation_confidence``. Plates and odometer readings are
            only reformatted when they validated.
        """

        if text_type == 'license_plate':
            is_valid, confidence = self.validate_license_plate(text)
            formatted_text = self.format_license_plate(text) if is_valid else text

        elif text_type == 'odometer':
            is_valid, confidence = self.validate_odometer(text)
            formatted_text = self.format_odometer(text) if is_valid else text

        elif text_type == 'document_text':
            is_valid, confidence = self.validate_document_text(text)
            formatted_text = text.strip()

        else:
            is_valid, confidence = True, 0.5
            formatted_text = text.strip()

        return {
            'original_text': text,
            'formatted_text': formatted_text,
            'is_valid': is_valid,
            'validation_confidence': confidence
        }

# Initialize text validator (it reads its rules from TEXT_PATTERNS)
text_validator = TextValidator()

print("✅ Text validation system initialized")


In [None]:

class VehicleDocumentExtractor:
    """Complete vehicle document information extraction system using YOUR trained model.

    Chains four collaborators: an optional Keras classifier, a region
    detector, an OCR engine and a text validator. ``extract_information`` is
    the entry point; the other methods are its pipeline steps.
    """

    def __init__(self, your_model, detector, ocr_engine, text_validator):
        self.your_model = your_model  # may be None: classification is then skipped
        self.detector = detector
        self.ocr_engine = ocr_engine
        self.text_validator = text_validator

    @staticmethod
    def _text_type_for_class(detected_class):
        """Map a detector class name to the validator's text type."""
        lowered = detected_class.lower()
        if 'plate' in lowered:
            return 'license_plate'
        if 'odometer' in lowered:
            return 'odometer'
        return 'document_text'

    def classify_document(self, image):
        """Classify document type using YOUR trained model.

        Args:
            image: Image array; a 2-D grayscale array is expanded to three
                channels first.

        Returns:
            A dict with ``prediction`` and ``confidence`` (plus
            ``probabilities`` on success). ``prediction`` is ``'unknown'``
            with confidence 0.0 when no model is loaded or prediction fails.
        """
        if self.your_model is None:
            return {'prediction': 'unknown', 'confidence': 0.0}

        try:
            # Preprocess image for YOUR model.
            # NOTE(review): assumes the model takes 3-channel input scaled to
            # [0, 1] - confirm against the training pipeline.
            if image.ndim == 2:
                image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
            processed_image = cv2.resize(image, (IMG_SIZE, IMG_SIZE))
            processed_image = processed_image.astype(np.float32) / 255.0
            processed_image = np.expand_dims(processed_image, axis=0)

            # Get prediction from YOUR model
            prediction = self.your_model.predict(processed_image, verbose=0)
            class_id = int(np.argmax(prediction[0]))
            confidence = float(prediction[0][class_id])

            return {
                'prediction': CLASS_NAMES[class_id],
                'confidence': confidence,
                'probabilities': {CLASS_NAMES[i]: float(prediction[0][i])
                                for i in range(len(CLASS_NAMES))}
            }
        except Exception as e:
            print(f"Classification error with YOUR model: {e}")
            return {'prediction': 'unknown', 'confidence': 0.0}

    def extract_information(self, image_path):
        """Complete information extraction pipeline.

        Args:
            image_path: A ``str``/``Path`` to an image file (read with OpenCV
                and converted to RGB) or an already loaded image array.

        Returns:
            A dict with ``image_info``, ``classification``, ``detections``,
            ``extracted_text``, ``processed_information`` and
            ``overall_confidence``. When the image cannot be read or a
            pipeline step raises, an ``error`` key holds the message and the
            remaining fields keep whatever was filled in up to that point.
        """

        # Load image
        from_file = isinstance(image_path, (str, Path))
        if from_file:
            image = cv2.imread(str(image_path))
            # imread signals an unreadable file by returning None, not by raising
            if image is not None:
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        else:
            image = image_path

        results = {
            'image_info': {
                'shape': image.shape if image is not None else None,
                'path': str(image_path) if from_file else 'array'
            },
            'classification': {},
            'detections': [],
            'extracted_text': [],
            'processed_information': {},
            'overall_confidence': 0.0
        }

        if image is None:
            message = f"Could not read image: {image_path}"
            print(f"❌ Extraction pipeline error: {message}")
            results['error'] = message
            return results

        try:
            # Step 1: Classify document type
            print("🔍 Step 1: Document classification...")
            classification_result = self.classify_document(image)
            results['classification'] = classification_result

            # Step 2: Detect elements in image
            print("🎯 Step 2: Element detection...")
            detections = self.detector.detect_elements(image)
            results['detections'] = detections

            # Step 3: Extract text from detected regions
            print("📝 Step 3: Text extraction...")
            extracted_texts = []
            height, width = image.shape[:2]

            for i, detection in enumerate(detections):
                bbox = detection['bbox']
                x1, y1, x2, y2 = bbox

                # Clamp to the image: a negative coordinate would otherwise be
                # read as an index from the far edge and crop the wrong area.
                left, right = max(0, x1), min(width, x2)
                top, bottom = max(0, y1), min(height, y2)
                region = image[top:bottom, left:right]

                if region.size == 0:
                    continue

                # The text type depends only on the detection, not on the
                # individual OCR result.
                detected_class = detection['class']
                text_type = self._text_type_for_class(detected_class)

                # OCR on region
                for ocr_result in self.ocr_engine.extract_text_multi_engine(region):
                    # Post-process text
                    processed = self.text_validator.post_process_text(
                        ocr_result['text'], text_type
                    )

                    extracted_texts.append({
                        'detection_id': i,
                        'region_bbox': bbox,
                        'region_class': detected_class,
                        'text_type': text_type,
                        'raw_text': ocr_result['text'],
                        'processed_text': processed,
                        'ocr_confidence': ocr_result['confidence'],
                        'ocr_engine': ocr_result['engine'],
                        'text_bbox': ocr_result.get('bbox', None)
                    })

            results['extracted_text'] = extracted_texts

            # Step 4: Structure and validate information
            print("🧹 Step 4: Information structuring...")
            structured_info = self.structure_information(extracted_texts, classification_result)
            results['processed_information'] = structured_info

            # Step 5: Calculate overall confidence
            results['overall_confidence'] = self.calculate_overall_confidence(results)

            print(f"✅ Extraction complete! Found {len(extracted_texts)} text elements")

        except Exception as e:
            print(f"❌ Extraction pipeline error: {e}")
            results['error'] = str(e)

        return results

    def structure_information(self, extracted_texts, classification_result):
        """Structure extracted information by type.

        Returns:
            A dict with ``license_plates``, ``odometer_readings`` and
            ``document_texts`` lists plus a ``summary``. An entry counts as
            high confidence when the mean of its OCR and validation
            confidence exceeds 0.7.
        """

        structured = {
            'license_plates': [],
            'odometer_readings': [],
            'document_texts': [],
            'summary': {
                'document_type': classification_result.get('prediction', 'unknown'),
                'classification_confidence': classification_result.get('confidence', 0.0),
                'total_extractions': len(extracted_texts),
                'high_confidence_extractions': 0
            }
        }

        # Text type -> result bucket; anything else is filed as document text
        bucket_for_type = {
            'license_plate': 'license_plates',
            'odometer': 'odometer_readings',
        }

        for text_info in extracted_texts:
            processed = text_info['processed_text']

            # Create structured entry
            entry = {
                'text': processed['formatted_text'],
                'raw_text': text_info['raw_text'],
                'confidence': text_info['ocr_confidence'],
                'validation_confidence': processed['validation_confidence'],
                'combined_confidence': (text_info['ocr_confidence'] + processed['validation_confidence']) / 2,
                'is_valid': processed['is_valid'],
                'region_bbox': text_info['region_bbox'],
                'ocr_engine': text_info['ocr_engine']
            }

            # Categorize by type
            bucket = bucket_for_type.get(text_info['text_type'], 'document_texts')
            structured[bucket].append(entry)

            # Count high confidence extractions
            if entry['combined_confidence'] > 0.7:
                structured['summary']['high_confidence_extractions'] += 1

        return structured

    def calculate_overall_confidence(self, results):
        """Calculate overall extraction confidence.

        Averages the classification confidence (when present) with the
        combined confidence of every valid text extraction.

        Returns:
            The mean as a plain ``float``; 0.0 when there is nothing to average.
        """

        confidences = []

        # Classification confidence
        if 'classification' in results and 'confidence' in results['classification']:
            confidences.append(results['classification']['confidence'])

        # Text extraction confidences
        for text_info in results.get('extracted_text', []):
            processed = text_info['processed_text']
            if processed['is_valid']:
                confidences.append(
                    (text_info['ocr_confidence'] + processed['validation_confidence']) / 2
                )

        # float() keeps the result a built-in type (np.mean returns np.float64)
        return float(np.mean(confidences)) if confidences else 0.0

# Initialize the complete extraction system. The classification model is
# optional: VehicleDocumentExtractor.classify_document reports 'unknown' when
# your_model is None, so a missing model must not block detection and OCR.
model_loaded = your_classification_model is not None
core_components = {
    'Detector': detector,
    'OCR engine': ocr_engine,
    'Text validator': text_validator,
}

if all(component is not None for component in core_components.values()):
    extractor = VehicleDocumentExtractor(
        your_model=your_classification_model,
        detector=detector,
        ocr_engine=ocr_engine,
        text_validator=text_validator
    )
    if model_loaded:
        print("🚀 Complete extraction system initialized with YOUR trained model!")
    else:
        print("🚀 Extraction system initialized without a classification model "
              "(document type will be reported as 'unknown')")

    # Only announce readiness when an extractor actually exists
    print("\n✅ Information extraction pipeline ready!")
    if model_loaded:
        print("🎯 Ready to process vehicle documents using YOUR model + YOLO + OCR")
    else:
        print("🎯 Ready to process vehicle documents using YOLO + OCR")
else:
    print("⚠️ Some components missing:")
    print(f"   YOUR model: {'✅' if model_loaded else '❌'}")
    for label, component in core_components.items():
        print(f"   {label}: {'✅' if component is not None else '❌'}")

In [None]:

# Section banner for the pipeline test cell
print("\n" + "="*60)
print("🧪 TESTING INFORMATION EXTRACTION PIPELINE")
print("="*60)

def load_test_annotations():
    """Load the test annotations used for evaluation.

    Reads ``test_balanced_final.json`` from ``ANNOTATIONS_DIR`` and returns
    the decoded JSON content.

    Returns:
        The parsed JSON data, or an empty list when the file cannot be
        located, read or parsed. Failures are reported on stdout rather than
        raised, so the notebook can continue without test data.
    """
    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale encoding, which can fail on non-ASCII text.
        with open(ANNOTATIONS_DIR / "test_balanced_final.json", 'r', encoding='utf-8') as f:
            test_data = json.load(f)
        print(f"📊 Loaded {len(test_data)} test samples")
        return test_data
    except Exception as e:
        # Deliberately broad: this loader is best-effort.
        print(f"⚠️ Could not load test annotations: {e}")
        return []

# Load test data
test_annotations = load_test_annotations()

# Bound unconditionally: later cells test `if sample_results:` and would raise
# NameError if this list only existed inside the branch below.
sample_results = []

# Process sample images
if test_annotations:
    print("\n🔄 Processing sample images...")

    # Spread the picks over the dataset (start, quartile, middle, end) and drop
    # repeats: on small datasets several of these positions coincide, and the
    # same image would otherwise be processed more than once.
    num_annotations = len(test_annotations)
    sample_indices = list(dict.fromkeys(
        [0, num_annotations // 4, num_annotations // 2, num_annotations - 1]
    ))

    for i, idx in enumerate(sample_indices[:3]):  # Process at most 3 samples
        annotation = test_annotations[idx]

        # Get image path with fallback strategy: prefer the enhanced image,
        # then the ROI crop, then the original; each candidate is tried as
        # given and then relative to PROJECT_ROOT.
        image_path = None
        for path_key in ['enhanced_path', 'roi_path', 'original_path']:
            if path_key in annotation:
                potential_path = annotation[path_key]

                if Path(potential_path).exists():
                    image_path = potential_path
                    break
                elif Path(PROJECT_ROOT / potential_path).exists():
                    image_path = PROJECT_ROOT / potential_path
                    break

        if image_path and Path(image_path).exists():
            print(f"\n📸 Processing sample {i+1}: {Path(image_path).name}")

            try:
                # Extract information (only possible if the extractor cell
                # managed to build `extractor`).
                if 'extractor' in locals():
                    result = extractor.extract_information(image_path)
                    # Keep the source annotation with the result so later
                    # cells can locate the image again.
                    result['sample_info'] = {
                        'index': idx,
                        'filename': Path(image_path).name,
                        'annotation': annotation
                    }
                    sample_results.append(result)

                    # Print summary
                    print(f"   Document type: {result['classification'].get('prediction', 'unknown')} "
                          f"({result['classification'].get('confidence', 0):.3f})")
                    print(f"   Detections: {len(result['detections'])}")
                    print(f"   Text extractions: {len(result['extracted_text'])}")
                    print(f"   Overall confidence: {result['overall_confidence']:.3f}")

                    # Show extracted texts
                    structured = result['processed_information']
                    for plate in structured['license_plates']:
                        print(f"   📋 License plate: '{plate['text']}' (conf: {plate['combined_confidence']:.3f})")

                    for odo in structured['odometer_readings']:
                        print(f"   🔢 Odometer: '{odo['text']}' (conf: {odo['combined_confidence']:.3f})")

                    for doc in structured['document_texts'][:2]:  # Show first 2
                        print(f"   📄 Document text: '{doc['text'][:50]}...' (conf: {doc['combined_confidence']:.3f})")

                else:
                    print("   ⚠️ Extractor not available")

            except Exception as e:
                # Best-effort demo loop: report the failure and move on to
                # the next sample instead of aborting the cell.
                print(f"   ❌ Error processing sample: {e}")
        else:
            print(f"   ⚠️ Sample {i+1}: Image not found")

    print(f"\n✅ Processed {len(sample_results)} samples successfully")

else:
    print("⚠️ No test data available for processing")

In [None]:
# Cell banner: announce the start of the visualization stage.
print("\n📊 Creating extraction result visualizations...")

def _tile_text_regions(tiles, columns=2):
    """Arrange equally sized image tiles into a grid ``columns`` wide.

    A single tile is returned unchanged. Otherwise the tiles fill the grid row
    by row, and the last row is padded with black tiles so that every row has
    the same width and all tiles are shown.
    """
    if len(tiles) == 1:
        return tiles[0]

    padding = [np.zeros_like(tiles[0])] * (-len(tiles) % columns)
    cells = list(tiles) + padding
    rows = [np.hstack(cells[start:start + columns])
            for start in range(0, len(cells), columns)]
    return np.vstack(rows)


def visualize_extraction_result(result, save_path=None):
    """Plot a 2x2 overview of one extraction result.

    The panels are: the original image, the image with detection boxes, a
    mosaic of up to six cropped text regions, and a text summary.

    Args:
        result: Extraction result dict. Reads ``sample_info`` (whose
            ``annotation`` supplies the image path), ``detections``,
            ``extracted_text``, ``classification``,
            ``processed_information`` and ``overall_confidence``.
        save_path: Optional destination; when given, the figure is also
            written there at 300 dpi.

    Returns:
        The matplotlib figure, or ``None`` when the image cannot be located
        or anything fails while drawing (the problem is printed, not raised).
    """
    if 'sample_info' not in result:
        print("⚠️ No image info available for visualization")
        return None

    # Locate the image: try each path key as given, then relative to PROJECT_ROOT.
    image_path = None
    annotation = result['sample_info']['annotation']

    for path_key in ['enhanced_path', 'roi_path', 'original_path']:
        if path_key in annotation:
            potential_path = annotation[path_key]
            if Path(potential_path).exists():
                image_path = potential_path
                break
            elif Path(PROJECT_ROOT / potential_path).exists():
                image_path = PROJECT_ROOT / potential_path
                break

    if not image_path:
        print("⚠️ Cannot load image for visualization")
        return None

    # Box colour per detection class (RGB, because the image is converted to
    # RGB before drawing). Built once instead of once per detection.
    color_map = {
        'license_plate': (255, 0, 0),    # Red
        'odometer': (0, 255, 0),         # Green
        'text_region': (0, 0, 255),      # Blue
        'damage': (255, 255, 0)          # Yellow
    }

    fig = None
    try:
        # Load and prepare image. cv2.imread signals failure by returning
        # None, so turn that into a readable error message.
        image = cv2.imread(str(image_path))
        if image is None:
            raise ValueError(f"could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Create figure
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle(f'🔍 Extraction Results: {Path(image_path).name}', fontsize=14, fontweight='bold')

        # 1. Original image
        axes[0, 0].imshow(image)
        axes[0, 0].set_title('Original Image')
        axes[0, 0].axis('off')

        # 2. Detection results
        image_with_detections = image.copy()

        for detection in result['detections']:
            # cv2 drawing calls need integer pixel coordinates.
            x1, y1, x2, y2 = (int(value) for value in detection['bbox'])

            color = color_map.get(detection['class'], (128, 128, 128))
            cv2.rectangle(image_with_detections, (x1, y1), (x2, y2), color, 2)

            # Add label above the box; keep the baseline inside the image for
            # boxes that touch the top edge.
            label = f"{detection['class']} ({detection['confidence']:.2f})"
            cv2.putText(image_with_detections, label, (x1, max(y1 - 10, 15)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        axes[0, 1].imshow(image_with_detections)
        axes[0, 1].set_title(f'Detections ({len(result["detections"])})')
        axes[0, 1].axis('off')

        # 3. Extracted text regions (max 6), each resized to a common tile size
        text_mosaic = []
        for text_info in result['extracted_text'][:6]:
            x1, y1, x2, y2 = (int(value) for value in text_info['region_bbox'])
            region = image[y1:y2, x1:x2]

            if region.size > 0:
                text_mosaic.append(cv2.resize(region, (100, 50)))

        if text_mosaic:
            axes[1, 0].imshow(_tile_text_regions(text_mosaic))
            axes[1, 0].set_title('Extracted Text Regions')
        else:
            axes[1, 0].text(0.5, 0.5, 'No text regions\nextracted',
                            ha='center', va='center', transform=axes[1, 0].transAxes)
        axes[1, 0].axis('off')

        # 4. Results summary
        axes[1, 1].axis('off')

        summary_text = ["📋 EXTRACTION SUMMARY", ""]

        # Classification
        classification = result['classification']
        summary_text.append(f"Document Type: {classification.get('prediction', 'Unknown')}")
        summary_text.append(f"Classification Confidence: {classification.get('confidence', 0):.3f}")
        summary_text.append("")

        # Extracted information
        structured = result['processed_information']

        if structured['license_plates']:
            summary_text.append("🚗 License Plates:")
            for plate in structured['license_plates']:
                summary_text.append(f"  • {plate['text']} (conf: {plate['combined_confidence']:.3f})")

        if structured['odometer_readings']:
            summary_text.append("🔢 Odometer Readings:")
            for odo in structured['odometer_readings']:
                summary_text.append(f"  • {odo['text']} (conf: {odo['combined_confidence']:.3f})")

        if structured['document_texts']:
            summary_text.append("📄 Document Texts:")
            for doc in structured['document_texts'][:3]:  # Show first 3
                text_preview = doc['text'][:30] + "..." if len(doc['text']) > 30 else doc['text']
                summary_text.append(f"  • {text_preview} (conf: {doc['combined_confidence']:.3f})")

        summary_text.append("")
        summary_text.append(f"Overall Confidence: {result['overall_confidence']:.3f}")

        # Display summary text
        summary_str = '\n'.join(summary_text)
        axes[1, 1].text(0.05, 0.95, summary_str, transform=axes[1, 1].transAxes,
                        fontsize=10, verticalalignment='top', fontfamily='monospace')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"📸 Visualization saved to: {save_path}")

        plt.show()

        return fig

    except Exception as e:
        # Do not leave a half-drawn figure registered with pyplot.
        if fig is not None:
            plt.close(fig)
        print(f"❌ Visualization error: {e}")
        return None

# Visualize results for processed samples.
# `sample_results` is only created when the test cell found data, so check that
# the name exists before testing it (avoids a NameError on the no-data path).
if 'sample_results' in locals() and sample_results:
    print("\n🎨 Creating visualizations for processed samples...")

    for i, result in enumerate(sample_results[:2]):  # Visualize first 2 samples
        save_path = EXTRACTION_DIR / f"extraction_result_{i+1}.png"
        print(f"\n📊 Visualizing sample {i+1}...")
        visualize_extraction_result(result, save_path)


In [None]:
# Section banner for the performance evaluation cell, emitted as one newline-separated print.
print(
    "\n" + "=" * 60,
    "📊 EXTRACTION PERFORMANCE EVALUATION",
    "=" * 60,
    sep="\n",
)

def evaluate_extraction_performance(sample_results):
    """Aggregate pipeline metrics over a list of extraction results.

    A result counts as processed when it has no ``'error'`` key; only
    processed results contribute to the remaining metrics.

    Args:
        sample_results: List of extraction result dicts. Processed results
            are read for ``classification``, ``extracted_text``,
            ``overall_confidence`` and ``processed_information``;
            ``detections`` is read when present.

    Returns:
        ``{}`` for empty input (a warning is printed). Otherwise a dict with:
        ``total_samples``; ``successful_classifications`` (classification
        confidence > 0.5); ``successful_detections`` (at least one
        detection); ``successful_extractions`` (at least one extracted
        text); ``high_confidence_extractions`` (overall confidence > 0.7);
        ``average_confidence`` (mean of ``confidence_distribution``);
        ``extraction_breakdown`` (item counts per type);
        ``confidence_distribution`` (overall confidence of each processed
        result); ``processing_success_rate`` (processed / total).
    """
    if not sample_results:
        print("⚠️ No results to evaluate")
        return {}

    metrics = {
        'total_samples': len(sample_results),
        'successful_classifications': 0,
        'successful_detections': 0,
        'successful_extractions': 0,
        'high_confidence_extractions': 0,
        'average_confidence': 0.0,
        'extraction_breakdown': {
            'license_plates': 0,
            'odometer_readings': 0,
            'document_texts': 0
        },
        'confidence_distribution': [],
        'processing_success_rate': 0.0
    }

    # Tracked separately from detections: a sample can be processed without
    # error and still contain no detected region.
    processed = 0
    breakdown = metrics['extraction_breakdown']

    for result in sample_results:
        if 'error' in result:
            continue
        processed += 1

        # Detection success: at least one region was detected.
        if result.get('detections'):
            metrics['successful_detections'] += 1

        # Classification success
        if result['classification'].get('confidence', 0) > 0.5:
            metrics['successful_classifications'] += 1

        # Extraction success
        if result['extracted_text']:
            metrics['successful_extractions'] += 1

        # Overall confidence
        overall_conf = result['overall_confidence']
        metrics['confidence_distribution'].append(overall_conf)

        if overall_conf > 0.7:
            metrics['high_confidence_extractions'] += 1

        # Breakdown by type
        structured = result['processed_information']
        for kind in breakdown:
            breakdown[kind] += len(structured[kind])

    metrics['processing_success_rate'] = processed / metrics['total_samples']

    # Average over the processed results only, so it describes the same
    # population as `confidence_distribution`.
    confidences = metrics['confidence_distribution']
    if confidences:
        metrics['average_confidence'] = sum(confidences) / len(confidences)

    return metrics

# Evaluate performance.
# `sample_results` is only created when the test cell found data, so check that
# the name exists before testing it (avoids a NameError on the no-data path).
if 'sample_results' in locals() and sample_results:
    performance_metrics = evaluate_extraction_performance(sample_results)

    print("📈 EXTRACTION PIPELINE PERFORMANCE:")
    print("-" * 50)
    print(f"Total samples processed: {performance_metrics['total_samples']}")
    print(f"Processing success rate: {performance_metrics['processing_success_rate']:.1%}")
    print(f"Classification success: {performance_metrics['successful_classifications']}/{performance_metrics['total_samples']}")
    print(f"Detection success: {performance_metrics['successful_detections']}/{performance_metrics['total_samples']}")
    print(f"Extraction success: {performance_metrics['successful_extractions']}/{performance_metrics['total_samples']}")
    print(f"High confidence extractions: {performance_metrics['high_confidence_extractions']}/{performance_metrics['total_samples']}")
    print(f"Average confidence: {performance_metrics['average_confidence']:.3f}")

    print(f"\n📊 EXTRACTION BREAKDOWN:")
    breakdown = performance_metrics['extraction_breakdown']
    print(f"License plates found: {breakdown['license_plates']}")
    print(f"Odometer readings found: {breakdown['odometer_readings']}")
    print(f"Document texts found: {breakdown['document_texts']}")
    print(f"Total extractions: {sum(breakdown.values())}")

    # Create performance visualization (skipped for a single sample)
    if len(sample_results) > 1:
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('📊 Information Extraction Performance Analysis', fontsize=14, fontweight='bold')

        # 1. Success rates
        ax1 = axes[0, 0]
        categories = ['Processing', 'Classification', 'Detection', 'Extraction', 'High Confidence']
        success_rates = [
            performance_metrics['processing_success_rate'],
            performance_metrics['successful_classifications'] / performance_metrics['total_samples'],
            performance_metrics['successful_detections'] / performance_metrics['total_samples'],
            performance_metrics['successful_extractions'] / performance_metrics['total_samples'],
            performance_metrics['high_confidence_extractions'] / performance_metrics['total_samples']
        ]

        bars = ax1.bar(categories, [rate * 100 for rate in success_rates],
                       color=['skyblue', 'lightgreen', 'orange', 'gold', 'lightcoral'])
        ax1.set_title('Success Rates by Category')
        ax1.set_ylabel('Success Rate (%)')
        # Headroom above 100 so the value label of a full-height bar stays
        # inside the axes instead of running into the title.
        ax1.set_ylim(0, 110)

        # Add value labels just above each bar
        for bar, rate in zip(bars, success_rates):
            height = bar.get_height()
            ax1.text(bar.get_x() + bar.get_width()/2., height + 1,
                     f'{rate:.1%}', ha='center', va='bottom')

        plt.setp(ax1.get_xticklabels(), rotation=45, ha='right')

        # 2. Extraction type breakdown
        ax2 = axes[0, 1]
        extraction_types = list(breakdown.keys())
        extraction_counts = list(breakdown.values())

        if sum(extraction_counts) > 0:
            ax2.pie(extraction_counts, labels=extraction_types, autopct='%1.1f%%',
                    colors=['lightblue', 'lightgreen', 'lightyellow'])
            ax2.set_title('Extraction Types Distribution')
        else:
            # Anchor the placeholder in axes coordinates so it is centred
            # regardless of the data limits.
            ax2.text(0.5, 0.5, 'No extractions\nto display', ha='center', va='center',
                     transform=ax2.transAxes)
            ax2.set_title('Extraction Types Distribution')

        # 3. Confidence distribution
        ax3 = axes[1, 0]
        if performance_metrics['confidence_distribution']:
            ax3.hist(performance_metrics['confidence_distribution'], bins=10,
                     color='lightgreen', alpha=0.7, edgecolor='black')
            ax3.set_title('Confidence Score Distribution')
            ax3.set_xlabel('Confidence Score')
            ax3.set_ylabel('Frequency')
            ax3.axvline(performance_metrics['average_confidence'], color='red',
                        linestyle='--', label=f'Average: {performance_metrics["average_confidence"]:.3f}')
            ax3.legend()
        else:
            ax3.text(0.5, 0.5, 'No confidence\ndata available', ha='center', va='center',
                     transform=ax3.transAxes)
            ax3.set_title('Confidence Score Distribution')

        # 4. Sample results summary
        ax4 = axes[1, 1]
        ax4.axis('off')

        # Create summary text
        summary_lines = [
            "📋 EXTRACTION SUMMARY",
            "",
            f"✅ Processed: {performance_metrics['total_samples']} samples",
            f"🎯 Success Rate: {performance_metrics['processing_success_rate']:.1%}",
            f"📊 Avg Confidence: {performance_metrics['average_confidence']:.3f}",
            "",
            f"🚗 License Plates: {breakdown['license_plates']}",
            f"🔢 Odometer Readings: {breakdown['odometer_readings']}",
            f"📄 Document Texts: {breakdown['document_texts']}",
            "",
            f"🏆 High Confidence: {performance_metrics['high_confidence_extractions']}/{performance_metrics['total_samples']}",
            f"⚡ Total Extractions: {sum(breakdown.values())}"
        ]

        summary_text = '\n'.join(summary_lines)
        ax4.text(0.05, 0.95, summary_text, transform=ax4.transAxes,
                 fontsize=11, verticalalignment='top', fontfamily='monospace',
                 bbox=dict(boxstyle="round,pad=0.5", facecolor="lightgray", alpha=0.8))

        plt.tight_layout()

        # Save performance visualization
        perf_viz_path = EXTRACTION_DIR / "extraction_performance_analysis.png"
        plt.savefig(perf_viz_path, dpi=300, bbox_inches='tight')
        plt.show()

        print(f"📊 Performance analysis saved to: {perf_viz_path}")

else:
    print("⚠️ No results available for performance evaluation")

In [None]:

print("\n💾 Saving extraction results and configuration...")


def _json_default(value):
    """Fallback serializer for values the ``json`` module cannot encode.

    NumPy scalars become native Python scalars and NumPy arrays become nested
    lists, so numbers stay numbers in the output. Anything else (for example
    ``Path`` objects) is stored as its string form.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    return str(value)


# Save sample results.
# `sample_results` is only created when the test cell found data, so check that
# the name exists before testing it (avoids a NameError on the no-data path).
if 'sample_results' in locals() and sample_results:
    results_data = {
        "experiment_metadata": {
            "experiment_type": "information_extraction",
            "timestamp": pd.Timestamp.now().isoformat(),
            "total_samples": len(sample_results),
            "ocr_engines": list(ocr_engine.engines.keys()) if 'ocr_engine' in locals() else [],
            "detection_system": "YOLO + CV fallback",
            "validation_patterns": TEXT_PATTERNS
        },
        "performance_metrics": performance_metrics if 'performance_metrics' in locals() else {},
        "sample_results": sample_results,
        "system_configuration": {
            "your_classification_model_available": your_classification_model is not None,
            "detection_confidence_threshold": DETECTION_CONFIDENCE,
            "ocr_confidence_threshold": OCR_CONFIDENCE_THRESHOLD,
            "image_size": IMG_SIZE
        }
    }

    # Save comprehensive results. Serialize before opening the file so that a
    # serialization failure cannot leave a truncated results file behind.
    results_path = EXTRACTION_DIR / "extraction_results.json"
    results_json = json.dumps(results_data, indent=2, default=_json_default)
    with open(results_path, 'w', encoding='utf-8') as f:
        f.write(results_json)

    print(f"✅ Extraction results saved to: {results_path}")

# Save system configuration for production
production_config = {
    "pipeline_components": {
        "your_classification_model": "integrated" if your_classification_model else "not_available",
        "detection_system": "YOLO + CV fallback",
        "ocr_engines": list(ocr_engine.engines.keys()) if 'ocr_engine' in locals() else [],
        "text_validation": "pattern_based"
    },
    "processing_parameters": {
        "detection_confidence": DETECTION_CONFIDENCE,
        "ocr_confidence": OCR_CONFIDENCE_THRESHOLD,
        "image_size": IMG_SIZE
    },
    "output_structure": {
        "classification": "document_type + confidence",
        "detections": "bbox + class + confidence",
        "extractions": "text + validation + confidence",
        "structured_info": "organized_by_type"
    },
    "validation_patterns": TEXT_PATTERNS
}

# Same serialization rules as the results file, so both artifacts treat
# non-JSON values the same way.
config_path = EXTRACTION_DIR / "production_config.json"
config_json = json.dumps(production_config, indent=2, default=_json_default)
with open(config_path, 'w', encoding='utf-8') as f:
    f.write(config_json)

print(f"✅ Production configuration saved to: {config_path}")

print(f"\n📁 All extraction artifacts saved to: {EXTRACTION_DIR}")