In [None]:
from urllib.request import urlopen
import timm
import torch
import zipfile,os
from PIL import Image
from pathlib import Path
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader,Dataset
from sklearn.metrics import accuracy_score
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

In [None]:
# Dataset split locations (Kaggle input layout). Adjust these to the matching
# paths when running on a local PC instead of Kaggle.

train_dir = "/kaggle/input/lung-disease/train/train"  # read by the training dataset
val_dir = "/kaggle/input/lung-disease/val/val"  # read by the validation dataset
test_dir = "/kaggle/input/lung-disease/test/test"  # read by the inference cell

In [None]:
# Identifier handed to timm.create_model in both the training and inference cells.
model_name = 'maxvit_rmlp_base_rw_224.sw_in12k_ft_in1k'

In [None]:
# Build the classifier: pretrained timm backbone with a fresh 5-way head.
model = timm.create_model(
    model_name,
    pretrained=True,
    num_classes=5,
)

# Get model specific transforms (normalization, resize).
# This must be resolved on the bare timm model, *before* any DataParallel
# wrapping: the wrapper does not expose the timm model's `pretrained_cfg`, so
# resolving on the wrapped model would silently fall back to timm's generic
# defaults instead of this checkpoint's own preprocessing settings.
data_config = timm.data.resolve_model_data_config(model)
print(f"Model expects input size: {data_config['input_size']}")
print(f"Model normalization: mean={data_config['mean']}, std={data_config['std']}")

# Create training and validation transforms
train_transform = timm.data.create_transform(**data_config, is_training=True)
val_transform = timm.data.create_transform(**data_config, is_training=False)

# Spread batches over all visible GPUs when more than one is available.
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

model = model.train()

In [None]:
class CustomDataset(Dataset):
    """Image dataset laid out as one sub-directory of ``data_dir`` per class.

    Class names are the sorted sub-directory names and a sample's label is the
    index of its class in that sorted list. Only entries that are directories
    count as classes, so stray files next to the class folders (README, CSV,
    ``.DS_Store`` ...) neither shift the label indices nor show up in
    ``class_names``.

    Args:
        data_dir: Directory containing one folder per class.
        transform: Optional callable applied to each PIL image.

    Attributes:
        class_names: Sorted list of class folder names.
        images: Image file paths, grouped by class and sorted within a class.
        labels: Integer label for each entry of ``images``.
    """

    # File suffixes (compared case-insensitively) that are treated as images.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.images = []
        self.labels = []

        # Keep directories only *before* enumerating, so labels are always the
        # contiguous range 0..num_classes-1 regardless of stray files.
        self.class_names = sorted(
            entry for entry in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, entry))
        )
        print(f"Found classes: {self.class_names}")

        for label, class_name in enumerate(self.class_names):
            class_dir = os.path.join(data_dir, class_name)
            # Sorted so the sample order does not depend on filesystem order.
            class_images = sorted(
                f for f in os.listdir(class_dir)
                if f.lower().endswith(self._IMAGE_EXTENSIONS)
            )
            print(f"Class '{class_name}': {len(class_images)} images")

            for img_name in class_images:
                self.images.append(os.path.join(class_dir, img_name))
                self.labels.append(label)

        print(f"Total images loaded: {len(self.images)}")

    def __len__(self):
        """Return the number of image files found."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        If the file cannot be opened or decoded, the error is printed and a
        black 224x224 RGB placeholder is used instead so one corrupt file does
        not abort an epoch. Errors raised by ``transform`` itself are not
        masked: they indicate a pipeline problem rather than a bad file.
        """
        img_path = self.images[idx]
        label = self.labels[idx]

        try:
            # The context manager closes the file handle once pixels are decoded.
            with Image.open(img_path) as img:
                image = img.convert('RGB')
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            # Return a black image if loading fails
            image = Image.new('RGB', (224, 224), (0, 0, 0))

        if self.transform:
            image = self.transform(image)

        return image, label


In [None]:
# Quick data diagnostic: build the datasets/loaders, pull one training batch,
# print its basic statistics and display a few de-normalised samples.
import matplotlib.pyplot as plt
import numpy as np

# Create datasets with appropriate transforms
train_dataset = CustomDataset(data_dir=train_dir, transform=train_transform)
val_dataset = CustomDataset(data_dir=val_dir, transform=val_transform)

# A label is a position in its own dataset's class list, so both splits must
# list identical classes or validation labels would refer to other classes.
if train_dataset.class_names != val_dataset.class_names:
    raise ValueError(
        "Train/val class folders differ: "
        f"{train_dataset.class_names} vs {val_dataset.class_names}"
    )

