In [None]:
def analyze_class_performance(labels_true, labels_pred, class_names):
    """Compute per-class accuracy and sample counts.

    Args:
        labels_true: Array-like of ground-truth class indices.
        labels_pred: Array-like of predicted class indices, aligned
            element-for-element with ``labels_true``.
        class_names: Sequence of class names; the name at position ``i``
            describes class index ``i``.

    Returns:
        dict mapping each class name that has at least one ground-truth
        sample to ``{'accuracy': ..., 'samples': ...}``. Classes without
        ground-truth samples are left out.
    """
    # Coerce to arrays first: comparing a plain Python list with an int
    # gives a single ``False`` instead of an element-wise mask, which would
    # make every class look empty and silently return ``{}``.
    labels_true = np.asarray(labels_true)
    labels_pred = np.asarray(labels_pred)

    performance = {}
    for i, class_name in enumerate(class_names):
        mask = labels_true == i
        n_samples = np.sum(mask)
        if n_samples > 0:
            # Every masked ground-truth label equals ``i`` by construction.
            accuracy = np.mean(labels_pred[mask] == i)
            performance[class_name] = {
                'accuracy': accuracy,
                'samples': n_samples
            }
    return performance
#!/usr/bin/env python3

import os
import random
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
from PIL import Image

class DisasterDataset(Dataset):
    """Image dataset yielding disaster-type and severity labels per sample.

    Each item is a dict with keys ``'image'``, ``'disaster_type'``,
    ``'severity'`` and ``'weight'`` (the inverse-frequency weight of the
    sample's disaster class within this dataset).
    """

    def __init__(self, data_df, img_dir, transform=None, disaster_types=None):
        """Build label mappings and per-class weights.

        Args:
            data_df: DataFrame with the columns ``'Image Name'``,
                ``'Disaster Type'`` and ``'Severity'``.
            img_dir: Directory that ``'Image Name'`` entries are relative to.
            transform: Optional callable applied to the loaded RGB image.
            disaster_types: Optional ordered class list to use instead of
                deriving one from ``data_df``. Passing the training set's
                list keeps label indices identical across splits even when
                a split is missing some class.

        Raises:
            ValueError: If ``data_df`` contains a disaster type that is not
                listed in the supplied ``disaster_types``.
        """
        self.data_df = data_df.copy()
        self.img_dir = img_dir
        self.transform = transform

        # Create label mappings
        observed = self.data_df['Disaster Type'].unique()
        if disaster_types is None:
            self.disaster_types = sorted(observed)
        else:
            self.disaster_types = list(disaster_types)
            unknown = set(observed) - set(self.disaster_types)
            if unknown:
                raise ValueError(
                    "data_df contains disaster types missing from "
                    f"disaster_types: {sorted(map(str, unknown))}"
                )
        self.disaster2idx = {
            disaster: idx for idx, disaster in enumerate(self.disaster_types)
        }

        # Inverse-frequency class weights: total / (n_classes_present * count).
        # A class that never occurs in this frame gets weight 0.0; no sample
        # can ever look it up, and it avoids a division by zero.
        disaster_counts = self.data_df['Disaster Type'].value_counts()
        total_samples = len(self.data_df)
        n_present = len(disaster_counts)
        weights = []
        for disaster in self.disaster_types:
            count = int(disaster_counts.get(disaster, 0))
            weights.append(total_samples / (n_present * count) if count else 0.0)
        self.disaster_weights = torch.tensor(weights, dtype=torch.float32)

    def __len__(self):
        """Return the number of rows (samples) in the dataset."""
        return len(self.data_df)

    def __getitem__(self, idx):
        """Load the ``idx``-th image and return it with its labels and weight."""
        row = self.data_df.iloc[idx]
        img_path = os.path.join(self.img_dir, row['Image Name'])

        # Load inside a context manager so the file handle is released as
        # soon as the pixel data has been converted.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image)

        # Get labels
        disaster_label = self.disaster2idx[row['Disaster Type']]
        severity_label = int(row['Severity'])

        return {
            'image': image,
            'disaster_type': disaster_label,
            'severity': severity_label,
            'weight': self.disaster_weights[disaster_label]
        }

