In [4]:
import torch
import torch.nn as nn
from torchvision import models
from enum import Enum
from typing import Optional, Dict, Any
import logging

logger = logging.getLogger(__name__)

class BackboneType(Enum):
    ALEXNET = ("alexnet", "IMAGENET1K_V1")
    RESNET18 = ("resnet18", "IMAGENET1K_V1")
    RESNET50 = ("resnet50", "IMAGENET1K_V1")
    RESNET101 = ("resnet101", "IMAGENET1K_V1")
    VGG16 = ("vgg16", "IMAGENET1K_V1")
    VGG19 = ("vgg19", "IMAGENET1K_V1")
    INCEPTION_V3 = ("inception_v3", "IMAGENET1K_V1")
    VIT_B_16 = ("vit_b_16", "IMAGENET1K_V1")
    VIT_B_32 = ("vit_b_32", "IMAGENET1K_V1")
    EFFICIENTNET_B0 = ("efficientnet_b0", "IMAGENET1K_V1")

    def __init__(self, model_name: str, weights_name: str):
        self.model_name = model_name
        self.weights_name = weights_name

class FeatureExtractorLayer(Enum):
    ALEXNET = "features.12"
    RESNET18 = "layer4.0.relu"
    RESNET50 = "layer3.2.bn1"
    RESNET101 = "layer3.2.bn1"
    VGG16 = "features.30"
    VGG19 = "features.36"
    INCEPTION_V3 = "Mixed_7a.branch3x3_1.bn"
    VIT_B_16 = "encoder.layers.encoder_layer_8.mlp"
    VIT_B_32 = "encoder.layers.encoder_layer_8.mlp"
    EFFICIENTNET_B0 = "features.6.2.stochastic_depth"

class ModularBackboneLSTM(nn.Module):
    def __init__(
        self,
        backbone_type: BackboneType,
        num_classes: int,
        hidden_size: int = 256,
        num_layers: int = 1,
        dropout_rate: float = 0.5,
        pretrained: bool = True,
        freeze_backbone: bool = True
    ):
        super(ModularBackboneLSTM, self).__init__()
        print(f"{backbone_type = }")
        self.backbone_type = backbone_type
        self.feature_layer = FeatureExtractorLayer[backbone_type.name].value
        
        # Initialize backbone and get its output features
        self.backbone, self.backbone_features = self._initialize_backbone(
            backbone_type, pretrained, freeze_backbone
        )
        
        self.dropout = nn.Dropout(dropout_rate)
        
        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=self.backbone_features,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        
        # Feature combination layer
        self.combine_features = nn.Sequential(
            nn.Linear(self.backbone_features + hidden_size, 512),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )
        
        # Final classifier
        self.classifier = nn.Linear(512, num_classes)
        
    def _initialize_backbone(
        self, 
        backbone_type: BackboneType, 
        pretrained: bool,
        freeze_backbone: bool
    ) -> tuple[nn.Module, int]:
        """Initialize the backbone model and return it along with its output features size."""
        
        # Get the model creation function
        model_func = getattr(models, backbone_type.model_name)
        
        # Initialize the model with or without pretrained weights
        if pretrained:
            try:
                # Get weights using models.get_weight
                # weights = models.get_weight(backbone_type.model_name, weights)
                # print(f"{weights = }")
                # model = model_func(weights=weights)
                model = models.get_model(backbone_type.model_name, backbone_type.weights_name)
                logger.info(f"Loaded pretrained weights: {backbone_type.weights_name}")
                print(f"{model = }")
            except Exception as e:
                logger.warning(f"Failed to load pretrained weights for {backbone_type.model_name}: {e}")
                model = model_func(weights=None)
        else:
            model = model_func(weights=None)
            
        # Get the feature extraction layer
        feature_layer = FeatureExtractorLayer[backbone_type.name].value
        
        # Create feature extractor
        layers = []
        current_layer = model
        
        for part in feature_layer.split('.'):
            current_layer = getattr(current_layer, part)
            layers.append(current_layer)
            
        modified_backbone = nn.Sequential(*layers)
        
        # Freeze backbone if specified
        if freeze_backbone:
            for param in modified_backbone.parameters():
                param.requires_grad = False
                
        # Get output features size
        if backbone_type in [BackboneType.VIT_B_16, BackboneType.VIT_B_32]:
            out_features = 768  # Standard ViT hidden size
        else:
            # Use a forward pass with dummy data to get output size
            with torch.no_grad():
                dummy_input = torch.randn(1, 1, 3, 224, 224) # (batch_size, timesteps, channels, height, width)
                out = modified_backbone(dummy_input)
                out_features = out.view(1, -1).size(1)
        
        return modified_backbone, out_features
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        batch_size, timesteps, C, H, W = x.size()
        
        # Process each timestep through backbone
        backbone_features = []
        for t in range(timesteps):
            current_frame = x[:, t, :, :, :]
            features = self.backbone(current_frame)
            features = features.view(batch_size, -1)
            backbone_features.append(features.unsqueeze(1))
        
        # Combine backbone features
        backbone_features = torch.cat(backbone_features, dim=1)
        backbone_features = self.dropout(backbone_features)
        
        # Process through LSTM
        lstm_out, (h_n, c_n) = self.lstm(backbone_features)
        lstm_out = self.dropout(lstm_out)
        
        # Get final outputs
        lstm_last_out = lstm_out[:, -1, :]
        backbone_last_out = backbone_features[:, -1, :]
        
        # Combine features
        combined = torch.cat((backbone_last_out, lstm_last_out), dim=1)
        combined = self.dropout(combined)
        
        # Final processing
        combined_features = self.combine_features(combined)
        combined_features = self.dropout(combined_features)
        
        # Classification
        output = self.classifier(combined_features)
        
        return output

