In [1]:
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models import resnet50, efficientnet_b0
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
import pickle
import json
import os
from typing import Dict, List, Tuple, Optional
import matplotlib.pyplot as plt
from PIL import Image
import seaborn as sns
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

In [2]:
class CarDamageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None, damage_areas=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.damage_areas = damage_areas if damage_areas is not None else [0] * len(image_paths)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load image
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        damage_area = self.damage_areas[idx]

        return image, label, damage_area


In [10]:
class CarDamageCNN(nn.Module):
    def __init__(self, num_classes=8, pretrained=True):
        super(CarDamageCNN, self).__init__()

        # Use EfficientNet as backbone (good for small datasets)
        self.backbone = efficientnet_b0(pretrained=pretrained)

        # Get the number of features from the backbone
        # Access the in_features from the last layer before the classifier
        # The classifier is a Sequential with the last layer being Linear
        num_features = self.backbone.classifier[-1].in_features


        # Replace classifier with our custom layers
        self.backbone.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )

        # Additional regression head for damage area estimation
        self.area_regressor = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        # Extract features
        features = self.backbone.features(x)
        features = self.backbone.avgpool(features)
        features = torch.flatten(features, 1)

        # Classification output
        classification = self.backbone.classifier(features)

        # Regression output for damage area
        area_pred = self.area_regressor(features)

        return classification, area_pred

In [4]:
class CarDamageDetector:
    def __init__(self, model_path: Optional[str] = None):
        self.damage_classes = [
            'no_damage', 'dent', 'scratch', 'crack',
            'broken_part', 'rust', 'paint_damage', 'severe_damage'
        ]

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")

        # Initialize model
        self.model = CarDamageCNN(num_classes=len(self.damage_classes))
        self.model.to(self.device)

        # Label encoder for damage classes
        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(self.damage_classes)

        # Price prediction model
        self.price_model = None
        self.scaler = StandardScaler()

        # Data transformations
        self.train_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
        ])

        self.val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
        ])

        # Price estimation based on damage type and severity
        self.base_prices = {
            'no_damage': 0,
            'dent': 300,
            'scratch': 200,
            'crack': 400,
            'broken_part': 800,
            'rust': 150,
            'paint_damage': 350,
            'severe_damage': 1200
        }

        # Load model if path provided
        if model_path and os.path.exists(model_path):
            self.load_model(model_path)

In [5]:
def prepare_dataset(self, data_dir: str, create_labels: bool = False) -> Tuple[List, List, List]:
        """
        Prepare dataset from directory structure or CSV file
        """
        image_paths = []
        labels = []
        damage_areas = []

        if create_labels:
            # Interactive labeling for small dataset
            print("Interactive labeling mode...")
            self._interactive_labeling(data_dir)

        # Load from CSV if exists
        csv_path = os.path.join(data_dir, 'labels.csv')
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            image_paths = [os.path.join(data_dir, img) for img in df['image_path']]
            labels = df['damage_type'].tolist()
            damage_areas = df.get('damage_area', [0] * len(image_paths)).tolist()
        else:
            # Assume directory structure: data_dir/damage_type/image.jpg
            for damage_type in os.listdir(data_dir):
                damage_dir = os.path.join(data_dir, damage_type)
                if os.path.isdir(damage_dir):
                    for img_file in os.listdir(damage_dir):
                        if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                            image_paths.append(os.path.join(damage_dir, img_file))
                            labels.append(damage_type)
                            damage_areas.append(0)  # Default area

        print(f"Found {len(image_paths)} images")
        print(f"Label distribution: {Counter(labels)}")

        return image_paths, labels, damage_areas