class MultiTaskModel(nn.Module):
    """Shared ResNet-50 trunk feeding two linear classification heads.

    One head scores disaster types (``num_disaster_types`` outputs), the
    other scores severity (3 outputs). Both read the same trunk features.
    """

    def __init__(self, num_disaster_types):
        super().__init__()

        # Pretrained ResNet-50 with its final fully connected layer removed;
        # the width of that removed layer's input sizes both heads.
        resnet = models.resnet50(pretrained=True)
        feature_dim = resnet.fc.in_features
        trunk_layers = list(resnet.children())[:-1]
        self.backbone = nn.Sequential(*trunk_layers)

        # Each head is dropout followed by a single linear projection.
        self.disaster_classifier = self._make_head(feature_dim, num_disaster_types)
        self.severity_classifier = self._make_head(feature_dim, 3)

    @staticmethod
    def _make_head(in_features, out_features):
        """Return a Dropout(0.5) -> Linear classification head."""
        return nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, out_features)
        )

    def forward(self, x):
        """Return ``(disaster_logits, severity_logits)`` for the batch ``x``."""
        pooled = self.backbone(x)
        # Drop the two trailing singleton dims left by the trunk's pooling
        # (assumes the usual (N, C, 1, 1) trunk output — TODO confirm).
        features = pooled.squeeze(-1).squeeze(-1)
        return self.disaster_classifier(features), self.severity_classifier(features)

def train_epoch(model, train_loader, optimizer, device, criterion):
    """Run one optimisation pass over ``train_loader``.

    Each batch is a dict with ``'image'``, ``'disaster_type'``,
    ``'severity'`` and ``'weight'`` entries. The disaster loss is scaled by
    the per-sample weights before averaging; the severity loss is a plain
    mean. Their sum is back-propagated.

    Returns:
        dict with the mean per-batch ``'loss'`` and the per-sample
        ``'disaster_acc'`` / ``'severity_acc'`` over the whole pass.
    """
    model.train()
    running_loss = 0
    disaster_hits = 0
    severity_hits = 0
    seen = 0

    batch_keys = ('image', 'disaster_type', 'severity', 'weight')
    for batch in train_loader:
        images, disaster_labels, severity_labels, weights = (
            batch[key].to(device) for key in batch_keys
        )

        optimizer.zero_grad()
        disaster_out, severity_out = model(images)

        # Weight the disaster loss per sample, then collapse both task
        # losses to scalars so their sum can be back-propagated.
        per_sample_disaster = criterion(disaster_out, disaster_labels)
        disaster_loss = (per_sample_disaster * weights).mean()
        severity_loss = criterion(severity_out, severity_labels).mean()

        loss = disaster_loss + severity_loss
        loss.backward()
        optimizer.step()

        # Bookkeeping only; no gradients needed.
        with torch.no_grad():
            disaster_hits += (disaster_out.argmax(dim=1) == disaster_labels).sum().item()
            severity_hits += (severity_out.argmax(dim=1) == severity_labels).sum().item()
            seen += images.size(0)
            running_loss += loss.item()

    n_batches = len(train_loader)
    return {
        'loss': running_loss / n_batches,
        'disaster_acc': disaster_hits / seen,
        'severity_acc': severity_hits / seen
    }

