# Medical Image Anomaly Detection - A100 40GB Demo
## X-Ray & MRI Anomali Tespiti (Kƒ±rƒ±k & T√ºm√∂r)

Bu notebook, iki farklƒ± yakla≈üƒ±mla medikal g√∂r√ºnt√ºlerde anomali tespiti yapar:
1. **Convolutional Autoencoder** - Reconstruction Error
2. **Vision Transformer (ViT)** - Feature-based Detection

**Veri Setleri:**
- X-Ray: Kƒ±rƒ±k tespiti (Normal vs Fractured)
- MRI: Beyin t√ºm√∂r√º tespiti (Normal vs Tumor)


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision import models
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
from pathlib import Path
from sklearn.metrics import roc_auc_score, classification_report, confusion_matrix
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# GPU kontrol√º
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')
if torch.cuda.is_available():
    print(f'GPU: {torch.cuda.get_device_name(0)}')
    print(f'VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB')


Device: cuda
GPU: NVIDIA A100-SXM4-40GB
VRAM: 42.47 GB


## 1. Veri Y√ºkleme ve Hazƒ±rlƒ±k


In [2]:
class AnomalyDataset(Dataset):
    """Normal g√∂r√ºnt√ºlerle eƒüitim, anomali g√∂r√ºnt√ºleriyle test i√ßin dataset"""
    def __init__(self, normal_path, anomaly_path=None, transform=None, mode='train'):
        self.transform = transform
        self.mode = mode
        self.images = []
        self.labels = []

        # Normal g√∂r√ºnt√ºleri y√ºkle
        normal_files = list(Path(normal_path).glob('*.jpg'))
        for img_path in normal_files:
            self.images.append(str(img_path))
            self.labels.append(0)  # 0 = Normal

        # Test modunda anomali g√∂r√ºnt√ºlerini de y√ºkle
        if anomaly_path and mode == 'test':
            if Path(anomaly_path).is_dir():
                # MRI i√ßin t√ºm alt klas√∂rleri tara
                for subdir in Path(anomaly_path).iterdir():
                    if subdir.is_dir() and subdir.name != 'notumor':
                        anomaly_files = list(subdir.glob('*.jpg'))
                        for img_path in anomaly_files:
                            self.images.append(str(img_path))
                            self.labels.append(1)  # 1 = Anomali
            else:
                # X-Ray i√ßin direkt klas√∂r
                anomaly_files = list(Path(anomaly_path).glob('*.jpg'))
                for img_path in anomaly_files:
                    self.images.append(str(img_path))
                    self.labels.append(1)

        print(f'{mode.upper()} - Normal: {len([l for l in self.labels if l==0])}, '
              f'Anomali: {len([l for l in self.labels if l==1])}')

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = Image.open(self.images[idx]).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]

# Transform tanƒ±mlama
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


In [4]:
# Hangi veri setiyle √ßalƒ±≈ümak istediƒüinizi se√ßin
DATASET = 'xray'  # 'xray' veya 'mri'

if DATASET == 'xray':
    # X-Ray: Normal = not_fractured, Anomaly = fractured
    NORMAL_TRAIN = r'C:\Users\eraye\OneDrive\Masa√ºst√º\x-ray\train\not_fractured'
    NORMAL_TEST = r'C:\Users\eraye\OneDrive\Masa√ºst√º\x-ray\test\not_fractured'
    ANOMALY_TEST = r'C:\Users\eraye\OneDrive\Masa√ºst√º\x-ray\test\fractured'
    print('üìä X-Ray Kƒ±rƒ±k Anomali Tespiti')
else:
    # MRI: Normal = notumor, Anomaly = t√ºm t√ºm√∂rler
    NORMAL_TRAIN = r'C:\Users\eraye\OneDrive\Masa√ºst√º\mri\train\notumor'
    NORMAL_TEST = r'C:\Users\eraye\OneDrive\Masa√ºst√º\mri\test\notumor'
    ANOMALY_TEST = r'C:\Users\eraye\OneDrive\Masa√ºst√º\mri\test'  # glioma, meningioma, pituitary
    print('üìä MRI T√ºm√∂r Anomali Tespiti')

