Mounted at /content/drive


Collecting colorama
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Downloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Installing collected packages: colorama
Successfully installed colorama-0.4.6


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, SubsetRandomSampler, TensorDataset
from tqdm import tqdm
from colorama import Fore, Style
import os
import json

In [2]:
#model.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
from einops.layers.torch import Rearrange

class PatchEmbedding(nn.Module):
    """Split image into patches and embed them."""
    def __init__(self, image_size=224, patch_size=16, in_channels=3, embed_dim=768):
        super().__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.num_patches = (image_size // patch_size) ** 2

        self.projection = nn.Sequential(
            # Convert image into patches and flatten
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)',
                      p1=patch_size, p2=patch_size),
            nn.Linear(patch_size * patch_size * in_channels, embed_dim)
        )

        # Add learnable classification token
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

        # Add learnable position embeddings
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

    def forward(self, x):
        b = x.shape[0]  # batch size
        x = self.projection(x)

        # Add classification token to each sequence
        cls_tokens = self.cls_token.expand(b, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        # Add position embeddings
        x = x + self.pos_embedding
        return x

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention mechanism."""
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"

        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.att_drop = nn.Dropout(0.1)
        self.projection = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        batch_size, num_patches, embed_dim = x.shape

        qkv = self.qkv(x)
        qkv = qkv.reshape(batch_size, num_patches, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Attention
        att = (q @ k.transpose(-2, -1)) * (self.head_dim ** -0.5)
        att = F.softmax(att, dim=-1)
        att = self.att_drop(att)

        x = (att @ v).transpose(1, 2).reshape(batch_size, num_patches, embed_dim)
        x = self.projection(x)
        return x

class TransformerBlock(nn.Module):
    """Transformer block with attention and MLP."""
    def __init__(self, embed_dim, num_heads, mlp_ratio=4.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.norm2 = nn.LayerNorm(embed_dim)

        mlp_hidden_dim = int(embed_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(mlp_hidden_dim, embed_dim),
            nn.Dropout(0.1)
        )

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x

class ViT(nn.Module):
    """Vision Transformer for binary classification."""
    def __init__(self,
                 image_size=224,
                 patch_size=16,
                 in_channels=3,
                 embed_dim=768,
                 num_layers=12,
                 num_heads=12,
                 mlp_ratio=4.0,
                 dropout=0.1):
        super().__init__()

        # Patch Embedding
        self.patch_embed = PatchEmbedding(
            image_size=image_size,
            patch_size=patch_size,
            in_channels=in_channels,
            embed_dim=embed_dim
        )

        # Transformer Encoder
        self.transformer = nn.Sequential(*[
            TransformerBlock(embed_dim, num_heads, mlp_ratio)
            for _ in range(num_layers)
        ])

        # Classification Head
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, 1)  # Binary classification
        )

        # Dropout
        self.dropout = nn.Dropout(dropout)

        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward(self, x):
        # Patch embedding
        x = self.patch_embed(x)

        # Apply dropout
        x = self.dropout(x)

        # Transformer blocks
        x = self.transformer(x)

        # Classification token
        x = self.norm(x)
        x = x[:, 0]  # Use [CLS] token

        # Classification head
        x = self.head(x)
        return x




import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
from einops.layers.torch import Rearrange

class PatchEmbedding(nn.Module):
    """Split image into patches and embed them."""
    def __init__(self, image_size=224, patch_size=16, in_channels=3, embed_dim=768):
        super().__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.num_patches = (image_size // patch_size) ** 2

        self.projection = nn.Sequential(
            # Convert image into patches and flatten
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)',
                      p1=patch_size, p2=patch_size),
            nn.Linear(patch_size * patch_size * in_channels, embed_dim)
        )

        # Add learnable classification token
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

        # Add learnable position embeddings
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

    def forward(self, x):
        b = x.shape[0]  # batch size
        x = self.projection(x)

        # Add classification token to each sequence
        cls_tokens = self.cls_token.expand(b, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        # Add position embeddings
        x = x + self.pos_embedding
        return x

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention mechanism."""
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"

        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.att_drop = nn.Dropout(0.1)
        self.projection = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        batch_size, num_patches, embed_dim = x.shape

        qkv = self.qkv(x)
        qkv = qkv.reshape(batch_size, num_patches, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Attention
        att = (q @ k.transpose(-2, -1)) * (self.head_dim ** -0.5)
        att = F.softmax(att, dim=-1)
        att = self.att_drop(att)

        x = (att @ v).transpose(1, 2).reshape(batch_size, num_patches, embed_dim)
        x = self.projection(x)
        return x

class TransformerBlock(nn.Module):
    """Transformer block with attention and MLP."""
    def __init__(self, embed_dim, num_heads, mlp_ratio=4.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.norm2 = nn.LayerNorm(embed_dim)

        mlp_hidden_dim = int(embed_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(mlp_hidden_dim, embed_dim),
            nn.Dropout(0.1)
        )

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x




import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
from einops.layers.torch import Rearrange

class PatchEmbedding(nn.Module):
    """Split image into patches and embed them."""
    def __init__(self, image_size=224, patch_size=16, in_channels=3, embed_dim=768):
        super().__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.num_patches = (image_size // patch_size) ** 2

        self.projection = nn.Sequential(
            # Convert image into patches and flatten
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)',
                      p1=patch_size, p2=patch_size),
            nn.Linear(patch_size * patch_size * in_channels, embed_dim)
        )

        # Add learnable classification token
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

        # Add learnable position embeddings
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

    def forward(self, x):
        b = x.shape[0]  # batch size
        x = self.projection(x)

        # Add classification token to each sequence
        cls_tokens = self.cls_token.expand(b, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        # Add position embeddings
        x = x + self.pos_embedding
        return x

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention mechanism."""
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"

        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.att_drop = nn.Dropout(0.1)
        self.projection = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        batch_size, num_patches, embed_dim = x.shape

        qkv = self.qkv(x)
        qkv = qkv.reshape(batch_size, num_patches, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Attention
        att = (q @ k.transpose(-2, -1)) * (self.head_dim ** -0.5)
        att = F.softmax(att, dim=-1)
        att = self.att_drop(att)

        x = (att @ v).transpose(1, 2).reshape(batch_size, num_patches, embed_dim)
        x = self.projection(x)
        return x

class TransformerBlock(nn.Module):
    """Transformer block with attention and MLP."""
    def __init__(self, embed_dim, num_heads, mlp_ratio=4.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.norm2 = nn.LayerNorm(embed_dim)

        mlp_hidden_dim = int(embed_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(mlp_hidden_dim, embed_dim),
            nn.Dropout(0.1)
        )

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x




import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
from einops.layers.torch import Rearrange

class PatchEmbedding(nn.Module):
    """Split image into patches and embed them."""
    def __init__(self, image_size=224, patch_size=16, in_channels=3, embed_dim=768):
        super().__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.num_patches = (image_size // patch_size) ** 2

        self.projection = nn.Sequential(
            # Convert image into patches and flatten
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)',
                      p1=patch_size, p2=patch_size),
            nn.Linear(patch_size * patch_size * in_channels, embed_dim)
        )

        # Add learnable classification token
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

        # Add learnable position embeddings
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

    def forward(self, x):
        b = x.shape[0]  # batch size
        x = self.projection(x)

        # Add classification token to each sequence
        cls_tokens = self.cls_token.expand(b, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        # Add position embeddings
        x = x + self.pos_embedding
        return x

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention mechanism."""
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"

        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.att_drop = nn.Dropout(0.1)
        self.projection = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        batch_size, num_patches, embed_dim = x.shape

        qkv = self.qkv(x)
        qkv = qkv.reshape(batch_size, num_patches, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Attention
        att = (q @ k.transpose(-2, -1)) * (self.head_dim ** -0.5)
        att = F.softmax(att, dim=-1)
        att = self.att_drop(att)

        x = (att @ v).transpose(1, 2).reshape(batch_size, num_patches, embed_dim)
        x = self.projection(x)
        return x

class TransformerBlock(nn.Module):
    """Transformer block with attention and MLP."""
    def __init__(self, embed_dim, num_heads, mlp_ratio=4.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.norm2 = nn.LayerNorm(embed_dim)

        mlp_hidden_dim = int(embed_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(mlp_hidden_dim, embed_dim),
            nn.Dropout(0.1)
        )

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x

class ViT(nn.Module):
    """Vision Transformer for binary classification."""
    def __init__(self,
                 image_size=224,
                 patch_size=16,
                 in_channels=3,
                 embed_dim=768,
                 num_layers=12,
                 num_heads=12,
                 mlp_ratio=4.0,
                 dropout=0.1):
        super().__init__()

        # Patch Embedding
        self.patch_embed = PatchEmbedding(
            image_size=image_size,
            patch_size=patch_size,
            in_channels=in_channels,
            embed_dim=embed_dim
        )

        # Transformer Encoder
        self.transformer = nn.Sequential(*[
            TransformerBlock(embed_dim, num_heads, mlp_ratio)
            for _ in range(num_layers)
        ])

        # Classification Head
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, 1)  # Binary classification
        )

        # Dropout
        self.dropout = nn.Dropout(dropout)

        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward(self, x):
        # Patch embedding
        x = self.patch_embed(x)

        # Apply dropout
        x = self.dropout(x)

        # Transformer blocks
        x = self.transformer(x)

        # Classification token
        x = self.norm(x)
        x = x[:, 0]  # Use [CLS] token

        # Classification head
        x = self.head(x)
        return x

def get_vit(pretrained=False):
    """Returns an initialized ViT model for binary classification."""
    model = ViT(
        image_size=224,        # Required input size
        patch_size=16,         # Size of patches
        in_channels=3,         # RGB images
        embed_dim=768,         # Embedding dimension
        num_layers=12,         # Number of transformer blocks
        num_heads=12,          # Number of attention heads
        mlp_ratio=4.0,         # MLP hidden dimension ratio
        dropout=0.1            # Dropout rate
    )

    if pretrained:
        # You would typically load pretrained weights here
        # This is left as a placeholder
        pass

    return model

In [3]:

# ✅ Paths
DATA_PATH = r"C:\Users\Aviral\Desktop\DeepLock\data\preprocessed_data_p1.pth"  # your .pth dataset
MODEL_PATH = r"C:\Users\Aviral\Desktop\DeepLock\models\Final Models\deeplock_vit.pth"
LOG_PATH = r"C:\Users\Aviral\Desktop\DeepLock\logs_vit.json"


In [4]:

# ✅ Load Dataset
print("📂 Loading dataset...")
X, y = torch.load(DATA_PATH)
dataset = TensorDataset(X, y)
print(f"{Fore.CYAN}✅ Dataset loaded: {len(dataset)} samples{Style.RESET_ALL}")


📂 Loading dataset...


  X, y = torch.load(DATA_PATH)


[36m✅ Dataset loaded: 10190 samples[0m


In [None]:

# from model import get_vit  # make sure this returns a ViT model with output 1

# ✅ Paths
# DATA_PATH = "/content/drive/MyDrive/DEEPLOCK/preprocessed_data_p1.pth"  # your .pth dataset
# MODEL_PATH = "/content/drive/MyDrive/DEEPLOCK/deeplock_vit.pth"
# LOG_PATH = "/content/drive/MyDrive/DEEPLOCK/logs_vit.json"

# ✅ Configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EPOCHS = 40
BATCH_SIZE = 16
NUM_WORKERS = 2

# ✅ Initialize Model
if os.path.exists(MODEL_PATH):
    model = get_vit(pretrained=False).to(device)
    model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
    print(f"{Fore.YELLOW}→ Loaded existing ViT model from '{MODEL_PATH}'{Style.RESET_ALL}")
else:
    model = get_vit(pretrained=True).to(device)
    print(f"{Fore.YELLOW}→ Initializing new ViT model (no checkpoint found){Style.RESET_ALL}")

# ✅ Train/Val Split
total_samples = len(dataset)
train_size = int(0.8 * total_samples)
train_indices = list(range(train_size))
val_indices = list(range(train_size, total_samples))

train_loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    sampler=SubsetRandomSampler(train_indices),
    num_workers=NUM_WORKERS,
    pin_memory=False
)
val_loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    sampler=SubsetRandomSampler(val_indices),
    num_workers=NUM_WORKERS,
    pin_memory=False
)

# ✅ Optimizer, Scheduler, Loss
optimizer = optim.AdamW(model.parameters(), lr=1e-5, weight_decay=0.01)
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=2e-3,
    steps_per_epoch=len(train_loader),
    epochs=EPOCHS
)
criterion = nn.BCEWithLogitsLoss()

# ✅ Training Loop
best_acc = 0.0
logs = []

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    # --- Training ---
    model.train()
    train_loss, correct_train, total_train = 0.0, 0, 0

    train_progress = tqdm(
        train_loader,
        desc=f"{Fore.GREEN}Train{Style.RESET_ALL}",
        ncols=100,
        bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Style.RESET_ALL)
    )

    for images, labels in train_progress:
        images = images.to(device, non_blocking=True)
        labels = labels.float().unsqueeze(1).to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

        train_loss += loss.item()
        preds = torch.sigmoid(outputs) > 0.5
        correct_train += (preds == labels).sum().item()
        total_train += labels.size(0)

        train_progress.set_postfix({
            'loss': f"{train_loss / (total_train / images.size(0)):.4f}",
            'acc': f"{100 * correct_train / total_train:.2f}%"
        })

    # --- Validation ---
    model.eval()
    val_loss, correct_val, total_val = 0.0, 0, 0

    val_progress = tqdm(
        val_loader,
        desc=f"{Fore.CYAN}Val{Style.RESET_ALL}",
        ncols=100,
        bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.CYAN, Style.RESET_ALL)
    )

    with torch.no_grad():
        for images, labels in val_progress:
            images = images.to(device, non_blocking=True)
            labels = labels.float().unsqueeze(1).to(device, non_blocking=True)

            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            preds = torch.sigmoid(outputs) > 0.5
            correct_val += (preds == labels).sum().item()
            total_val += labels.size(0)

            val_progress.set_postfix({
                'loss': f"{val_loss / (total_val / images.size(0)):.4f}",
                'acc': f"{100 * correct_val / total_val:.2f}%"
            })

    # ✅ Save Best Model
    val_acc = 100 * correct_val / total_val
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), MODEL_PATH)
        print(f"{Fore.GREEN}→ 🏆 New best model saved! Validation accuracy: {val_acc:.2f}%{Style.RESET_ALL}")

    logs.append({
        "epoch": epoch + 1,
        "train_loss": train_loss / len(train_loader),
        "train_acc": 100 * correct_train / total_train,
        "val_loss": val_loss / len(val_loader),
        "val_acc": val_acc
    })

    print()

# ✅ Save Logs
with open(LOG_PATH, "w") as f:
    json.dump(logs, f, indent=4)

print(f"✅ Training complete! Best validation accuracy: {best_acc:.2f}%")
print(f"Model saved to: {MODEL_PATH}")
print(f"Logs saved to: {LOG_PATH}")


[33m→ Initializing new ViT model (no checkpoint found)[0m

Epoch 1/40


[32mTrain[0m:   0%|[32m                                                                [0m| 0/255 [00:00<?, ?it/s][0m


RuntimeError: Couldn't open shared file mapping: <torch_29080_4099347938_0>, error code: <1450>