def evaluate(model, data_loader, device, criterion):
    """Evaluate ``model`` on ``data_loader`` without updating it.

    Each batch is a dict with ``'image'``, ``'disaster_type'`` and
    ``'severity'`` entries. Both task losses are averaged per batch and
    summed, with no class weighting.

    Returns:
        dict with:
            ``'loss'``: mean per-batch loss,
            ``'disaster_acc'`` / ``'severity_acc'``: per-sample accuracy,
            ``'predictions'``: ``{'disaster': ndarray, 'severity': ndarray}``,
            ``'labels'``: ``{'disaster': ndarray, 'severity': ndarray}``,
        where the arrays follow the loader's iteration order.
    """
    model.eval()
    total_loss = 0
    disaster_correct = 0
    severity_correct = 0
    total_samples = 0

    # One array per batch; joined once at the end instead of extending a
    # list element by element.
    disaster_pred_chunks = []
    severity_pred_chunks = []
    disaster_label_chunks = []
    severity_label_chunks = []

    with torch.no_grad():
        for batch in data_loader:
            images = batch['image'].to(device)
            disaster_labels = batch['disaster_type'].to(device)
            severity_labels = batch['severity'].to(device)

            disaster_out, severity_out = model(images)

            # Reduce each task loss to a scalar before summing.
            disaster_loss = criterion(disaster_out, disaster_labels).mean()
            severity_loss = criterion(severity_out, severity_labels).mean()
            loss = disaster_loss + severity_loss

            # Track predictions
            disaster_pred = disaster_out.argmax(dim=1)
            severity_pred = severity_out.argmax(dim=1)

            disaster_correct += (disaster_pred == disaster_labels).sum().item()
            severity_correct += (severity_pred == severity_labels).sum().item()
            total_samples += images.size(0)
            total_loss += loss.item()

            disaster_pred_chunks.append(disaster_pred.cpu().numpy())
            severity_pred_chunks.append(severity_pred.cpu().numpy())
            disaster_label_chunks.append(disaster_labels.cpu().numpy())
            severity_label_chunks.append(severity_labels.cpu().numpy())

    def _join(chunks):
        # np.concatenate rejects an empty list, so fall back to an empty array.
        return np.concatenate(chunks) if chunks else np.array([])

    return {
        'loss': total_loss / len(data_loader),
        'disaster_acc': disaster_correct / total_samples,
        'severity_acc': severity_correct / total_samples,
        'predictions': {
            'disaster': _join(disaster_pred_chunks),
            'severity': _join(severity_pred_chunks)
        },
        'labels': {
            'disaster': _join(disaster_label_chunks),
            'severity': _join(severity_label_chunks)
        }
    }

def _select_significant_classes(df, min_absolute_samples, significance_threshold):
    """Print the class distribution and return the classes worth training on."""
    class_counts = df['Disaster Type'].value_counts()
    total_samples = len(df)

    significance_scores = {}
    print("\nClass Distribution Analysis:")
    for disaster_type, count in class_counts.items():
        ratio = count / total_samples
        # Share of the dataset scaled by how far the class exceeds the
        # minimum absolute sample count.
        significance_score = ratio * (count / min_absolute_samples)
        significance_scores[disaster_type] = significance_score

        print(f"{disaster_type}:")
        print(f"  Samples: {count}")
        print(f"  Ratio: {ratio:.3f}")
        print(f"  Significance Score: {significance_score:.3f}")

    return [
        cls for cls, score in significance_scores.items()
        if score >= significance_threshold and class_counts[cls] >= min_absolute_samples
    ]


def _split_by_class(df, valid_classes, train_size, min_test_size, seed):
    """Split ``df`` per class into shuffled train / validation / test frames."""
    if not valid_classes:
        raise ValueError("No disaster class passed the significance filter")

    train_parts = []
    holdout_parts = []
    for disaster_type in valid_classes:
        class_data = df[df['Disaster Type'] == disaster_type]
        n_holdout = max(min_test_size, int(len(class_data) * (1 - train_size)))

        # Random split within each class
        class_holdout = class_data.sample(n=n_holdout, random_state=seed)
        train_parts.append(class_data.drop(class_holdout.index))
        holdout_parts.append(class_holdout)

    train_df = pd.concat(train_parts)
    holdout_df = pd.concat(holdout_parts)

    # Halve the held-out rows into validation and test, stratified by class.
    val_df, test_df = train_test_split(
        holdout_df,
        test_size=0.5,
        random_state=seed,
        stratify=holdout_df['Disaster Type']
    )

    def _shuffle(frame):
        return frame.sample(frac=1, random_state=seed).reset_index(drop=True)

    return _shuffle(train_df), _shuffle(val_df), _shuffle(test_df)