# Dataset olu≈üturma
train_dataset = AnomalyDataset(NORMAL_TRAIN, transform=transform, mode='train')
test_dataset = AnomalyDataset(NORMAL_TEST, ANOMALY_TEST, transform=transform, mode='test')

# DataLoader
BATCH_SIZE = 32  # A100 i√ßin optimize edilmi≈ü
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)


üìä X-Ray Kƒ±rƒ±k Anomali Tespiti
TRAIN - Normal: 0, Anomali: 0
TEST - Normal: 0, Anomali: 0


ValueError: num_samples should be a positive integer value, but got num_samples=0

## 2. Model 1: Convolutional Autoencoder
Reconstruction error kullanarak anomali tespiti


In [None]:
class ConvAutoencoder(nn.Module):
    def __init__(self, latent_dim=256):
        super(ConvAutoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, 4, stride=2, padding=1),  # 224 -> 112
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2),

            nn.Conv2d(64, 128, 4, stride=2, padding=1),  # 112 -> 56
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),

            nn.Conv2d(128, 256, 4, stride=2, padding=1),  # 56 -> 28
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),

            nn.Conv2d(256, 512, 4, stride=2, padding=1),  # 28 -> 14
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2),

            nn.Conv2d(512, latent_dim, 4, stride=2, padding=1),  # 14 -> 7
            nn.BatchNorm2d(latent_dim),
            nn.LeakyReLU(0.2)
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(latent_dim, 512, 4, stride=2, padding=1),  # 7 -> 14
            nn.BatchNorm2d(512),
            nn.ReLU(),

            nn.ConvTranspose2d(512, 256, 4, stride=2, padding=1),  # 14 -> 28
            nn.BatchNorm2d(256),
            nn.ReLU(),

            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),  # 28 -> 56
            nn.BatchNorm2d(128),
            nn.ReLU(),

            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),  # 56 -> 112
            nn.BatchNorm2d(64),
            nn.ReLU(),

            nn.ConvTranspose2d(64, 3, 4, stride=2, padding=1),  # 112 -> 224
            nn.Tanh()
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

# Model olu≈ütur
autoencoder = ConvAutoencoder(latent_dim=256).to(device)
print(f'Autoencoder parametreleri: {sum(p.numel() for p in autoencoder.parameters()):,}')


In [None]:
# Autoencoder eƒüitimi
def train_autoencoder(model, train_loader, epochs=10, lr=0.001):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    model.train()
    losses = []

    for epoch in range(epochs):
        epoch_loss = 0
        pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}')

        for images, _ in pbar:
            images = images.to(device)

            optimizer.zero_grad()
            reconstructed, _ = model(images)
            loss = criterion(reconstructed, images)

            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            pbar.set_postfix({'loss': loss.item()})

        avg_loss = epoch_loss / len(train_loader)
        losses.append(avg_loss)
        scheduler.step()
        print(f'Epoch {epoch+1}: Loss = {avg_loss:.6f}')

    return losses

# Eƒüitim
print('üöÄ Autoencoder eƒüitimi ba≈ülƒ±yor...')
ae_losses = train_autoencoder(autoencoder, train_loader, epochs=15, lr=0.0005)


In [None]:
# Reconstruction error ile anomali tespiti
def compute_reconstruction_errors(model, dataloader):
    model.eval()
    errors = []
    labels = []

    with torch.no_grad():
        for images, lbls in tqdm(dataloader, desc='Testing Autoencoder'):
            images = images.to(device)
            reconstructed, _ = model(images)

            # Per-image reconstruction error
            mse = ((images - reconstructed) ** 2).view(images.size(0), -1).mean(dim=1)
            errors.extend(mse.cpu().numpy())
            labels.extend(lbls.numpy())

    return np.array(errors), np.array(labels)

ae_errors, true_labels = compute_reconstruction_errors(autoencoder, test_loader)

# ROC-AUC skoru
ae_auc = roc_auc_score(true_labels, ae_errors)
print(f'\n‚úÖ Autoencoder ROC-AUC: {ae_auc:.4f}')

# Threshold belirleme (95th percentile of normal errors)
normal_errors = ae_errors[true_labels == 0]
ae_threshold = np.percentile(normal_errors, 95)
ae_predictions = (ae_errors > ae_threshold).astype(int)

print('\nAutoencoder Classification Report:')
print(classification_report(true_labels, ae_predictions, target_names=['Normal', 'Anomaly']))


## 3. Model 2: EfficientNet + One-Class SVM
Pre-trained model ile feature extraction + anomaly detection


In [None]:
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

# Pre-trained Feature Extractor
class FeatureExtractor(nn.Module):
    def __init__(self):
        super(FeatureExtractor, self).__init__()
        # EfficientNetV2 (hƒ±zlƒ± ve g√º√ßl√º)
        self.model = models.efficientnet_v2_m(weights='DEFAULT')
        # Son katmanƒ± √ßƒ±kar
        self.features = nn.Sequential(*list(self.model.children())[:-1])

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        return x

feature_extractor = FeatureExtractor().to(device)
feature_extractor.eval()
print(f'Feature Extractor y√ºklendi')


In [None]:
# Feature extraction
def extract_features(model, dataloader):
    features = []
    labels = []

    model.eval()
    with torch.no_grad():
        for images, lbls in tqdm(dataloader, desc='Extracting Features'):
            images = images.to(device)
            feats = model(images)
            features.append(feats.cpu().numpy())
            labels.extend(lbls.numpy())

    return np.vstack(features), np.array(labels)

print('üöÄ Feature extraction ba≈ülƒ±yor...')
train_features, _ = extract_features(feature_extractor, train_loader)
test_features, test_labels = extract_features(feature_extractor, test_loader)

# Feature normalization
scaler = StandardScaler()
train_features = scaler.fit_transform(train_features)
test_features = scaler.transform(test_features)

print(f'Train features shape: {train_features.shape}')
print(f'Test features shape: {test_features.shape}')


In [None]:
# One-Class SVM eƒüitimi (sadece normal √∂rneklerle)
print('üöÄ One-Class SVM eƒüitimi...')
ocsvm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.05)
ocsvm.fit(train_features)

# Test
svm_predictions = ocsvm.predict(test_features)
svm_scores = ocsvm.decision_function(test_features)

# SVM output: 1 (normal) ve -1 (anomaly) -> 0 ve 1'e √ßevir
svm_predictions = (svm_predictions == -1).astype(int)
svm_scores = -svm_scores  # Negatif skorlar anomali olduƒüu i√ßin tersine √ßevir

svm_auc = roc_auc_score(test_labels, svm_scores)
print(f'\n‚úÖ EfficientNet + One-Class SVM ROC-AUC: {svm_auc:.4f}')

print('\nEfficientNet + OCSVM Classification Report:')
print(classification_report(test_labels, svm_predictions, target_names=['Normal', 'Anomaly']))


## 4. Sonu√ßlarƒ±n G√∂rselle≈ütirilmesi


In [None]:
fig, axes = plt.subplots(2, 3, figsize=(18, 12))

# 1. Autoencoder Loss
axes[0, 0].plot(ae_losses, linewidth=2, color='#3498db')
axes[0, 0].set_title('Autoencoder Training Loss', fontsize=14, fontweight='bold')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('Loss')
axes[0, 0].grid(True, alpha=0.3)

# 2. Reconstruction Error Distribution
normal_errors = ae_errors[true_labels == 0]
anomaly_errors = ae_errors[true_labels == 1]
axes[0, 1].hist(normal_errors, bins=50, alpha=0.7, label='Normal', color='green')
axes[0, 1].hist(anomaly_errors, bins=50, alpha=0.7, label='Anomaly', color='red')
axes[0, 1].axvline(ae_threshold, color='black', linestyle='--', linewidth=2, label='Threshold')
axes[0, 1].set_title('Autoencoder: Reconstruction Error Distribution', fontsize=14, fontweight='bold')
axes[0, 1].set_xlabel('Reconstruction Error')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 3. Autoencoder Confusion Matrix
cm_ae = confusion_matrix(true_labels, ae_predictions)
sns.heatmap(cm_ae, annot=True, fmt='d', cmap='Blues', ax=axes[0, 2])
axes[0, 2].set_title('Autoencoder: Confusion Matrix', fontsize=14, fontweight='bold')
axes[0, 2].set_xlabel('Predicted')
axes[0, 2].set_ylabel('True')
axes[0, 2].set_xticklabels(['Normal', 'Anomaly'])
axes[0, 2].set_yticklabels(['Normal', 'Anomaly'])

# 4. SVM Score Distribution
normal_scores = svm_scores[test_labels == 0]
anomaly_scores = svm_scores[test_labels == 1]
axes[1, 0].hist(normal_scores, bins=50, alpha=0.7, label='Normal', color='green')
axes[1, 0].hist(anomaly_scores, bins=50, alpha=0.7, label='Anomaly', color='red')
axes[1, 0].set_title('EfficientNet: Anomaly Score Distribution', fontsize=14, fontweight='bold')
axes[1, 0].set_xlabel('Anomaly Score')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# 5. SVM Confusion Matrix
cm_svm = confusion_matrix(test_labels, svm_predictions)
sns.heatmap(cm_svm, annot=True, fmt='d', cmap='Greens', ax=axes[1, 1])
axes[1, 1].set_title('EfficientNet: Confusion Matrix', fontsize=14, fontweight='bold')
axes[1, 1].set_xlabel('Predicted')
axes[1, 1].set_ylabel('True')
axes[1, 1].set_xticklabels(['Normal', 'Anomaly'])
axes[1, 1].set_yticklabels(['Normal', 'Anomaly'])

# 6. Model Comparison
models_comparison = ['Autoencoder', 'EfficientNet\n+ OCSVM']
auc_scores = [ae_auc, svm_auc]
colors = ['#3498db', '#2ecc71']
bars = axes[1, 2].bar(models_comparison, auc_scores, color=colors, alpha=0.8)
axes[1, 2].set_ylim([0, 1.0])
axes[1, 2].set_title('Model Comparison (ROC-AUC)', fontsize=14, fontweight='bold')
axes[1, 2].set_ylabel('ROC-AUC Score')
axes[1, 2].grid(True, alpha=0.3, axis='y')
for i, v in enumerate(auc_scores):
    axes[1, 2].text(i, v + 0.02, f'{v:.4f}', ha='center', fontweight='bold', fontsize=12)

plt.tight_layout()
plt.savefig('anomaly_detection_results.png', dpi=150, bbox_inches='tight')
plt.show()

print('\nüìä Sonu√ßlar "anomaly_detection_results.png" olarak kaydedildi')


## 5. √ñrnek G√∂r√ºnt√ºlerde Reconstruction


In [None]:
# √ñrnek g√∂r√ºnt√ºlerle reconstruction g√∂rselle≈ütirme
autoencoder.eval()
sample_indices = np.random.choice(len(test_dataset), 8, replace=False)

fig, axes = plt.subplots(4, 4, figsize=(16, 16))

with torch.no_grad():
    for i, idx in enumerate(sample_indices):
        img, label = test_dataset[idx]
        img_batch = img.unsqueeze(0).to(device)
        reconstructed, _ = autoencoder(img_batch)

        # Denormalize
        mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
        img_denorm = img * std + mean
        rec_denorm = reconstructed.cpu().squeeze(0) * std + mean

        # Original
        row = i // 2 * 2
        col = (i % 2) * 2
        axes[row, col].imshow(img_denorm.permute(1, 2, 0).clamp(0, 1))
        axes[row, col].set_title(f'Original - {"NORMAL" if label == 0 else "ANOMALY"}',
                                 fontweight='bold', color='green' if label == 0 else 'red', fontsize=12)
        axes[row, col].axis('off')

        # Reconstructed
        axes[row, col+1].imshow(rec_denorm.permute(1, 2, 0).clamp(0, 1))
        error = ((img - reconstructed.cpu().squeeze(0)) ** 2).mean().item()
        axes[row, col+1].set_title(f'Reconstructed - Error: {error:.4f}', fontweight='bold', fontsize=12)
        axes[row, col+1].axis('off')

plt.tight_layout()
plt.savefig('reconstruction_examples.png', dpi=150, bbox_inches='tight')
plt.show()

print('\nüì∏ Reconstruction √∂rnekleri "reconstruction_examples.png" olarak kaydedildi')


## 6. Sonu√ß √ñzeti


In [None]:
print('\n' + '='*70)
print('                    ANOMALI TESPƒ∞T SONU√áLARI')
print('='*70)
print(f'\nüìä Veri Seti: {"X-Ray (Kƒ±rƒ±k Tespiti)" if DATASET == "xray" else "MRI (T√ºm√∂r Tespiti)"}')
print(f'üñ•Ô∏è  GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU"}')
print(f'\nüîµ AUTOENCODER (Reconstruction-based):')
print(f'   ‚îú‚îÄ ROC-AUC Score: {ae_auc:.4f}')
print(f'   ‚îú‚îÄ Threshold: {ae_threshold:.6f}')
print(f'   ‚îî‚îÄ Accuracy: {(ae_predictions == true_labels).sum() / len(true_labels):.4f}')
print(f'\nüü¢ EfficientNet + ONE-CLASS SVM (Feature-based):')
print(f'   ‚îú‚îÄ ROC-AUC Score: {svm_auc:.4f}')
print(f'   ‚îî‚îÄ Accuracy: {(svm_predictions == test_labels).sum() / len(test_labels):.4f}')
print(f'\nüèÜ En ƒ∞yi Model: {"Autoencoder" if ae_auc > svm_auc else "EfficientNet + OCSVM"}')
print('\n' + '='*70)
print('\nüí° √ñneriler:')
print('   ‚Ä¢ Daha fazla epoch i√ßin eƒüitim s√ºresi artƒ±rƒ±labilir')
print('   ‚Ä¢ Ensemble (her iki modelin kombinasyonu) denenebilir')
print('   ‚Ä¢ Data augmentation ile veri √ße≈üitliliƒüi artƒ±rƒ±labilir')
print('   ‚Ä¢ Farklƒ± threshold deƒüerleri optimize edilebilir')
print('   ‚Ä¢ Diƒüer veri seti i√ßin DATASET deƒüi≈ükenini deƒüi≈ütirin')
print('='*70)


## 7. Model Kaydetme (Opsiyonel)


In [None]:
# Modelleri kaydet
torch.save({
    'autoencoder_state': autoencoder.state_dict(),
    'threshold': ae_threshold,
    'scaler': scaler,
    'ocsvm': ocsvm,
    'dataset': DATASET
}, 'anomaly_detection_models.pth')

print('‚úÖ Modeller "anomaly_detection_models.pth" olarak kaydedildi!')
print('\nüìù Model y√ºkleme i√ßin:')
print('checkpoint = torch.load("anomaly_detection_models.pth")')
print('autoencoder.load_state_dict(checkpoint["autoencoder_state"])')
print('scaler = checkpoint["scaler"]')
print('ocsvm = checkpoint["ocsvm"]')