def create_model(
    backbone_name: str,
    num_classes: int,
    hidden_size: int = 256,
    num_layers: int = 1,
    dropout_rate: float = 0.5,
    pretrained: bool = True,
    freeze_backbone: bool = True
) -> ModularBackboneLSTM:
    """
    Create a ModularBackboneLSTM model with the specified backbone.
    
    Args:
        backbone_name: Name of the backbone architecture
        num_classes: Number of output classes
        hidden_size: LSTM hidden size
        num_layers: Number of LSTM layers
        dropout_rate: Dropout rate
        pretrained: Whether to use pretrained weights
        freeze_backbone: Whether to freeze backbone parameters
    
    Returns:
        ModularBackboneLSTM model
    """
    try:
        backbone_type = BackboneType[backbone_name.upper()]
    except KeyError:
        raise ValueError(f"Unsupported backbone: {backbone_name}. "
                        f"Supported backbones: {[b.name for b in BackboneType]}")
    
    return ModularBackboneLSTM(
        backbone_type=backbone_type,
        num_classes=num_classes,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout_rate=dropout_rate,
        pretrained=pretrained,
        freeze_backbone=freeze_backbone
    )

# Example usage   
model = create_model(
    backbone_name="RESNET50",
    num_classes=10,
    pretrained=True
)

Failed to load pretrained weights for resnet50: get_model() takes 1 positional argument but 2 were given


RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [1, 1, 3, 224, 224]

In [2]:
import torch
import logging
import sys
from typing import Dict, Any
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)
logger = logging.getLogger(__name__)

def test_backbone(
    backbone_name: str,
    model_config: Dict[str, Any],
    input_size: Dict[str, int]
) -> bool:
    """
    Test a single backbone model.
    
    Args:
        backbone_name: Name of the backbone to test
        model_config: Configuration for the model
        input_size: Input tensor dimensions
        
    Returns:
        bool: True if test passed, False otherwise
    """
    try:
        # Create sample input
        sample_input = torch.randn(
            input_size['batch_size'],
            input_size['timesteps'],
            input_size['channels'],
            input_size['height'],
            input_size['width']
        )
        
        # Create and evaluate model
        logger.info(f"\nTesting {backbone_name}...")
        start_time = time.time()
        
        model = create_model(backbone_name=backbone_name, **model_config)
        model.eval()
        
        # Test forward pass
        with torch.no_grad():
            output = model(sample_input)
        
        # Verify output
        expected_shape = (input_size['batch_size'], model_config['num_classes'])
        assert tuple(output.shape) == expected_shape, \
            f"Wrong output shape: {tuple(output.shape)} vs {expected_shape}"
        
        assert not torch.isnan(output).any(), "Output contains NaN values"
        assert not torch.isinf(output).any(), "Output contains Inf values"
        
        end_time = time.time()
        logger.info(f"✓ {backbone_name} passed (Time: {end_time - start_time:.2f}s)")
        return True
        
    except Exception as e:
        logger.error(f"✗ {backbone_name} failed: {str(e)}")
        return False