def _report_test_results(test_metrics, disaster_types, problem_threshold):
    """Print test accuracy, classification reports and per-class accuracy."""
    print("\nTest Results:")
    print(f"Test Disaster Accuracy: {test_metrics['disaster_acc']:.4f}")
    print(f"Test Severity Accuracy: {test_metrics['severity_acc']:.4f}")

    severity_names = ['Low', 'Medium', 'High']
    # Pass ``labels`` explicitly: without it classification_report infers the
    # classes from the data and fails when one is absent from the test split
    # while ``target_names`` still lists it.
    disaster_report = classification_report(
        test_metrics['labels']['disaster'],
        test_metrics['predictions']['disaster'],
        labels=list(range(len(disaster_types))),
        target_names=disaster_types,
        zero_division=0
    )
    severity_report = classification_report(
        test_metrics['labels']['severity'],
        test_metrics['predictions']['severity'],
        labels=list(range(len(severity_names))),
        target_names=severity_names,
        zero_division=0
    )

    print("\nDisaster Type Classification Report:")
    print(disaster_report)

    print("\nSeverity Classification Report:")
    print(severity_report)

    # Per-class accuracy on the test split
    print("\nFeature Importance Analysis:")
    per_class = analyze_class_performance(
        test_metrics['labels']['disaster'],
        test_metrics['predictions']['disaster'],
        disaster_types
    )
    for disaster_type, stats in per_class.items():
        print(f"{disaster_type}:")
        print(f"  Accuracy: {stats['accuracy']:.4f}")
        print(f"  Samples: {stats['samples']}")

    # Identify problematic classes
    problematic_classes = [
        cls for cls, stats in per_class.items()
        if stats['accuracy'] < problem_threshold
    ]
    if problematic_classes:
        print("\nProblematic classes that might need more data or feature engineering:")
        for cls in problematic_classes:
            print(f"- {cls} (Accuracy: {per_class[cls]['accuracy']:.4f})")


def main():
    """Train and evaluate the multi-task disaster classifier end to end.

    Reads the labelled CSV, keeps only sufficiently represented classes,
    splits them into train / validation / test, trains for a fixed number
    of epochs while checkpointing the best validation disaster accuracy to
    ``best_model.pth``, reports test metrics and writes the class list to
    ``disaster_types.txt``.
    """
    # Configuration
    RANDOM_SEED = 42
    IMG_SIZE = 224
    BATCH_SIZE = 32
    NUM_EPOCHS = 10
    LEARNING_RATE = 1e-4

    # Set random seeds
    random.seed(RANDOM_SEED)
    np.random.seed(RANDOM_SEED)
    torch.manual_seed(RANDOM_SEED)

    # Data paths
    img_dir = "images"
    csv_path = "disasterData/disaster_analysis_filename_based.csv"

    # Load and preprocess data
    df = pd.read_csv(csv_path)

    valid_classes = _select_significant_classes(
        df,
        min_absolute_samples=5,
        significance_threshold=0.1
    )
    print(f"\nSelected {len(valid_classes)} significant classes")
    print("Selected classes:", valid_classes)

    # 70% train; the rest (at least 2 rows per class) is held out and halved
    # into validation and test.
    train_df, val_df, test_df = _split_by_class(
        df,
        valid_classes,
        train_size=0.7,
        min_test_size=2,
        seed=RANDOM_SEED
    )

    print("\nFinal dataset sizes:")
    print(f"Training samples: {len(train_df)}")
    print(f"Validation samples: {len(val_df)}")
    print(f"Test samples: {len(test_df)}")

    # Data transforms
    normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(IMG_SIZE),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.1, contrast=0.1),
        transforms.ToTensor(),
        normalize
    ])

    eval_transform = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        normalize
    ])

    # Create datasets and dataloaders
    train_dataset = DisasterDataset(train_df, img_dir, transform=train_transform)
    val_dataset = DisasterDataset(val_df, img_dir, transform=eval_transform)
    test_dataset = DisasterDataset(test_df, img_dir, transform=eval_transform)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

    # Setup model and training
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MultiTaskModel(num_disaster_types=len(train_dataset.disaster_types))
    model = model.to(device)

    # reduction='none' keeps per-sample losses so train_epoch can weight them.
    criterion = nn.CrossEntropyLoss(reduction='none')
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # Training loop. Start below any reachable accuracy so the first epoch
    # always writes a checkpoint; otherwise a run stuck at 0.0 would later
    # load a missing or stale best_model.pth.
    best_val_acc = -1.0
    for epoch in range(NUM_EPOCHS):
        train_metrics = train_epoch(model, train_loader, optimizer, device, criterion)
        val_metrics = evaluate(model, val_loader, device, criterion)

        # Save best model
        if val_metrics['disaster_acc'] > best_val_acc:
            best_val_acc = val_metrics['disaster_acc']
            torch.save(model.state_dict(), 'best_model.pth')

        print(f"\nEpoch {epoch+1}/{NUM_EPOCHS}")
        print(f"Train Loss: {train_metrics['loss']:.4f}")
        print(f"Train Disaster Acc: {train_metrics['disaster_acc']:.4f}")
        print(f"Train Severity Acc: {train_metrics['severity_acc']:.4f}")
        print(f"Val Loss: {val_metrics['loss']:.4f}")
        print(f"Val Disaster Acc: {val_metrics['disaster_acc']:.4f}")
        print(f"Val Severity Acc: {val_metrics['severity_acc']:.4f}")

    # Evaluate on test set with the best checkpoint
    model.load_state_dict(torch.load('best_model.pth', map_location=device))
    test_metrics = evaluate(model, test_loader, device, criterion)

    _report_test_results(
        test_metrics,
        train_dataset.disaster_types,
        problem_threshold=0.7  # Adjust based on your needs
    )

    # Save disaster types to file, one per line in label-index order
    print("\nSaving disaster types to file...")
    with open('disaster_types.txt', 'w', encoding='utf-8') as f:
        for disaster_type in train_dataset.disaster_types:
            f.write(f"{disaster_type}\n")
    print("Disaster types saved successfully")