In [6]:
def _interactive_labeling(self, data_dir: str):
        labels_data = []

        for img_file in os.listdir(data_dir):
            if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(data_dir, img_file)

                # Display image
                img = cv2.imread(img_path)
                img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

                plt.figure(figsize=(10, 8))
                plt.imshow(img_rgb)
                plt.title(f"Label this image: {img_file}")
                plt.axis('off')
                plt.show()

                # Get label
                print("\nDamage types:")
                for i, damage_type in enumerate(self.damage_classes):
                    print(f"{i}: {damage_type}")

                while True:
                    try:
                        label_idx = int(input("Enter damage type index: "))
                        if 0 <= label_idx < len(self.damage_classes):
                            damage_type = self.damage_classes[label_idx]
                            break
                        else:
                            print("Invalid index. Try again.")
                    except ValueError:
                        print("Please enter a valid number.")

                # Get damage area estimate (optional)
                try:
                    area = float(input("Estimated damage area (0-1, where 1 is entire image): ") or "0")
                except ValueError:
                    area = 0

                labels_data.append({
                    'image_path': img_file,
                    'damage_type': damage_type,
                    'damage_area': area
                })

                plt.close()

        # Save labels
        df = pd.DataFrame(labels_data)
        df.to_csv(os.path.join(data_dir, 'labels.csv'), index=False)
        print(f"Labels saved to {os.path.join(data_dir, 'labels.csv')}")

In [18]:
class CarDamageDetector:
    def __init__(self, model_path: Optional[str] = None):
        self.damage_classes = [
            'no_damage', 'dent', 'scratch', 'crack',
            'broken_part', 'rust', 'paint_damage', 'severe_damage'
        ]

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")

        # Initialize model
        self.model = CarDamageCNN(num_classes=len(self.damage_classes))
        self.model.to(self.device)

        # Label encoder for damage classes
        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(self.damage_classes)

        # Price prediction model
        self.price_model = None
        self.scaler = StandardScaler()

        # Data transformations
        self.train_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
        ])

        self.val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
        ])

        # Price estimation based on damage type and severity
        self.base_prices = {
            'no_damage': 0,
            'dent': 300,
            'scratch': 200,
            'crack': 400,
            'broken_part': 800,
            'rust': 150,
            'paint_damage': 350,
            'severe_damage': 1200
        }

        # Load model if path provided
        if model_path and os.path.exists(model_path):
            self.load_model(model_path)

    def prepare_dataset(self, data_dir: str, create_labels: bool = False) -> Tuple[List, List, List]:
        """
        Prepare dataset from directory structure or CSV file
        """
        image_paths = []
        labels = []
        damage_areas = []

        csv_path = os.path.join(data_dir, 'labels.csv')

        if os.path.exists(csv_path):
            # Load from CSV if exists
            try:
                df = pd.read_csv(csv_path)
                if not df.empty:
                    image_paths = [os.path.join(data_dir, img) for img in df['image_path']]
                    labels = df['damage_type'].tolist()
                    damage_areas = df.get('damage_area', [0] * len(image_paths)).tolist()
                else:
                    print(f"Warning: {csv_path} is empty.")
            except pd.errors.EmptyDataError:
                print(f"Warning: {csv_path} is empty or has no columns.")
        elif create_labels:
            # Interactive labeling if no CSV and create_labels is True
            print("Interactive labeling mode...")
            self._interactive_labeling(data_dir)
            # After interactive labeling, load the created CSV
            if os.path.exists(csv_path):
                 try:
                    df = pd.read_csv(csv_path)
                    if not df.empty:
                        image_paths = [os.path.join(data_dir, img) for img in df['image_path']]
                        labels = df['damage_type'].tolist()
                        damage_areas = df.get('damage_area', [0] * len(image_paths)).tolist()
                    else:
                        print(f"Warning: {csv_path} is empty after interactive labeling.")
                 except pd.errors.EmptyDataError:
                    print(f"Warning: {csv_path} is empty or has no columns after interactive labeling.")
            else:
                print(f"Warning: {csv_path} was not created during interactive labeling.")

        else:
            # Assume directory structure: data_dir/damage_type/image.jpg
            # Or images are directly in data_dir without subdirectories
            for item in os.listdir(data_dir):
                item_path = os.path.join(data_dir, item)
                if os.path.isdir(item_path):
                    # Process as directory structure
                    for img_file in os.listdir(item_path):
                        if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                            image_paths.append(os.path.join(item_path, img_file))
                            labels.append(item)
                            damage_areas.append(0)  # Default area
                elif os.path.isfile(item_path) and item.lower().endswith(('.png', '.jpg', '.jpeg')):
                     # Process as images directly in data_dir (requires labeling)
                     # In this case, the user needs to provide labels via CSV or interactive labeling
                     # This part of the logic should ideally not be reached if create_labels is True
                     # or if a CSV exists. If reached, it means unlabeled images exist.
                     print(f"Warning: Found unlabeled image {item_path}. Please provide labels via CSV or set create_labels=True for interactive labeling.")
                     pass # Or raise an error, or add to a list of unlabeled images


        print(f"Found {len(image_paths)} images")
        print(f"Label distribution: {Counter(labels)}")

        return image_paths, labels, damage_areas

    def _interactive_labeling(self, data_dir: str):
        labels_data = []
        image_files = [f for f in os.listdir(data_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

        if not image_files:
            print(f"No image files found in {data_dir} for interactive labeling.")
            return

        print(f"Found {len(image_files)} images for interactive labeling.")

        for img_file in image_files:
            img_path = os.path.join(data_dir, img_file)

            # Display image
            img = cv2.imread(img_path)
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            plt.figure(figsize=(10, 8))
            plt.imshow(img_rgb)
            plt.title(f"Label this image: {img_file}")
            plt.axis('off')
            plt.show()

            # Get label
            print("\nDamage types:")
            for i, damage_type in enumerate(self.damage_classes):
                print(f"{i}: {damage_type}")

            while True:
                try:
                    label_idx = int(input("Enter damage type index: "))
                    if 0 <= label_idx < len(self.damage_classes):
                        damage_type = self.damage_classes[label_idx]
                        break
                    else:
                        print("Invalid index. Try again.")
                except ValueError:
                    print("Please enter a valid number.")

            # Get damage area estimate (optional)
            try:
                area = float(input("Estimated damage area (0-1, where 1 is entire image): ") or "0")
            except ValueError:
                area = 0

            labels_data.append({
                'image_path': img_file,
                'damage_type': damage_type,
                'damage_area': area
            })

            plt.close()

        # Save labels
        if labels_data:
            df = pd.DataFrame(labels_data)
            df.to_csv(os.path.join(data_dir, 'labels.csv'), index=False)
            print(f"Labels saved to {os.path.join(data_dir, 'labels.csv')}")
        else:
            print("No labels were collected during interactive labeling. CSV file not created.")


    def train_model(self, data_dir: str, epochs: int = 50, batch_size: int = 8,
                   learning_rate: float = 0.001):
        # Prepare dataset
        image_paths, labels, damage_areas = self.prepare_dataset(data_dir, create_labels=True) # Set create_labels to True here

        if not image_paths:
            print("No images found or labeled. Cannot proceed with training.")
            return

        # Encode labels
        encoded_labels = self.label_encoder.transform(labels)

        # Split data (80-20 split for small dataset)
        train_paths, val_paths, train_labels, val_labels, train_areas, val_areas = train_test_split(
            image_paths, encoded_labels, damage_areas,
            test_size=0.2, random_state=42, stratify=encoded_labels
        )

        print(f"Training samples: {len(train_paths)}")
        print(f"Validation samples: {len(val_paths)}")

        # Create datasets
        train_dataset = CarDamageDataset(train_paths, train_labels,
                                       self.train_transform, train_areas)
        val_dataset = CarDamageDataset(val_paths, val_labels,
                                     self.val_transform, val_areas)

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=batch_size,
                                shuffle=True, num_workers=2)
        val_loader = DataLoader(val_dataset, batch_size=batch_size,
                              shuffle=False, num_workers=2)

        # Loss functions and optimizer
        criterion_cls = nn.CrossEntropyLoss()
        criterion_reg = nn.MSELoss()
        optimizer = optim.AdamW(self.model.parameters(), lr=learning_rate,
                               weight_decay=0.01)

        # Learning rate scheduler
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min',
                                                       patience=5, factor=0.5)

        # Training loop
        train_losses = []
        val_losses = []
        best_val_loss = float('inf')

        for epoch in range(epochs):
            # Training phase
            self.model.train()
            train_loss = 0.0

            for images, labels, areas in train_loader:
                images, labels, areas = images.to(self.device), labels.to(self.device), areas.to(self.device)

                optimizer.zero_grad()

                cls_output, area_output = self.model(images)

                # Calculate losses
                cls_loss = criterion_cls(cls_output, labels)
                reg_loss = criterion_reg(area_output.squeeze(), areas.float())

                # Combined loss (weighted)
                total_loss = cls_loss + 0.1 * reg_loss

                total_loss.backward()
                optimizer.step()

                train_loss += total_loss.item()

            # Validation phase
            self.model.eval()
            val_loss = 0.0
            correct = 0
            total = 0

            with torch.no_grad():
                for images, labels, areas in val_loader:
                    images, labels, areas = images.to(self.device), labels.to(self.device), areas.to(self.device)

                    cls_output, area_output = self.model(images)

                    cls_loss = criterion_cls(cls_output, labels)
                    reg_loss = criterion_reg(area_output.squeeze(), areas.float())
                    total_loss = cls_loss + 0.1 * reg_loss

                    val_loss += total_loss.item()

                    _, predicted = torch.max(cls_output.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            # Calculate average losses
            train_loss /= len(train_loader)
            val_loss /= len(val_loader)
            accuracy = 100 * correct / total

            train_losses.append(train_loss)
            val_losses.append(val_loss)

            # Learning rate scheduling
            scheduler.step(val_loss)

            print(f'Epoch [{epoch+1}/{epochs}], '
                  f'Train Loss: {train_loss:.4f}, '
                  f'Val Loss: {val_loss:.4f}, '
                  f'Val Acc: {accuracy:.2f}%')

            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                self.save_model('best_car_damage_model.pth')

        # Plot training curves
        self._plot_training_curves(train_losses, val_losses)

        # Load best model
        self.load_model('best_car_damage_model.pth')

        print(f"Training completed. Best validation loss: {best_val_loss:.4f}")

    def _plot_training_curves(self, train_losses: List, val_losses: List):
        """
        Plot training and validation loss curves
        """
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 2, 1)
        plt.plot(train_losses, label='Train Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.savefig('training_curves.png', dpi=300, bbox_inches='tight')
        plt.show()

    def predict_damage(self, image_path: str) -> Dict:
        """
        Predict damage type and area for a single image
        """
        self.model.eval()

        # Load and preprocess image
        image = Image.open(image_path).convert('RGB')
        image_tensor = self.val_transform(image).unsqueeze(0).to(self.device)

        with torch.no_grad():
            cls_output, area_output = self.model(image_tensor)

            # Get prediction probabilities
            probabilities = torch.softmax(cls_output, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)

            # Get damage area prediction
            predicted_area = area_output.item()

        # Decode prediction
        damage_type = self.label_encoder.inverse_transform([predicted_class.item()])[0]
        confidence_score = confidence.item()

        # Get all class probabilities
        all_probabilities = probabilities.cpu().numpy()[0]
        class_probabilities = {}
        for i, class_name in enumerate(self.damage_classes):
            class_probabilities[class_name] = float(all_probabilities[i])

        return {
            'image_path': image_path,
            'predicted_damage': damage_type,
            'confidence': confidence_score,
            'predicted_area': max(0, min(1, predicted_area)),  # Clamp between 0 and 1
            'class_probabilities': class_probabilities
        }

    def estimate_repair_cost(self, prediction_result: Dict,
                           car_info: Dict = None) -> Dict:
        """
        Estimate repair cost based on damage prediction
        """
        damage_type = prediction_result['predicted_damage']
        confidence = prediction_result['confidence']
        damage_area = prediction_result['predicted_area']

        # Base cost
        base_cost = self.base_prices.get(damage_type, 0)

        # Area multiplier (more damage = higher cost)
        area_multiplier = 1 + (damage_area * 2)  # Max 3x multiplier

        # Confidence multiplier (lower confidence = higher uncertainty)
        confidence_multiplier = 1 + (1 - confidence) * 0.5

        # Calculate base repair cost
        repair_cost = base_cost * area_multiplier * confidence_multiplier

        # Apply car-specific multipliers
        if car_info:
            # Luxury car multiplier
            if car_info.get('luxury_brand', False):
                repair_cost *= 1.5

            # Age multiplier
            age = car_info.get('age', 5)
            if age > 10:
                repair_cost *= 0.8  # Older cars cheaper to repair
            elif age < 3:
                repair_cost *= 1.2  # Newer cars more expensive

            # Car value multiplier
            car_value = car_info.get('value', 15000)
            if car_value > 30000:
                repair_cost *= 1.3
            elif car_value < 10000:
                repair_cost *= 0.7

        # Calculate confidence interval
        uncertainty = 1 - confidence
        lower_bound = repair_cost * (1 - uncertainty * 0.3)
        upper_bound = repair_cost * (1 + uncertainty * 0.3)

        return {
            'estimated_cost': float(repair_cost),
            'confidence_interval': (float(lower_bound), float(upper_bound)),
            'cost_factors': {
                'base_cost': float(base_cost),
                'area_multiplier': float(area_multiplier),
                'confidence_multiplier': float(confidence_multiplier),
                'damage_area': float(damage_area)
            }
        }

    def process_image(self, image_path: str, car_info: Dict = None) -> Dict:
        """
        Complete pipeline: predict damage and estimate cost
        """
        # Predict damage
        damage_result = self.predict_damage(image_path)

        # Estimate cost
        cost_result = self.estimate_repair_cost(damage_result, car_info)

        # Combine results
        complete_result = {
            **damage_result,
            **cost_result,
            'car_info': car_info
        }

        return complete_result

    def visualize_prediction(self, image_path: str, prediction_result: Dict,
                           save_path: str = None):
        """
        Visualize prediction results
        """
        # Load image
        img = cv2.imread(image_path)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Create figure with subplots
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))

        # Original image with prediction
        axes[0].imshow(img_rgb)
        axes[0].set_title(f"Predicted: {prediction_result['predicted_damage']}\n"
                         f"Confidence: {prediction_result['confidence']:.2f}\n"
                         f"Estimated Cost: ${prediction_result['estimated_cost']:.2f}")
        axes[0].axis('off')

        # Probability distribution
        classes = list(prediction_result['class_probabilities'].keys())
        probs = list(prediction_result['class_probabilities'].values())

        bars = axes[1].bar(range(len(classes)), probs)
        axes[1].set_xlabel('Damage Types')
        axes[1].set_ylabel('Probability')
        axes[1].set_title('Class Probabilities')
        axes[1].set_xticks(range(len(classes)))
        axes[1].set_xticklabels(classes, rotation=45, ha='right')

        # Highlight predicted class
        predicted_idx = classes.index(prediction_result['predicted_damage'])
        bars[predicted_idx].set_color('red')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        else:
            plt.show()

    def evaluate_model(self, data_dir: str):
        """
        Evaluate model performance
        """
        # Load test data
        image_paths, labels, damage_areas = self.prepare_dataset(data_dir)

        predictions = []
        true_labels = []

        for img_path, true_label in zip(image_paths, labels):
            result = self.predict_damage(img_path)
            predictions.append(result['predicted_damage'])
            true_labels.append(true_label)

        # Print classification report
        print("\nClassification Report:")
        print(classification_report(true_labels, predictions))

        # Plot confusion matrix
        cm = confusion_matrix(true_labels, predictions, labels=self.damage_classes)

        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=self.damage_classes,
                   yticklabels=self.damage_classes)
        plt.title('Confusion Matrix')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.tight_layout()
        plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
        plt.show()

    def save_model(self, path: str):
        """
        Save the trained model
        """
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'damage_classes': self.damage_classes,
            'label_encoder': self.label_encoder,
            'base_prices': self.base_prices
        }, path)
        print(f"Model saved to {path}")

    def load_model(self, path: str):
        """
        Load a trained model
        """
        checkpoint = torch.load(path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.damage_classes = checkpoint['damage_classes']
        self.label_encoder = checkpoint['label_encoder']
        self.base_prices = checkpoint['base_prices']
        print(f"Model loaded from {path}")

    def batch_process(self, image_dir: str, output_csv: str = "damage_predictions.csv"):
        """
        Process multiple images and save results to CSV
        """
        results = []

        for img_file in os.listdir(image_dir):
            if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(image_dir, img_file)

                try:
                    result = self.process_image(img_path)
                    results.append({
                        'image_name': img_file,
                        'damage_type': result['predicted_damage'],
                        'confidence': result['confidence'],
                        'damage_area': result['predicted_area'],
                        'estimated_cost': result['estimated_cost'],
                        'cost_lower': result['confidence_interval'][0],
                        'cost_upper': result['confidence_interval'][1]
                    })
                    print(f"Processed: {img_file}")
                except Exception as e:
                    print(f"Error processing {img_file}: {str(e)}")

        # Save results
        df = pd.DataFrame(results)
        df.to_csv(output_csv, index=False)
        print(f"Results saved to {output_csv}")

        return df

In [16]:
def _plot_training_curves(self, train_losses: List, val_losses: List):
        """
        Plot training and validation loss curves
        """
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 2, 1)
        plt.plot(train_losses, label='Train Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.savefig('training_curves.png', dpi=300, bbox_inches='tight')
        plt.show()

def predict_damage(self, image_path: str) -> Dict:
        """
        Predict damage type and area for a single image
        """
        self.model.eval()

        # Load and preprocess image
        image = Image.open(image_path).convert('RGB')
        image_tensor = self.val_transform(image).unsqueeze(0).to(self.device)

        with torch.no_grad():
            cls_output, area_output = self.model(image_tensor)

            # Get prediction probabilities
            probabilities = torch.softmax(cls_output, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)

            # Get damage area prediction
            predicted_area = area_output.item()

        # Decode prediction
        damage_type = self.label_encoder.inverse_transform([predicted_class.item()])[0]
        confidence_score = confidence.item()

        # Get all class probabilities
        all_probabilities = probabilities.cpu().numpy()[0]
        class_probabilities = {}
        for i, class_name in enumerate(self.damage_classes):
            class_probabilities[class_name] = float(all_probabilities[i])

        return {
            'image_path': image_path,
            'predicted_damage': damage_type,
            'confidence': confidence_score,
            'predicted_area': max(0, min(1, predicted_area)),  # Clamp between 0 and 1
            'class_probabilities': class_probabilities
        }

def estimate_repair_cost(self, prediction_result: Dict,
                           car_info: Dict = None) -> Dict:
        """
        Estimate repair cost based on damage prediction
        """
        damage_type = prediction_result['predicted_damage']
        confidence = prediction_result['confidence']
        damage_area = prediction_result['predicted_area']

        # Base cost
        base_cost = self.base_prices.get(damage_type, 0)

        # Area multiplier (more damage = higher cost)
        area_multiplier = 1 + (damage_area * 2)  # Max 3x multiplier

        # Confidence multiplier (lower confidence = higher uncertainty)
        confidence_multiplier = 1 + (1 - confidence) * 0.5

        # Calculate base repair cost
        repair_cost = base_cost * area_multiplier * confidence_multiplier

        # Apply car-specific multipliers
        if car_info:
            # Luxury car multiplier
            if car_info.get('luxury_brand', False):
                repair_cost *= 1.5

            # Age multiplier
            age = car_info.get('age', 5)
            if age > 10:
                repair_cost *= 0.8  # Older cars cheaper to repair
            elif age < 3:
                repair_cost *= 1.2  # Newer cars more expensive

            # Car value multiplier
            car_value = car_info.get('value', 15000)
            if car_value > 30000:
                repair_cost *= 1.3
            elif car_value < 10000:
                repair_cost *= 0.7

        # Calculate confidence interval
        uncertainty = 1 - confidence
        lower_bound = repair_cost * (1 - uncertainty * 0.3)
        upper_bound = repair_cost * (1 + uncertainty * 0.3)

        return {
            'estimated_cost': float(repair_cost),
            'confidence_interval': (float(lower_bound), float(upper_bound)),
            'cost_factors': {
                'base_cost': float(base_cost),
                'area_multiplier': float(area_multiplier),
                'confidence_multiplier': float(confidence_multiplier),
                'damage_area': float(damage_area)
            }
        }

def process_image(self, image_path: str, car_info: Dict = None) -> Dict:
        """
        Complete pipeline: predict damage and estimate cost
        """
        # Predict damage
        damage_result = self.predict_damage(image_path)

        # Estimate cost
        cost_result = self.estimate_repair_cost(damage_result, car_info)

        # Combine results
        complete_result = {
            **damage_result,
            **cost_result,
            'car_info': car_info
        }

        return complete_result

def visualize_prediction(self, image_path: str, prediction_result: Dict,
                           save_path: str = None):
        """
        Visualize prediction results
        """
        # Load image
        img = cv2.imread(image_path)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Create figure with subplots
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))

        # Original image with prediction
        axes[0].imshow(img_rgb)
        axes[0].set_title(f"Predicted: {prediction_result['predicted_damage']}\n"
                         f"Confidence: {prediction_result['confidence']:.2f}\n"
                         f"Estimated Cost: ${prediction_result['estimated_cost']:.2f}")
        axes[0].axis('off')

        # Probability distribution
        classes = list(prediction_result['class_probabilities'].keys())
        probs = list(prediction_result['class_probabilities'].values())

        bars = axes[1].bar(range(len(classes)), probs)
        axes[1].set_xlabel('Damage Types')
        axes[1].set_ylabel('Probability')
        axes[1].set_title('Class Probabilities')
        axes[1].set_xticks(range(len(classes)))
        axes[1].set_xticklabels(classes, rotation=45, ha='right')

        # Highlight predicted class
        predicted_idx = classes.index(prediction_result['predicted_damage'])
        bars[predicted_idx].set_color('red')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        else:
            plt.show()

def evaluate_model(self, data_dir: str):
        """
        Evaluate model performance
        """
        # Load test data
        image_paths, labels, damage_areas = self.prepare_dataset(data_dir)

        predictions = []
        true_labels = []

        for img_path, true_label in zip(image_paths, labels):
            result = self.predict_damage(img_path)
            predictions.append(result['predicted_damage'])
            true_labels.append(true_label)

        # Print classification report
        print("\nClassification Report:")
        print(classification_report(true_labels, predictions))

        # Plot confusion matrix
        cm = confusion_matrix(true_labels, predictions, labels=self.damage_classes)

        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=self.damage_classes,
                   yticklabels=self.damage_classes)
        plt.title('Confusion Matrix')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.tight_layout()
        plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
        plt.show()

def save_model(self, path: str):
        """
        Save the trained model
        """
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'damage_classes': self.damage_classes,
            'label_encoder': self.label_encoder,
            'base_prices': self.base_prices
        }, path)
        print(f"Model saved to {path}")

def load_model(self, path: str):
        """
        Load a trained model
        """
        checkpoint = torch.load(path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.damage_classes = checkpoint['damage_classes']
        self.label_encoder = checkpoint['label_encoder']
        self.base_prices = checkpoint['base_prices']
        print(f"Model loaded from {path}")

def batch_process(self, image_dir: str, output_csv: str = "damage_predictions.csv"):
        """
        Process multiple images and save results to CSV
        """
        results = []

        for img_file in os.listdir(image_dir):
            if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(image_dir, img_file)

                try:
                    result = self.process_image(img_path)
                    results.append({
                        'image_name': img_file,
                        'damage_type': result['predicted_damage'],
                        'confidence': result['confidence'],
                        'damage_area': result['predicted_area'],
                        'estimated_cost': result['estimated_cost'],
                        'cost_lower': result['confidence_interval'][0],
                        'cost_upper': result['confidence_interval'][1]
                    })
                    print(f"Processed: {img_file}")
                except Exception as e:
                    print(f"Error processing {img_file}: {str(e)}")

        # Save results
        df = pd.DataFrame(results)
        df.to_csv(output_csv, index=False)
        print(f"Results saved to {output_csv}")

        return df

In [20]:
def main():
    # Initialize detector
    detector = CarDamageDetector()


    print("Starting training with your 70-image dataset...")
    detector.train_model(
        data_dir='/content/drive/MyDrive/cd_datset',  # Your dataset directory
        epochs=50,  # Increased epochs for small dataset
        batch_size=4,  # Small batch size for small dataset
        learning_rate=0.0001  # Lower learning rate for fine-tuning
    )

    # Example: Process a single image
    image_path = "path/to/your/test/image.jpg"
    car_info = {
        'age': 5,
        'value': 20000,
        'luxury_brand': False
    }

    result = detector.process_image(image_path, car_info)

    print(f"\nDamage Detection Results:")
    print(f"Predicted damage: {result['predicted_damage']}")
    print(f"Confidence: {result['confidence']:.2f}")
    print(f"Damage area: {result['predicted_area']:.2f}")
    print(f"Estimated repair cost: ${result['estimated_cost']:.2f}")
    print(f"Cost range: ${result['confidence_interval'][0]:.2f} - ${result['confidence_interval'][1]:.2f}")

    # Visualize result
    detector.visualize_prediction(image_path, result, "prediction_result.png")

    # Batch processing
    # detector.batch_process("test_images", "results.csv")

    print("\nTraining completed! Your CNN model is ready for car damage detection.")

if __name__ == "__main__":
    main()

Using device: cpu
Starting training with your 70-image dataset...
Found 0 images
Label distribution: Counter()
No images found or labeled. Cannot proceed with training.

Damage Detection Results:


NameError: name 'result' is not defined