In [1]:
import numpy as np
import pandas as pd
import os 
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import matplotlib.pyplot as plt 

from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.utils.data.dataloader import DataLoader 
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms
import os
from PIL import Image
import numpy as np
import time


In [None]:
class CancerCNN(nn.Module):

    def __init__(self, num_classes=5):
        super().__init__()

        # Feature extractor
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)   # 128x128 -> 128x128
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)  # 64x64 -> 64x64
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)  # 32x32 -> 32x32

        self.pool = nn.MaxPool2d(2, 2)  # halves H,W

        # After 3 pools: 128 -> 64 -> 32 -> 16
        # Channels = 64, spatial = 16x16
        self.fc1 = nn.Linear(64 * 16 * 16, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # -> 16 x 64 x 64
        x = self.pool(F.relu(self.conv2(x)))  # -> 32 x 32 x 32
        x = self.pool(F.relu(self.conv3(x)))  # -> 64 x 16 x 16

        x = torch.flatten(x, 1)               # -> 64*16*16
        x = F.relu(self.fc1(x))               # -> 128
        x = self.fc2(x)                       # -> logits (5)
        return x
# 2. DATASET CLASS

class CancerDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['colon_aca', 'colon_n', 'lung_aca', 'lung_n', 'lung_scc']
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.samples = []
        
        print(f"Root directory: {root_dir}")
        
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            if os.path.exists(class_dir):
                images = [f for f in os.listdir(class_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.tif', '.tiff'))]
                for img_name in images[:5000]:  # Used to check if loading works on smaller samples
                    img_path = os.path.join(class_dir, img_name)
                    self.samples.append((img_path, self.class_to_idx[class_name]))
                print(f"  {class_name}: {len(images)} images")
        
        print(f"Total samples loaded: {len(self.samples)}")
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label


# 3. TRAINING FUNCTION 
def train_with_immediate_output():
      
    print("Traisn MODEL")
     
    
    # Setup
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"\n[SETUP] Device: {device}")
    if device.type == 'cuda':
        print(f"[SETUP] GPU: {torch.cuda.get_device_name(0)}")
        print(f"[SETUP] CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    
    # Define transforms
    transform = transforms.Compose([
        transforms.Resize((128, 128)),  # Fixed size
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    
    # Load dataset
    print("\n[DATA] Loading dataset...")
    data_dir = "lungcolon"
    
    if not os.path.exists(data_dir):
        print(f"ERROR: Dataset not found at '{data_dir}'")

        return
    
    # Create dataset
    dataset = CancerDataset(root_dir=data_dir, transform=transform)
    
    # Split dataset (70% train, 15% val, 15% test)
    train_size = int(0.7 * len(dataset))
    val_size = int(0.15 * len(dataset))
    test_size = len(dataset) - train_size - val_size
    
    train_dataset, val_dataset, test_dataset = random_split(
        dataset, [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(42)
    )
    
    print(f"\n[DATA] Dataset splits:")
    print(f"  Training: {len(train_dataset)} samples")
    print(f"  Validation: {len(val_dataset)} samples")
    print(f"  Test: {len(test_dataset)} samples")
    print(f"  Classes: {dataset.classes}")
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)  # num_workers=0 for immediate output
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=0)
    
    # Create model
    model = CancerCNN(num_classes=len(dataset.classes)).to(device)
    
      
    print("MODEL ARCHITECTURE")
     
    print(model)
    
    # Calculate parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"\n[MODEL] Total parameters: {total_params:,}")
    print(f"[MODEL] Trainable parameters: {trainable_params:,}")
    
    # Training setup
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
      
    print("TRAINING")
     
    
    
    # Training loop
    num_epochs = 5  # Start with 3 epochs for quick results IF CPU it will take hors 
    model.train()
    
    for epoch in range(num_epochs):
        epoch_start_time = time.time()
        running_loss = 0.0
        correct = 0
        total = 0
        
        print(f"\n[EPOCH {epoch+1}/{num_epochs}] Starting...")
    
        
        # Training batches
        for batch_idx, (images, labels) in enumerate(train_loader):
            batch_start_time = time.time()
            
            # Move to device
            images, labels = images.to(device), labels.to(device)
            
            # Forward pass
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            # Backward pass
            loss.backward()
            optimizer.step()
            
            # Calculate accuracy
            _, predicted = torch.max(outputs.data, 1)
            batch_total = labels.size(0)
            batch_correct = (predicted == labels).sum().item()
            
            # Update statistics
            running_loss += loss.item()
            total += batch_total
            correct += batch_correct
            
            # Print immediate progress
            batch_time = time.time() - batch_start_time
            
            if batch_idx == 0:
                print(f"[BATCH 1] Loss: {loss.item():.4f}, Acc: {100*batch_correct/batch_total:.1f}%, Time: {batch_time:.1f}s")
                print(f"[INFO] First batch completed! Training continues...")
            elif (batch_idx + 1) % 10 == 0:  # Print every 10 batches
                avg_loss = running_loss / (batch_idx + 1)
                current_acc = 100 * correct / total
                print(f"[BATCH {batch_idx+1}] Avg Loss: {avg_loss:.4f}, Current Acc: {current_acc:.1f}%")
        
        # Epoch statistics
        epoch_time = time.time() - epoch_start_time
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = 100 * correct / total
        
        print(f"\n[EPOCH {epoch+1} SUMMARY]")
        print(f"  Loss: {epoch_loss:.4f}")
        print(f"  Accuracy: {epoch_acc:.2f}%")
        print(f"  Time: {epoch_time:.1f} seconds")
        
        # Validation
        model.eval()
        val_correct = 0
        val_total = 0
        val_loss = 0.0
        
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        
        val_acc = 100 * val_correct / val_total
        avg_val_loss = val_loss / len(val_loader)
        
        print(f"[VALIDATION] Loss: {avg_val_loss:.4f}, Accuracy: {val_acc:.2f}%")
        
        
        model.train()  # Set back to training mode
    
    # Save model
    torch.save({
        'model_state_dict': model.state_dict(),
        'epochs': num_epochs,
        'loss': epoch_loss,
        'accuracy': epoch_acc,
        'classes': dataset.classes
    }, 'cancer_cnn_model.pth')
    
    print(f"\n" + "="*70)
    print("TRAINING COMPLETE!")
     
    print(f"Model saved as 'cancer_cnn_model.pth'")
    print(f"Final Training Accuracy: {epoch_acc:.2f}%")
    print(f"Final Validation Accuracy: {val_acc:.2f}%")
    print(f"Total training time: {time.time() - start_time:.1f} seconds")
    
    return model


# 5. MAIN 
if __name__ == "__main__":
    start_time = time.time()
    
      
    print("CANCER HISTOPATHOLOGY CNN CLASSIFIER")
     
    
    # Step 1: Quick test
    print("\n Running quick CNN connection test...")
    #quick_test()
    
    # Step 2: Training
    print("\n Starting actual training...")
    
    try:
        model = train_with_immediate_output()
        
          
        
    except Exception as e:
       print (f"ERROR")

CANCER HISTOPATHOLOGY CNN CLASSIFIER

 Running quick CNN connection test...

 Starting actual training...
Traisn MODEL

[SETUP] Device: cuda
[SETUP] GPU: NVIDIA GeForce RTX 3080 Laptop GPU
[SETUP] CUDA Memory: 8.6 GB

[DATA] Loading dataset...
Root directory: lungcolon
  colon_aca: 5000 images
  colon_n: 5000 images
  lung_aca: 5000 images
  lung_n: 5000 images
  lung_scc: 5000 images
Total samples loaded: 25000

[DATA] Dataset splits:
  Training: 17500 samples
  Validation: 3750 samples
  Test: 3750 samples
  Classes: ['colon_aca', 'colon_n', 'lung_aca', 'lung_n', 'lung_scc']
MODEL ARCHITECTURE
CancerCNN(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=16384, out_features=128, bias=True)
  (fc2)