if __name__ == "__main__":
    
    main()



Class Distribution Analysis:
fire:
  Samples: 48
  Ratio: 0.169
  Significance Score: 1.623
flood:
  Samples: 47
  Ratio: 0.165
  Significance Score: 1.556
structural damage:
  Samples: 46
  Ratio: 0.162
  Significance Score: 1.490
storm:
  Samples: 43
  Ratio: 0.151
  Significance Score: 1.302
earthquake:
  Samples: 27
  Ratio: 0.095
  Significance Score: 0.513
surface damage:
  Samples: 15
  Ratio: 0.053
  Significance Score: 0.158
tornado:
  Samples: 15
  Ratio: 0.053
  Significance Score: 0.158
leak:
  Samples: 14
  Ratio: 0.049
  Significance Score: 0.138
mold:
  Samples: 10
  Ratio: 0.035
  Significance Score: 0.070
tree:
  Samples: 8
  Ratio: 0.028
  Significance Score: 0.045
landslide:
  Samples: 5
  Ratio: 0.018
  Significance Score: 0.018
abandoned:
  Samples: 2
  Ratio: 0.007
  Significance Score: 0.003
snow:
  Samples: 2
  Ratio: 0.007
  Significance Score: 0.003
lava:
  Samples: 1
  Ratio: 0.004
  Significance Score: 0.001
sand:
  Samples: 1
  Ratio: 0.004
  Significance Score: 0.001




Epoch 1/10
Train Loss: 3.2250
Train Disaster Acc: 0.1538
Train Severity Acc: 0.3681
Val Loss: 2.8290
Val Disaster Acc: 0.3056
Val Severity Acc: 0.5556

Epoch 2/10
Train Loss: 2.7415
Train Disaster Acc: 0.3626
Train Severity Acc: 0.6374
Val Loss: 2.5440
Val Disaster Acc: 0.4722
Val Severity Acc: 0.6667

Epoch 3/10
Train Loss: 2.5131
Train Disaster Acc: 0.4011
Train Severity Acc: 0.6374
Val Loss: 2.3976
Val Disaster Acc: 0.5556
Val Severity Acc: 0.7222

Epoch 4/10
Train Loss: 2.1503
Train Disaster Acc: 0.4670
Train Severity Acc: 0.7198
Val Loss: 2.3151
Val Disaster Acc: 0.5278
Val Severity Acc: 0.7778