def main():
    """Main test function to test all backbones."""
    logger.info("Starting backbone tests...")
    
    # Default model configuration
    model_config = {
        'num_classes': 10,
        'hidden_size': 256,
        'num_layers': 1,
        'dropout_rate': 0.5,
        'pretrained': True,
        'freeze_backbone': True
    }
    
    # Default input sizes
    default_input_size = {
        'batch_size': 2,
        'timesteps': 3,
        'channels': 3,
        'height': 224,
        'width': 224
    }
    
    # Special configurations for specific models
    special_configs = {
        'INCEPTION_V3': {
            'height': 299,
            'width': 299
        }
    }
    
    # List all available backbones
    backbones = [b.name for b in BackboneType]
    
    # Test results
    results = {}
    start_time_total = time.time()
    
    # Test each backbone
    for backbone_name in backbones:
        # Update input size if needed
        input_size = default_input_size.copy()
        if backbone_name in special_configs:
            input_size.update(special_configs[backbone_name])
        
        # Run test
        results[backbone_name] = test_backbone(
            backbone_name=backbone_name,
            model_config=model_config,
            input_size=input_size
        )
    
    # Print summary
    end_time_total = time.time()
    total_time = end_time_total - start_time_total
    
    logger.info("\nTest Summary")
    logger.info("=" * 50)
    logger.info(f"{'Model':<20} {'Status':<10}")
    logger.info("-" * 50)
    
    passed = 0
    for backbone, status in results.items():
        passed += int(status)
        status_str = "✓ PASSED" if status else "✗ FAILED"
        logger.info(f"{backbone:<20} {status_str:<10}")
    
    logger.info("=" * 50)
    logger.info(f"Total Passed: {passed}/{len(results)}")
    logger.info(f"Total Time: {total_time:.2f}s")
    logger.info("=" * 50)
    
    # Additional info about memory usage
    if torch.cuda.is_available():
        logger.info("\nGPU Memory Usage")
        logger.info("-" * 50)
        for i in range(torch.cuda.device_count()):
            memory_allocated = torch.cuda.memory_allocated(i) / 1024**2
            memory_reserved = torch.cuda.memory_reserved(i) / 1024**2
            logger.info(f"GPU {i}:")
            logger.info(f"  Allocated: {memory_allocated:.2f} MB")
            logger.info(f"  Reserved:  {memory_reserved:.2f} MB")

if __name__ == "__main__":
    # Check for CUDA
    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Using device: {device}")
    
    # Run tests
    main()

2024-10-24 20:47:36,203 - INFO - Using device: cpu
2024-10-24 20:47:36,204 - INFO - Starting backbone tests...
2024-10-24 20:47:36,416 - INFO - 
Testing ALEXNET...




2024-10-24 20:47:51,775 - INFO - ✓ ALEXNET passed (Time: 15.36s)
2024-10-24 20:47:51,788 - INFO - 
Testing RESNET18...
2024-10-24 20:47:52,182 - ERROR - ✗ RESNET18 failed: Given groups=1, weight of size [512, 256, 3, 3], expected input[1, 3, 224, 224] to have 256 channels, but got 3 channels instead
2024-10-24 20:47:52,188 - INFO - 
Testing RESNET50...
2024-10-24 20:47:52,434 - ERROR - ✗ RESNET50 failed: Given groups=1, weight of size [256, 512, 1, 1], expected input[1, 3, 224, 224] to have 512 channels, but got 3 channels instead
2024-10-24 20:47:52,446 - INFO - 
Testing RESNET101...
2024-10-24 20:47:53,027 - ERROR - ✗ RESNET101 failed: Given groups=1, weight of size [256, 512, 1, 1], expected input[1, 3, 224, 224] to have 512 channels, but got 3 channels instead
2024-10-24 20:47:53,046 - INFO - 
Testing VGG16...
2024-10-24 20:48:30,367 - INFO - ✓ VGG16 passed (Time: 37.32s)
2024-10-24 20:48:30,383 - INFO - 
Testing VGG19...
2024-10-24 20:49:14,659 - INFO - ✓ VGG19 passed (Time: 44.28



2024-10-24 20:49:59,900 - ERROR - ✗ INCEPTION_V3 failed: Given groups=1, weight of size [192, 768, 1, 1], expected input[1, 3, 224, 224] to have 768 channels, but got 3 channels instead
2024-10-24 20:49:59,914 - INFO - 
Testing VIT_B_16...
2024-10-24 20:50:01,165 - ERROR - ✗ VIT_B_16 failed: Expected (batch_size, seq_length, hidden_dim) got torch.Size([2, 3, 224, 224])
2024-10-24 20:50:01,191 - INFO - 
Testing VIT_B_32...
2024-10-24 20:50:02,354 - ERROR - ✗ VIT_B_32 failed: Expected (batch_size, seq_length, hidden_dim) got torch.Size([2, 3, 224, 224])
2024-10-24 20:50:02,446 - INFO - 
Testing EFFICIENTNET_B0...
2024-10-24 20:50:47,847 - ERROR - ✗ EFFICIENTNET_B0 failed: Given groups=1, weight of size [672, 112, 1, 1], expected input[1, 1280, 7, 7] to have 112 channels, but got 1280 channels instead
2024-10-24 20:50:47,850 - INFO - 
Test Summary
2024-10-24 20:50:47,851 - INFO - Model                Status    
2024-10-24 20:50:47,851 - INFO - ---------------------------------------------