# Create data loaders with NO workers for Kaggle compatibility
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=0,  # Set to 0 for Kaggle/Colab compatibility
    pin_memory=False,  # Disable pin_memory for stability
    drop_last=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=0,  # Set to 0 for Kaggle/Colab compatibility
    pin_memory=False  # Disable pin_memory for stability
)

print("=== Data Loading Test ===")
print(f"Training batches: {len(train_loader)}")
print(f"Validation batches: {len(val_loader)}")

# Test data loading and show sample. `data_ok` records whether the check
# actually passed so the success messages below are only printed when true.
data_ok = False
try:
    first_batch = next(iter(train_loader), None)
    if first_batch is None:
        # With drop_last=True the loader is empty when there are fewer
        # training images than one full batch.
        print("❌ Training loader produced no batches - not enough images for one full batch.")
    else:
        inputs, targets = first_batch
        print(f"✅ Training batch shape: {inputs.shape}")
        print(f"✅ Training labels shape: {targets.shape}")
        print(f"✅ Input range: [{inputs.min().item():.3f}, {inputs.max().item():.3f}]")
        print(f"✅ Label range: [{targets.min().item()}, {targets.max().item()}]")
        print(f"✅ Unique labels in batch: {torch.unique(targets).tolist()}")

        # Normalisation statistics, shaped to broadcast over (3, H, W) images.
        mean = torch.tensor(data_config['mean']).view(3, 1, 1)
        std = torch.tensor(data_config['std']).view(3, 1, 1)

        # Show a few sample images (one axis per displayed image).
        n_show = min(4, inputs.shape[0])
        fig, axes = plt.subplots(1, n_show, figsize=(3 * n_show, 3), squeeze=False)
        for i in range(n_show):
            # Denormalize for display
            img = torch.clamp(inputs[i] * std + mean, 0, 1)
            label_idx = targets[i].item()

            ax = axes[0, i]
            ax.imshow(img.permute(1, 2, 0).numpy())
            ax.set_title(f"Label: {label_idx}\nClass: {train_dataset.class_names[label_idx]}")
            ax.axis('off')
        plt.tight_layout()
        plt.show()
        data_ok = True

except Exception as e:
    print(f"❌ Error in data loading: {e}")
    print("This might be the source of your training issues!")

if data_ok:
    print("\n=== Data Loaders Ready ===")
    n_dataset_classes = len(train_dataset.class_names)
    print(f"✅ Your dataset has {n_dataset_classes} lung disease classes!")

    # Compare against the classifier head instead of assuming a fixed count.
    bare_model = model.module if isinstance(model, nn.DataParallel) else model
    n_model_classes = getattr(bare_model, "num_classes", None)
    if n_model_classes is not None:
        if n_model_classes == n_dataset_classes:
            print(f"✅ Model head matches the dataset: {n_model_classes} classes.")
        else:
            print(f"❌ Model head has {n_model_classes} outputs but the dataset has {n_dataset_classes} classes!")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import precision_recall_fscore_support
import matplotlib.pyplot as plt

# Report the size and class list of each split before training starts.
print("=== Dataset Information ===")
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Training classes: {train_dataset.class_names}")
print(f"Validation classes: {val_dataset.class_names}")

# Per-class sample counts, keyed by class name (training split).
train_class_counts = {}
for label_idx in train_dataset.labels:
    name = train_dataset.class_names[label_idx]
    train_class_counts.setdefault(name, 0)
    train_class_counts[name] += 1
print(f"Training class distribution: {train_class_counts}")

# Same tally for the validation split.
val_class_counts = {}
for label_idx in val_dataset.labels:
    name = val_dataset.class_names[label_idx]
    val_class_counts.setdefault(name, 0)
    val_class_counts[name] += 1
print(f"Validation class distribution: {val_class_counts}")

# Setup device and model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Move model to device and ensure it's in training mode
model.to(device)
model.train()

# Define loss function with class weights for imbalanced data.
# Inverse-frequency weighting: weight_c = N / (num_classes * count_c), so a
# perfectly balanced dataset gives every class a weight of 1.
total_train = len(train_dataset)
n_classes = len(train_dataset.class_names)
class_weights = torch.tensor(
    [
        # A class with no samples falls back to a count of 1 to avoid dividing by zero.
        total_train / (n_classes * train_class_counts.get(class_name, 1))
        for class_name in train_dataset.class_names
    ],
    dtype=torch.float32,
    device=device,
)
print(f"Class weights: {class_weights}")