Epoch 5/10
Train Loss: 1.8589
Train Disaster Acc: 0.5934
Train Severity Acc: 0.7857
Val Loss: 2.2832
Val Disaster Acc: 0.5278
Val Severity Acc: 0.7222

Epoch 6/10
Train Loss: 1.5840
Train Disaster Acc: 0.6978
Train Severity Acc: 0.8022
Val Loss: 2.2939
Val Disaster Acc: 0.4722
Val Severity Acc: 0.6944

Epoch 7/10
Train Loss: 1.3277
Train Disaster Acc: 0.6868
Train Severity Acc: 0.8187
Val

In [181]:
# Add these imports to match the training code
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from PIL import Image
import matplotlib.pyplot as plt
import math
import numpy as np

# Use the same model architecture as in training
class MultiTaskModel(nn.Module):
    """ResNet-50 feature extractor with a disaster-type and a severity head."""

    def __init__(self, num_disaster_types):
        super().__init__()

        # ResNet-50 minus its last child (the fully connected classifier).
        base = models.resnet50(pretrained=True)
        head_in = base.fc.in_features
        self.backbone = nn.Sequential(*list(base.children())[:-1])

        # Two independent dropout + linear heads on the shared features.
        disaster_layers = [nn.Dropout(0.5), nn.Linear(head_in, num_disaster_types)]
        self.disaster_classifier = nn.Sequential(*disaster_layers)

        severity_layers = [nn.Dropout(0.5), nn.Linear(head_in, 3)]
        self.severity_classifier = nn.Sequential(*severity_layers)

    def forward(self, x):
        """Return the disaster logits and severity logits for batch ``x``."""
        features = self.backbone(x)
        # Remove the two trailing size-1 dimensions before the linear heads.
        features = features.squeeze(-1)
        features = features.squeeze(-1)
        disaster_out = self.disaster_classifier(features)
        severity_out = self.severity_classifier(features)
        return disaster_out, severity_out

