In [1]:
import os
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.resnet import resnet101
from torch.utils.data import DataLoader, Dataset
import cv2
import torch.nn as nn
from gensim.models import KeyedVectors
from torchvision import transforms
from torch.nn import functional as F


In [2]:

# Run-wide configuration: filesystem locations, class count and compute device.
DATASET_PATH = r"E:\My Research Project\CODE\DataCombined"
# Text file holding the safety rules.
SAFETY_RULES_PATH = r"E:\My Research Project\CODE\Saftey Rules-OG.txt"
# Number of classes handed to the detector; adjust according to your dataset.
NUM_CLASSES = 99
# Use the GPU whenever CUDA is usable, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Model Running on {DEVICE}")


Model Running on cuda


In [3]:

# Custom Dataset with Bounding Box Check
class CustomDataset(Dataset):
    """Detection dataset that pairs every image with its bounding-box label file.

    Each label file holds one object per line as five whitespace-separated
    numbers: ``class_id x_center y_center width height``.

    Images and labels are matched by file stem (``foo.jpg`` <-> ``foo.txt``) and
    kept in sorted order, so the pairing does not depend on directory listing
    order. Images without a label file are left out of the dataset.

    Args:
        images_path: Directory containing ``.jpg`` / ``.png`` images.
        labels_path: Directory containing the ``.txt`` label files.
        transform: Callable applied to the RGB image array; defaults to
            ``transforms.ToTensor()``.

    NOTE(review): torchvision's Faster R-CNN reserves label 0 for background.
    If the label files use 0-based class ids they may need a +1 shift — confirm
    against the dataset before changing it here.
    """

    IMAGE_EXTENSIONS = ('.jpg', '.png')

    def __init__(self, images_path, labels_path, transform=None):
        self.images_path = images_path
        self.labels_path = labels_path
        label_stems = {
            os.path.splitext(f)[0] for f in os.listdir(labels_path) if f.endswith('.txt')
        }
        # Keep only images that have a label, sorted so indices are deterministic.
        self.image_files = sorted(
            f for f in os.listdir(images_path)
            if f.endswith(self.IMAGE_EXTENSIONS) and os.path.splitext(f)[0] in label_stems
        )
        # Derived from the image names so index i always refers to the same sample.
        self.label_files = [os.path.splitext(f)[0] + '.txt' for f in self.image_files]
        self.transform = transform if transform else transforms.ToTensor()

    def __len__(self):
        return len(self.image_files)

    @staticmethod
    def _read_label_rows(label_path):
        """Parse a label file into a list of 5-float rows, skipping blank lines.

        Raises:
            ValueError: If a non-blank line does not hold exactly five numbers.
        """
        rows = []
        with open(label_path, "r") as f:
            for line_number, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    continue
                if len(fields) != 5:
                    raise ValueError(
                        f"{label_path}:{line_number}: expected 5 values, got {len(fields)}"
                    )
                rows.append([float(value) for value in fields])
        return rows

    @staticmethod
    def _rows_to_boxes(rows, img_width, img_height):
        """Convert centre/size rows into ``[xmin, ymin, xmax, ymax]`` pixel boxes.

        When every coordinate in the file is <= 1.0 the rows are treated as
        normalised (fractions of the image size) and scaled to pixels; otherwise
        they are taken to be pixel values already. Boxes are clipped to the image
        and boxes with no remaining area are dropped.

        Returns:
            ``(boxes, classes)`` as plain Python lists.
        """
        normalized = all(max(row[1:]) <= 1.0 for row in rows)
        scale_x, scale_y = (img_width, img_height) if normalized else (1.0, 1.0)

        boxes = []
        classes = []
        for class_id, x_center, y_center, width, height in rows:
            if width <= 0 or height <= 0:
                continue
            xmin = max((x_center - width / 2) * scale_x, 0.0)
            ymin = max((y_center - height / 2) * scale_y, 0.0)
            xmax = min((x_center + width / 2) * scale_x, float(img_width))
            ymax = min((y_center + height / 2) * scale_y, float(img_height))
            if xmax > xmin and ymax > ymin:
                boxes.append([xmin, ymin, xmax, ymax])
                classes.append(int(class_id))
        return boxes, classes

    def _load_sample(self, position):
        """Read image and boxes for one index, without applying the transform."""
        img_path = os.path.join(self.images_path, self.image_files[position])
        label_path = os.path.join(self.labels_path, self.label_files[position])
        image = cv2.imread(img_path)
        if image is None:
            raise ValueError(f"Failed to load image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Size is taken before the transform; a transform that resizes the image
        # would need the boxes rescaled as well.
        img_height, img_width = image.shape[:2]
        rows = self._read_label_rows(label_path)
        boxes, classes = self._rows_to_boxes(rows, img_width, img_height)
        return image, boxes, classes

    def __getitem__(self, idx):
        """Return ``(image, target)`` with ``target = {"boxes", "labels"}``.

        A sample without any valid box is replaced by the next sample that has
        one (wrapping around at the end of the dataset).

        Raises:
            ValueError: If an image cannot be read, a label file is malformed,
                or no sample in the whole dataset has a valid box.
        """
        num_samples = len(self)
        position = idx
        # Bounded scan instead of recursion: visits every sample at most once.
        for _ in range(max(num_samples, 1)):
            image, boxes, classes = self._load_sample(position)
            if boxes:
                target = {
                    "boxes": torch.tensor(boxes, dtype=torch.float32),
                    "labels": torch.tensor(classes, dtype=torch.int64),
                }
                return self.transform(image), target
            position = (position + 1) % num_samples
        raise ValueError("No sample in the dataset contains a valid bounding box")


In [4]:
def get_data_loader(dataset_path, mode, batch_size=1):
    images_path = os.path.join(dataset_path, mode, "images")
    labels_path = os.path.join(dataset_path, mode, "labels")
    dataset = CustomDataset(images_path, labels_path, transform=transforms.ToTensor())
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))


In [5]:

# Text Embedding with Bi-GRU
class TextEmbedding(nn.Module):
    """Encode sentences as fixed-size vectors: GloVe word vectors -> Bi-GRU.

    Args:
        glove_file: Path to a GloVe text file (word2vec text format without a
            header line).
        embedding_dim: Dimensionality of the word vectors in ``glove_file``.
        hidden_dim: Hidden size of each GRU direction; the sentence vector has
            ``2 * hidden_dim`` features.
    """

    def __init__(self, glove_file=r"E:\My Research Project\CODE\glove.6B.300d.txt", embedding_dim=300, hidden_dim=128):
        super(TextEmbedding, self).__init__()
        print(f"Loading GloVe embeddings from {glove_file}...")
        self.glove_model = KeyedVectors.load_word2vec_format(glove_file, binary=False, no_header=True)
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.gru = nn.GRU(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True, bidirectional=True)

    def _embed_sentence(self, sentence, device):
        """Return a ``(num_known_words, embedding_dim)`` tensor for one sentence.

        Words are looked up as written first and lower-cased second, so an
        uncased vocabulary still matches capitalised input. A sentence with no
        known word yields a single zero vector so it can still be packed.
        """
        vectors = []
        for token in sentence.split():
            word = token if token in self.glove_model else token.lower()
            if word in self.glove_model:
                vectors.append(torch.tensor(self.glove_model[word], dtype=torch.float32))
        if not vectors:
            return torch.zeros(1, self.embedding_dim, device=device)
        return torch.stack(vectors).to(device)

    def forward(self, text_input):
        """Encode a batch of sentences.

        Args:
            text_input: A list of sentences (a single string is treated as a
                one-sentence batch).

        Returns:
            Tensor of shape ``(len(text_input), 2 * hidden_dim)`` holding the
            concatenated final hidden states of both GRU directions.

        Raises:
            ValueError: If ``text_input`` is empty.
        """
        if isinstance(text_input, str):
            # Iterating a bare string would treat every character as a sentence.
            text_input = [text_input]
        if len(text_input) == 0:
            raise ValueError("text_input must contain at least one sentence")

        # Build inputs on the GRU's device so the module works after .to(device).
        device = next(self.gru.parameters()).device
        embeddings = [self._embed_sentence(sentence, device) for sentence in text_input]
        lengths = [e.shape[0] for e in embeddings]

        padded_embeddings = nn.utils.rnn.pad_sequence(embeddings, batch_first=True)
        packed_embeddings = nn.utils.rnn.pack_padded_sequence(
            padded_embeddings, lengths, batch_first=True, enforce_sorted=False
        )
        _, hidden = self.gru(packed_embeddings)
        return torch.cat((hidden[0], hidden[1]), dim=-1)  # Concatenate Bi-GRU outputs


In [6]:

# ResNet-101 Backbone with RPN
def get_model(num_classes, min_size=800, max_size=1333):
    """Build a Faster R-CNN detector on a randomly initialised ResNet-101 trunk.

    The ResNet's average-pool and fully-connected layers are removed, leaving a
    single 2048-channel feature map that feeds both the RPN and the RoI head.

    Args:
        num_classes: Number of output classes passed to ``FasterRCNN``.
        min_size: Shorter-side size images are rescaled to before the backbone.
        max_size: Upper bound on the longer side after rescaling. The defaults
            mirror torchvision's; smaller values reduce GPU memory use.

    Returns:
        The assembled ``FasterRCNN`` model.
    """
    # No weights are requested, i.e. the backbone starts from random init.
    backbone = resnet101()
    backbone = nn.Sequential(*list(backbone.children())[:-2])
    backbone.out_channels = 2048

    # One feature map -> exactly one sizes tuple and one aspect-ratio tuple.
    anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256, 512),),
        aspect_ratios=((0.5, 1.0, 2.0),),
    )

    # A bare nn.Sequential backbone yields a single tensor, exposed as feature '0'.
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0'], output_size=7, sampling_ratio=2
    )

    model = FasterRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler,
        min_size=min_size,
        max_size=max_size,
    )
    return model


In [7]:

# Triplet Loss
class TripletLoss(nn.Module):
    """Margin-based triplet loss using Euclidean distances.

    For each triplet the loss is ``max(d(anchor, positive) - d(anchor, negative)
    + margin, 0)``; the batch mean is returned.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        """Return the mean hinge loss over all triplets in the batch."""
        d_positive = F.pairwise_distance(anchor, positive, p=2)
        d_negative = F.pairwise_distance(anchor, negative, p=2)
        # Triplets already separated by more than the margin contribute zero.
        hinge = (d_positive - d_negative + self.margin).clamp(min=0.0)
        return hinge.mean()


In [8]:

# Stacked Cross Attention Mechanism
class CrossAttention(nn.Module):
    """Attend from visual features (queries) over text features (keys/values).

    Visual features are projected into the text feature space, scored against
    the projected text keys by dot product, soft-maxed over the text axis and
    used to mix the projected text values.
    """

    def __init__(self, visual_dim, text_dim):
        super().__init__()
        self.query_proj = nn.Linear(visual_dim, text_dim)
        self.key_proj = nn.Linear(text_dim, text_dim)
        self.value_proj = nn.Linear(text_dim, text_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, visual_features, text_features):
        """Return text values re-weighted per visual query (last dim = text_dim)."""
        q = self.query_proj(visual_features)
        k = self.key_proj(text_features)
        # Dot-product similarity of every query with every key.
        scores = q @ k.transpose(-2, -1)
        weights = self.softmax(scores)
        return weights @ self.value_proj(text_features)


In [9]:

# Combined Model
class CombinedModel(nn.Module):
    """Faster R-CNN detector combined with a text encoder and cross-attention head.

    * Training mode: returns the detector's loss dict, plus ``"triplet_loss"``
      when ``triplet_inputs`` is supplied.
    * Eval mode without ``safety_text``: returns the detector's detections.
    * Eval mode with ``safety_text``: returns per-image class scores computed
      from pooled backbone features fused with attended text features.
    """

    def __init__(self, num_classes):
        super(CombinedModel, self).__init__()
        self.frcnn = get_model(num_classes)
        self.text_embedding = TextEmbedding()
        self.cross_attention = CrossAttention(2048, 256)
        self.fc = nn.Linear(2048 + 256, num_classes)
        # Parameter-free, so building it once does not change the state_dict.
        self.triplet_loss_fn = TripletLoss()

    def _pooled_visual_features(self, images):
        """Return one globally average-pooled backbone feature vector per image.

        The detector's own transform is reused so the images are normalised,
        resized and batched exactly as they are for detection.
        """
        image_list, _ = self.frcnn.transform(images)
        feature_maps = self.frcnn.backbone(image_list.tensors)
        if isinstance(feature_maps, dict):
            # Multi-level backbones return a dict; use the last (coarsest) level.
            feature_maps = list(feature_maps.values())[-1]
        return F.adaptive_avg_pool2d(feature_maps, 1).flatten(1)

    def forward(self, images, targets=None, safety_text=None, triplet_inputs=None):
        """Run the model; see the class docstring for the three output modes.

        Args:
            images: List of image tensors.
            targets: Per-image target dicts (used in training mode).
            safety_text: Optional list of sentences to fuse with visual features
                (eval mode only).
            triplet_inputs: Optional ``(anchor, positive, negative)`` tensors
                (training mode only).
        """
        if self.training:
            losses = self.frcnn(images, targets)
            if triplet_inputs is not None:
                anchor, positive, negative = triplet_inputs
                losses["triplet_loss"] = self.triplet_loss_fn(anchor, positive, negative)
            return losses

        if safety_text is None:
            return self.frcnn(images)

        # The detector returns a list of per-image dicts with no feature entry,
        # so visual features are taken from the backbone directly.
        # NOTE(review): global pooling is inferred from the 2048 + 256 input size
        # of self.fc — confirm this matches the intended fusion design.
        visual_features = self._pooled_visual_features(images)
        text_features = self.text_embedding(safety_text).to(visual_features.device)
        attended_features = self.cross_attention(visual_features, text_features)
        combined_features = torch.cat((visual_features, attended_features), dim=1)
        return self.fc(combined_features)


In [10]:
def evaluate_model(model, data_loader):
    """Compute and print the average detection loss over ``data_loader``.

    torchvision detection models only return a loss dict in training mode (in
    eval mode they return detections), so the model is switched to training
    mode for the forward passes while batch-norm and dropout layers are held in
    eval mode and gradients are disabled. The model is left in eval mode.

    Args:
        model: Model whose training-mode forward returns a dict of loss tensors.
        data_loader: Iterable of ``(images, targets)`` batches.

    Returns:
        The mean of the summed batch losses, or ``nan`` if the loader is empty.
    """
    # Follow the model's own device so inputs always match its parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for module in model.modules():
        # Keep running statistics untouched and dropout disabled during validation.
        if isinstance(module, (nn.modules.batchnorm._BatchNorm, nn.modules.dropout._DropoutNd)):
            module.eval()

    total_loss = 0.0
    num_batches = 0
    try:
        with torch.no_grad():
            for images, targets in data_loader:
                images = [img.to(device) for img in images]
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                # Perform validation forward pass
                outputs = model(images, targets=targets)
                total_loss += sum(outputs.values()).item()
                num_batches += 1
    finally:
        model.eval()

    if num_batches == 0:
        print("Validation Loss: n/a (validation loader is empty)")
        return float("nan")

    avg_loss = total_loss / num_batches
    print(f"Validation Loss: {avg_loss:.4f}")
    return avg_loss

In [14]:

# Training and Testing with Checkpointing
def train_model(num_epochs=3, checkpoint_path=r"E:\My Research Project\CODE\model_checkpoint.pth"):
    """Train the combined model, checkpointing and validating after every epoch.

    If ``checkpoint_path`` already exists, model and optimizer state are restored
    from it and training resumes at the epoch after the saved one.

    Args:
        num_epochs: Total number of epochs to train for (including any epochs
            already completed in a resumed checkpoint).
        checkpoint_path: File the checkpoint is read from and written to.

    Returns:
        The trained model.
    """
    train_loader = get_data_loader(DATASET_PATH, "train")
    val_loader = get_data_loader(DATASET_PATH, "val")
    model = CombinedModel(num_classes=NUM_CLASSES).to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    start_epoch = 0

    # Load checkpoint if exists
    if os.path.exists(checkpoint_path):
        # torch.load unpickles the file: only load checkpoints you created yourself.
        checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
        model.load_state_dict(checkpoint["model_state"])
        optimizer.load_state_dict(checkpoint["optimizer_state"])
        start_epoch = checkpoint["epoch"] + 1
        print(f"Resumed from checkpoint, continuing at epoch {start_epoch + 1}")

    for epoch in range(start_epoch, num_epochs):
        model.train()
        print(f"Epoch {epoch+1} is running")
        running_loss = 0.0
        num_batches = 0
        for images, targets in train_loader:
            images = [img.to(DEVICE) for img in images]
            targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
            triplet_inputs = None  # Add your triplet data here if available

            optimizer.zero_grad()
            losses = model(images, targets, triplet_inputs=triplet_inputs)
            loss = sum(losses.values())
            loss.backward()
            optimizer.step()

            # .item() keeps only a Python float, so no graph is retained.
            running_loss += loss.item()
            num_batches += 1

        # Save checkpoint
        torch.save({
            "model_state": model.state_dict(),
            "optimizer_state": optimizer.state_dict(),
            "epoch": epoch
        }, checkpoint_path)
        print(f"Checkpoint saved at epoch {epoch + 1}")

        # Report the epoch mean rather than whichever batch happened to be last.
        epoch_loss = running_loss / max(num_batches, 1)
        print(f"Epoch {epoch + 1}, Loss: {epoch_loss}")
        evaluate_model(model, val_loader)

    return model



In [15]:
def test_model(model, data_loader):
    model.eval()
    predictions = []
    with torch.no_grad():
        for images, _ in data_loader:
            images = [img.to(DEVICE) for img in images]
            outputs = model(images)
            predictions.append(outputs)
    return predictions

In [16]:



# Entry point: runs only when this code is executed directly (as a script or as
# a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    # Train and validate the model
    train_model()

   


Loading GloVe embeddings from E:\My Research Project\CODE\glove.6B.300d.txt...
Epoch 1 is running


RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