criterion = nn.CrossEntropyLoss(weight=class_weights)

# Optimizer with lower learning rate
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)

# Learning rate scheduler: halve the LR after 3 epochs without improvement of
# the monitored value. The deprecated `verbose` flag is intentionally not
# passed (newer PyTorch releases warn about or reject it); the training loop
# already prints the current learning rate every epoch.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=3
)

# Training parameters
num_epochs = 20
best_val_loss = float('inf')
save_path = "./best_model.pth"

# Lists to store training history (one entry per completed epoch)
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

def _run_epoch(model, loader, criterion, device, description, optimizer=None):
    """Run one full pass over ``loader`` and return its aggregate metrics.

    When ``optimizer`` is given the model is put in train mode and updated
    after every batch (with gradient-norm clipping at 1.0); otherwise the
    model is put in eval mode and the pass runs without gradient tracking.

    Args:
        model: Network to train or evaluate.
        loader: DataLoader yielding ``(inputs, targets)`` batches.
        criterion: Loss function called as ``criterion(outputs, targets)``.
        device: Device the batches are moved to.
        description: Label shown on the tqdm progress bar.
        optimizer: Optimizer to step; ``None`` selects evaluation.

    Returns:
        dict with ``'loss'`` (mean of the per-batch losses), ``'accuracy'``
        (percent of correct predictions) and ``'f1'`` (weighted F1 score).

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    if len(loader) == 0:
        raise ValueError(f"{description}: data loader produced no batches")

    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    seen_targets = []
    seen_preds = []

    progress_bar = tqdm(loader, desc=description)
    with torch.set_grad_enabled(training):
        for inputs, targets in progress_bar:
            inputs, targets = inputs.to(device), targets.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            if training:
                loss.backward()
                # Gradient clipping to prevent exploding gradients
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

            # Statistics
            predicted = outputs.detach().argmax(dim=1)
            n_seen += targets.size(0)
            n_correct += (predicted == targets).sum().item()
            seen_preds.extend(predicted.tolist())
            seen_targets.extend(targets.tolist())

            batch_loss = loss.item()
            loss_sum += batch_loss

            # Update progress bar with the batch loss and running accuracy
            progress_bar.set_postfix({
                'loss': f'{batch_loss:.4f}',
                'acc': f'{100.0 * n_correct / n_seen:.1f}%'
            })

    _, _, f1, _ = precision_recall_fscore_support(
        seen_targets, seen_preds, average='weighted', zero_division=0
    )
    return {
        'loss': loss_sum / len(loader),
        'accuracy': 100.0 * n_correct / n_seen,
        'f1': f1,
    }


# Checkpoints are written from the underlying network, not the DataParallel
# wrapper, so their keys carry no "module." prefix and load the same way on
# single- and multi-GPU machines.
checkpoint_model = model.module if isinstance(model, nn.DataParallel) else model

print("\n=== Starting Training ===")

for epoch in range(num_epochs):
    epoch_tag = f"Epoch [{epoch+1}/{num_epochs}]"

    # ====================================================================
    #                            TRAINING LOOP
    # ====================================================================
    train_metrics = _run_epoch(
        model, train_loader, criterion, device,
        f"{epoch_tag} Training", optimizer=optimizer,
    )
    avg_train_loss = train_metrics['loss']
    train_accuracy = train_metrics['accuracy']

    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    print(f"{epoch_tag} Training:")
    print(f"  Loss: {avg_train_loss:.4f}, Accuracy: {train_accuracy:.2f}%, F1: {train_metrics['f1']:.4f}")

    # ====================================================================
    #                           VALIDATION LOOP
    # ====================================================================
    val_metrics = _run_epoch(
        model, val_loader, criterion, device,
        f"{epoch_tag} Validation",
    )
    avg_val_loss = val_metrics['loss']
    val_accuracy = val_metrics['accuracy']

    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)

    print(f"{epoch_tag} Validation:")
    print(f"  Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.2f}%, F1: {val_metrics['f1']:.4f}")

    # Save best model (lowest validation loss so far)
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(checkpoint_model.state_dict(), save_path)
        print(f"  ✅ Best model saved! Val Loss: {best_val_loss:.4f}")

    # Learning rate scheduling, driven by the validation loss
    scheduler.step(avg_val_loss)
    current_lr = optimizer.param_groups[0]['lr']
    print(f"  Learning Rate: {current_lr:.2e}")
    print("-" * 60)

    # Early stopping if learning rate gets too small
    if current_lr < 1e-7:
        print("Learning rate too small, stopping training.")
        break

# Save final model
torch.save(checkpoint_model.state_dict(), "./last_model.pth")

# Draw the per-epoch loss and accuracy curves side by side, save them to disk,
# then print a short summary of the run.
history_fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: loss curves
loss_ax.plot(train_losses, label='Training Loss', marker='o')
loss_ax.plot(val_losses, label='Validation Loss', marker='s')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Right panel: accuracy curves
acc_ax.plot(train_accuracies, label='Training Accuracy', marker='o')
acc_ax.plot(val_accuracies, label='Validation Accuracy', marker='s')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.legend()
acc_ax.grid(True)

history_fig.tight_layout()
history_fig.savefig('training_history.png', dpi=150, bbox_inches='tight')
plt.show()

# Final summary: best validation loss plus the last epoch's accuracies.
print("\n=== Training Complete ===")
print(f"Best validation loss: {best_val_loss:.4f}")
print(f"Final training accuracy: {train_accuracies[-1]:.2f}%")
print(f"Final validation accuracy: {val_accuracies[-1]:.2f}%")

In [None]:
import csv
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import timm

# ==============================
# CUSTOM DATASET WITH PATHS
# ==============================
class ImageFolderWithPaths(ImageFolder):
    """ImageFolder variant whose items also carry the source file path."""

    def __getitem__(self, index):
        """Return ``(img, label, path)`` for the sample at ``index``."""
        # The parent class yields the (img, label) pair.
        image, target = super().__getitem__(index)
        # self.samples holds (path, class_index) pairs in the same order.
        file_path, _ = self.samples[index]
        return image, target, file_path

# ==============================
# DATASET & LOADER
# ==============================
# Use the evaluation transform created in the model-setup cell (the one the
# validation set was scored with) instead of a hand-written Resize/Normalize
# pipeline, so test images are preprocessed exactly like the images the best
# checkpoint was selected on.
transform = val_transform

test_dataset = ImageFolderWithPaths(root=test_dir, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# ==============================
# DEVICE
# ==============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ==============================
# MODEL
# ==============================
# pretrained=False: every weight is overwritten by the checkpoint below, so
# downloading the pretrained weights again would be wasted work (and would
# fail without internet access).
model = timm.create_model(
    model_name,
    pretrained=False,
    num_classes=5,
)

# ==============================
# LOAD BEST MODEL
# ==============================
state_dict = torch.load("./best_model.pth", map_location="cpu")

# A checkpoint saved from an nn.DataParallel wrapper has every key prefixed
# with "module."; strip it so the checkpoint loads into the bare model no
# matter how many GPUs were used for training.
if state_dict and all(key.startswith("module.") for key in state_dict):
    state_dict = {key[len("module."):]: value for key, value in state_dict.items()}

# Load into the bare model first, then wrap for multi-GPU inference.
model.load_state_dict(state_dict)

if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

model.to(device)
model.eval()
print("✅ Loaded best_model.pth successfully.")

# ==============================
# PREDICTION LOOP
# ==============================
def _submission_id(path):
    """Turn a test image path into the id written to the predictions CSV.

    Keeps only the file name, drops a Roboflow-style ".rf.<hash>..." tail and
    turns a trailing "_jpeg" into a real ".jpeg" extension.
    """
    filename = os.path.basename(path)

    # Clean up Roboflow suffix like ".rf.xxxxx" (no-op when it is absent)
    filename = filename.split(".rf.")[0]

    # Replace the "_jpeg" suffix with ".jpeg". Only the trailing occurrence is
    # rewritten; any earlier "_jpeg" inside the name is left untouched.
    if filename.endswith("_jpeg"):
        filename = filename[:-len("_jpeg")] + ".jpeg"

    return filename


# One [id, predicted_class_index] row per test image, in loader order.
results = []

with torch.no_grad():
    for inputs, _, paths in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        preds = outputs.argmax(dim=1).tolist()

        for path, pred in zip(paths, preds):
            results.append([_submission_id(path), pred])

# ==============================
# SAVE CSV
# ==============================
# Write the header row followed by one row per prediction.
csv_file = "test_predictions.csv"
with open(csv_file, mode="w", newline="") as handle:
    csv.writer(handle).writerows([["Id", "Predicted"], *results])

print(f"✅ Predictions saved to {csv_file}")