class DisasterPredictor:
    """Load a trained ``MultiTaskModel`` checkpoint and classify single images."""

    def __init__(self, model_path, disaster_types):
        """Build the model, load its weights and prepare preprocessing.

        Args:
            model_path: Path to a state dict saved from ``MultiTaskModel``.
            disaster_types: Ordered class names; position ``i`` must name the
                class index ``i`` the checkpoint was trained with.

        Raises:
            Exception: Whatever ``torch.load`` / ``load_state_dict`` raises
                is printed and re-raised unchanged.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Store disaster types
        self.disaster_types = disaster_types

        # Initialize model with correct number of classes.
        # NOTE: the constructor requests ImageNet-pretrained backbone weights,
        # which the checkpoint loaded below then overwrites.
        self.model = MultiTaskModel(num_disaster_types=len(self.disaster_types))

        # Load trained weights
        try:
            state_dict = torch.load(model_path, map_location=self.device)
            self.model.load_state_dict(state_dict)
            print("Model loaded successfully")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

        self.model.to(self.device)
        self.model.eval()

        # Deterministic resize + normalisation (no augmentation)
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        self.severity_levels = ['Low', 'Medium', 'High']

    def preprocess_image(self, image_path):
        """Return the image as a 1-item batch tensor, or ``None`` on failure.

        Failures are reported on stdout instead of raised, so a batch run can
        skip unreadable files.
        """
        try:
            # Context manager releases the file handle once pixels are copied.
            with Image.open(image_path) as raw:
                image = raw.convert('RGB')
            transformed_image = self.transform(image)
            return transformed_image.unsqueeze(0)
        except Exception as e:  # deliberate best-effort: report and skip
            print(f"Error processing image {image_path}: {e}")
            return None

    def predict(self, image_path):
        """Classify one image.

        Returns:
            ``None`` if the image could not be preprocessed, otherwise a dict
            with ``'top_predictions'`` (up to three ``{'type', 'confidence'}``
            entries, best first), ``'primary_disaster'``,
            ``'disaster_confidence'``, ``'severity'`` and
            ``'severity_confidence'``. Confidences are percentages.
        """
        # Preprocess image
        image_tensor = self.preprocess_image(image_path)
        if image_tensor is None:
            return None

        image_tensor = image_tensor.to(self.device)

        # Make prediction
        with torch.no_grad():
            disaster_out, severity_out = self.model(image_tensor)

            # Single-image batch, so row 0 holds the only distribution.
            disaster_probs = torch.softmax(disaster_out, dim=1)[0]
            severity_probs = torch.softmax(severity_out, dim=1)[0]

            # Plain ints, so they can index ordinary Python sequences.
            disaster_idx = disaster_probs.argmax().item()
            severity_idx = severity_probs.argmax().item()

            # Get confidence scores
            disaster_conf = disaster_probs[disaster_idx].item()
            severity_conf = severity_probs[severity_idx].item()

            # Get top-3 disaster predictions (fewer if there are < 3 classes)
            top_k = min(3, len(self.disaster_types))
            probs, indices = torch.topk(disaster_probs, top_k)
            top3_disasters = [
                {'type': self.disaster_types[idx], 'confidence': prob * 100}
                for prob, idx in zip(probs.tolist(), indices.tolist())
            ]

        return {
            'top_predictions': top3_disasters,
            'primary_disaster': self.disaster_types[disaster_idx],
            'disaster_confidence': disaster_conf * 100,
            'severity': self.severity_levels[severity_idx],
            'severity_confidence': severity_conf * 100
        }

def predict_all_images(predictor, image_dir):
    """Predict and display results for every image in ``image_dir``.

    Files are processed in sorted name order and drawn in a grid of at most
    three columns, each titled with its main prediction.

    Args:
        predictor: Object whose ``predict(image_path)`` returns a result dict
            (or ``None`` on failure), such as ``DisasterPredictor``.
        image_dir: Directory scanned (non-recursively) for
            ``.jpg``/``.jpeg``/``.png``/``.bmp`` files.

    Returns:
        A list of ``{'image': file_name, 'predictions': result}`` dicts for
        the images that were predicted successfully, or ``None`` when the
        directory contains no image files.
    """
    # os.listdir returns entries in arbitrary order; sort so the grid layout
    # and the returned list are reproducible.
    valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
    image_files = sorted(
        f for f in os.listdir(image_dir)
        if f.lower().endswith(valid_extensions)
    )

    if not image_files:
        print(f"No image files found in {image_dir}")
        return

    # Calculate grid dimensions
    n_images = len(image_files)
    n_cols = min(3, n_images)  # Max 3 images per row
    n_rows = math.ceil(n_images / n_cols)

    # Create figure
    plt.figure(figsize=(6*n_cols, 6*n_rows))

    # Process each image
    all_predictions = []

    for idx, image_file in enumerate(image_files, 1):
        image_path = os.path.join(image_dir, image_file)

        # Create subplot
        plt.subplot(n_rows, n_cols, idx)

        # Display image. The handle is closed right after drawing, and an
        # unreadable file only blanks its own cell instead of aborting the run.
        try:
            with Image.open(image_path) as img:
                plt.imshow(img)
        except OSError as e:
            print(f"Error displaying image {image_path}: {e}")
            plt.axis('off')
            continue
        plt.axis('off')

        # Make prediction
        results = predictor.predict(image_path)
        if not results:
            continue

        # Store prediction results
        all_predictions.append({
            'image': image_file,
            'predictions': results
        })

        # Format title with main prediction and confidence
        title = f"{results['primary_disaster']}\n"
        title += f"Confidence: {results['disaster_confidence']:.1f}%\n"
        title += f"Severity: {results['severity']}"

        plt.title(title, fontsize=10, pad=10)

        # Print detailed results
        print(f"\nResults for {image_file}:")
        print("Top predictions:")
        for pred in results['top_predictions']:
            print(f"- {pred['type']}: {pred['confidence']:.1f}%")
        print(f"Severity: {results['severity']} ({results['severity_confidence']:.1f}% confidence)")
        print("-" * 50)

    # Adjust layout and display
    plt.tight_layout()
    plt.show()

    return all_predictions

# Example usage in notebook:
