<a href="https://colab.research.google.com/github/Rajat-Kumar-Pandey/MACHINE-LEARNING/blob/main/SD2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import os
from PIL import Image
from torchvision import transforms
import torch
import numpy as np
from scipy.ndimage import gaussian_filter, map_coordinates

# Define transformations
transform = transforms.Compose([
    # Convert PIL Image to Tensor
    transforms.ToTensor(),

    # Geometric Transformations
    transforms.RandomAffine(degrees=25, translate=(0.1, 0.1), scale=(0.8, 1.2), shear=10),

    # Scaling and Zooming
    transforms.Resize((150, 150)),

    # Color and Lighting Variations
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),

    # Noise Addition (Using Gaussian Noise)
    transforms.Lambda(lambda x: x + torch.randn_like(x) * 0.1),  # Add Gaussian Noise

    # Cutout/Random Erasing
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3)),
])

# Elastic deformation function (unchanged)
def elastic_transform(image_np, alpha, sigma):
    """
    Apply elastic deformation to an image using separate processing for each channel.
    """
    random_state = np.random.RandomState(None)
    shape = image_np.shape
    transformed_image = np.zeros_like(image_np)

    for channel in range(shape[2]):  # Process each color channel independently
        dx = gaussian_filter((random_state.rand(*shape[:2]) * 2 - 1), sigma, mode="constant", cval=0)
        dy = gaussian_filter((random_state.rand(*shape[:2]) * 2 - 1), sigma, mode="constant", cval=0)
        x, y = np.meshgrid(np.arange(shape[1]), np.arange(shape[0]))
        indices = np.reshape(y + dy * alpha, (-1, 1)), np.reshape(x + dx * alpha, (-1, 1))
        transformed_channel = map_coordinates(image_np[:, :, channel], indices, order=1, mode="reflect").reshape(shape[:2])
        transformed_image[:, :, channel] = transformed_channel

    return transformed_image

# Define dataset paths
base_dir = "/content/drive/MyDrive/datasets (1)"
categories = ["openeyes", "closeeyes"]

# Define output directory for transformed images
output_dir = "/content/drive/MyDrive/datasets_transformed"
os.makedirs(output_dir, exist_ok=True)

# Process each category folder
for category in categories:
    category_dir = os.path.join(base_dir, category)
    output_category_dir = os.path.join(output_dir, category)
    os.makedirs(output_category_dir, exist_ok=True)  # Create output subfolder for each category

    for file_name in os.listdir(category_dir):
        if file_name.endswith(('.jpg', '.png', '.jpeg')):
            image_path = os.path.join(category_dir, file_name)

            # Load and preprocess the image
            image = Image.open(image_path).convert("RGB")
            transformed_tensor = transform(image)  # Apply transformations

            # Convert tensor to NumPy array for elastic transformation
            transformed_image_np = transformed_tensor.permute(1, 2, 0).numpy()  # Rearrange to HWC format

            # Apply elastic deformation
            transformed_image_np = elastic_transform(transformed_image_np, alpha=5, sigma=3)

            # Convert back to PIL Image
            transformed_image = Image.fromarray((transformed_image_np * 255).astype(np.uint8))

            # Save the transformed image
            output_path = os.path.join(output_category_dir, f"transformed_{file_name}")
            transformed_image.save(output_path)

print("All transformations applied and saved in respective folders.")

All transformations applied and saved in respective folders.


**SPLITTING THE DATASETS**

In [7]:
import os
import shutil
import random

# Define paths
base_dir = "/content/drive/MyDrive/datasets_transformed"
categories = ["openeyes", "closeeyes"]
train_dir = os.path.join(base_dir, "train")
val_dir = os.path.join(base_dir, "val")
test_dir = os.path.join(base_dir, "test")

# Split ratios
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1

# Create directories if not exist
for split in [train_dir, val_dir, test_dir]:
    for category in categories:
        os.makedirs(os.path.join(split, category), exist_ok=True)

# Function to split dataset
def split_data(category_path, train_dest, val_dest, test_dest):
    files = os.listdir(category_path)
    random.shuffle(files)
    total_files = len(files)

    # Calculate split indices
    train_end = int(total_files * train_ratio)
    val_end = train_end + int(total_files * val_ratio)

    train_files = files[:train_end]
    val_files = files[train_end:val_end]
    test_files = files[val_end:]

    # Move files to respective directories
    for file in train_files:
        shutil.move(os.path.join(category_path, file), train_dest)
    for file in val_files:
        shutil.move(os.path.join(category_path, file), val_dest)
    for file in test_files:
        shutil.move(os.path.join(category_path, file), test_dest)

# Iterate over categories and split data
for category in categories:
    category_path = os.path.join(base_dir, category)
    train_dest = os.path.join(train_dir, category)
    val_dest = os.path.join(val_dir, category)
    test_dest = os.path.join(test_dir, category)

    split_data(category_path, train_dest, val_dest, test_dest)

print("Dataset successfully split into train, val, and test sets!")

Dataset successfully split into train, val, and test sets!


** Data Loading**

In [6]:
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os

# Define Dataset class
class DrowsinessDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.img_paths = []
        self.labels = []
        for label, category in enumerate(os.listdir(img_dir)):
            category_path = os.path.join(img_dir, category)
            for img_name in os.listdir(category_path):
                if img_name.endswith(('.jpg', '.png', '.jpeg')):
                    self.img_paths.append(os.path.join(category_path, img_name))
                    self.labels.append(label)  # 0: openeyes, 1: closeeyes

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        label = self.labels[idx]
        img = Image.open(img_path).convert("RGB")

        if self.transform:
            img = self.transform(img)

        return img, label

# Define transformations
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((150, 150)),  # Ensure the image is resized
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Standard normalization
])

# Define paths
train_dir = "/content/drive/MyDrive/datasets_transformed/train"
val_dir = "/content/drive/MyDrive/datasets_transformed/val"
test_dir = "/content/drive/MyDrive/datasets_transformed/test"

# Create datasets and dataloaders
train_dataset = DrowsinessDataset(train_dir, transform)
val_dataset = DrowsinessDataset(val_dir, transform)
test_dataset = DrowsinessDataset(test_dir, transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


**MODAL**

In [27]:
import torch
import torch.nn as nn
import torch.optim as optim

class EnhancedDrowsinessModel(nn.Module):
    def __init__(self):
        super(EnhancedDrowsinessModel, self).__init__()

        # Define the convolutional layers with batch normalization and ReLU activations
        self.conv_block = nn.Sequential(
            # First convolutional layer (3 input channels, 32 output channels)
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),

            # Max Pooling after the first convolution
            nn.MaxPool2d(2, 2),

            # Second convolutional layer (32 input channels, 64 output channels)
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),

            # Max Pooling after the second convolution
            nn.MaxPool2d(2, 2),

            # Third convolutional layer (64 input channels, 128 output channels)
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),

            # Max Pooling after the third convolution
            nn.MaxPool2d(2, 2)
        )

        # Global Average Pooling (output size will be (batch_size, 128, 1, 1))
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)

        # Fully connected layers
        self.fc1 = nn.Linear(128, 1024)  # Flattened to (128,) after GAP
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 2)  # Output layer for binary classification (open/close eyes)

        # Dropout layer
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        # Pass through convolutional layers and apply global average pooling
        x = self.conv_block(x)
        x = self.global_avg_pool(x)

        # Flatten the tensor from (batch_size, 128, 1, 1) to (batch_size, 128)
        x = x.view(x.size(0), -1)  # Adjust shape dynamically

        # Fully connected layers
        x = self.fc1(x)
        x = self.dropout(x)  # Apply dropout after first fully connected layer
        x = self.fc2(x)
        x = self.fc3(x)  # Final output layer

        return x


# Instantiate and test the model
model = EnhancedDrowsinessModel()
print(model)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)  # Alternatively, you can use model.cuda()

print(f"Model is on device: {device}")


EnhancedDrowsinessModel(
  (conv_block): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (global_avg_pool): AdaptiveAvgPool2d(output_size=1)
  (fc1): Linear(in_features=128, out_features=1024, bias=True)
  (fc2): Linear(in_features=102

CHECKING FOR GPU

Define the Training and Validation Loop

In [28]:
import torch.nn.functional as F
from torch import optim

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EnhancedDrowsinessModel().to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()  # For binary classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        # Training phase
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Print training statistics
        train_accuracy = 100 * correct / total
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {train_accuracy:.2f}%")

        # Validation phase
        model.eval()  # Set model to evaluation mode
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_accuracy = 100 * correct / total
        print(f"Validation Accuracy: {val_accuracy:.2f}%")

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer)


Epoch [1/10], Loss: 0.7909, Accuracy: 50.89%
Validation Accuracy: 57.59%
Epoch [2/10], Loss: 0.7177, Accuracy: 53.57%
Validation Accuracy: 47.77%
Epoch [3/10], Loss: 0.6792, Accuracy: 57.65%
Validation Accuracy: 53.12%
Epoch [4/10], Loss: 0.6775, Accuracy: 55.36%
Validation Accuracy: 54.02%
Epoch [5/10], Loss: 0.6652, Accuracy: 59.18%
Validation Accuracy: 54.02%
Epoch [6/10], Loss: 0.6601, Accuracy: 60.59%
Validation Accuracy: 55.80%
Epoch [7/10], Loss: 0.6651, Accuracy: 57.91%
Validation Accuracy: 59.38%
Epoch [8/10], Loss: 0.6594, Accuracy: 60.46%
Validation Accuracy: 54.02%
Epoch [9/10], Loss: 0.6622, Accuracy: 59.95%
Validation Accuracy: 57.14%
Epoch [10/10], Loss: 0.6616, Accuracy: 61.48%
Validation Accuracy: 54.91%
