In [None]:
import os

import pandas as pd
import torch
from PIL import Image
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import Subset
from torch.utils.data import random_split
from torchvision import transforms

# Preprocessing shared by the training and validation pipelines.
# The mean/std below are the commonly used ImageNet channel statistics.
# NOTE(review): `clip.load` returns its own `preprocess` pipeline; confirm
# which normalization the fine-tuned checkpoint was trained with before
# changing these values.
IMAGE_SIZE = (224, 224)
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, light augmentation, tensor conversion, normalization.
train_transforms = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),  # rotate by up to +/- 15 degrees
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
])

# Validation pipeline: the same resize and normalization, without augmentation.
val_transforms = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
])
class CarDamageDataset(Dataset):
    """Image / text / label samples described by a CSV file.

    Rows are read positionally: column 0 holds the image filename (joined onto
    ``img_dir``), column 2 the class id used as label and column 3 the damage
    description used as text.

    Args:
        csv_file: Path of the CSV file, loaded with ``pandas.read_csv``.
        img_dir: Directory that contains the image files.
        transform: Optional callable applied to the RGB PIL image. When it is
            ``None`` the PIL image is returned unchanged.
        mode: Stored on the instance for callers' reference. The transform that
            is applied is always the one passed in ``transform``.
    """

    def __init__(self, csv_file, img_dir, transform=None, mode='train'):
        self.car_frame = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.mode = mode

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.car_frame)

    def __getitem__(self, idx):
        """Return ``{'image', 'text', 'label'}`` for row ``idx``."""
        img_name = self.car_frame.iloc[idx, 0]  # image filename
        text = self.car_frame.iloc[idx, 3]      # damages description
        label = self.car_frame.iloc[idx, 2]     # Class_ID as label

        img_path = os.path.join(self.img_dir, img_name)
        # The context manager closes the underlying file as soon as the RGB
        # copy has been made.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')

        # Use the transform this instance was constructed with instead of
        # reaching for module-level pipelines.
        if self.transform is not None:
            image = self.transform(image)

        return {'image': image, 'text': text, 'label': label}

In [None]:
# Whole dataset, constructed without a transform.
full_dataset = CarDamageDataset(
    csv_file='merged_repair_data.csv',
    img_dir='/home/hous/Desktop/LLAVA/CarDD_release/CarDD_COCO/data',
)

In [None]:
# Hold out 10% of the samples for validation. The split uses a seeded
# generator so that the same partition is produced on every run.
SPLIT_SEED = 42
train_size = int(0.9 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

In [None]:
# Training and validation need different transforms, so each split gets its
# own dataset instance. Both are restricted to the indices chosen by
# random_split; otherwise each would cover the whole CSV and validation would
# be measured on images the model was trained on.
train_indices = list(train_dataset.indices)
val_indices = list(val_dataset.indices)

train_dataset = Subset(
    CarDamageDataset(
        csv_file='merged_repair_data.csv',
        img_dir='/home/hous/Desktop/LLAVA/CarDD_release/CarDD_COCO/data',
        transform=train_transforms,
        mode='train',
    ),
    train_indices,
)
val_dataset = Subset(
    CarDamageDataset(
        csv_file='merged_repair_data.csv',
        img_dir='/home/hous/Desktop/LLAVA/CarDD_release/CarDD_COCO/data',
        transform=val_transforms,
        mode='val',
    ),
    val_indices,
)

In [None]:
# Batching options shared by both loaders; only the training loader shuffles.
_loader_opts = {'batch_size': 8, 'num_workers': 4}
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_opts)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_opts)

In [None]:
# Pull a single batch from the training loader to sanity-check its contents.
train_batch = next(iter(train_loader))
sample_images = train_batch['image']
sample_texts = train_batch['text']
sample_labels = train_batch['label']

# Report the image and label shapes and the raw text entries.
print(f"Image batch shape: {sample_images.shape}")
print(f"Text batch: {sample_texts}")
print(f"Label batch shape: {sample_labels.shape}")

In [None]:
import clip
import torch
from torchvision import models
from torch import nn

# Build the stock CLIP RN50 architecture on the GPU when one is available,
# otherwise on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
clip_model, preprocess = clip.load("RN50", device=device, jit=False)

# Replace the stock weights with the fine-tuned checkpoint.
# map_location lets a checkpoint that was saved on a GPU load on the CPU
# fallback selected above.
# NOTE: torch.load unpickles the file; only load checkpoints you trust.
clip_model_path = '/home/hous/Desktop/LLAVA/best_matching_model.pth'
clip_state_dict = torch.load(clip_model_path, map_location=device)
clip_model.load_state_dict(clip_state_dict)

# Inference mode for the backbone; the trailing ';' hides the module repr.
clip_model.eval();

In [None]:
import torch
from torch import nn
import clip

class CarDamageClassifier(nn.Module):
    """Classification head on top of a CLIP image/text encoder pair.

    The CLIP encoders act as a fixed feature extractor: they are called under
    ``torch.no_grad()`` and are kept in evaluation mode even while the head is
    training (see :meth:`train`). Image and text features are concatenated,
    projected to ``proj_dim``, passed through a multi-head attention layer and
    then through four Linear -> LayerNorm -> LeakyReLU -> Dropout stages
    followed by a final Linear layer that produces the class logits.

    Args:
        clip_model: Object exposing ``encode_image`` and ``encode_text``.
        num_classes: Number of output logits.
        device: Device on which the dummy inputs used to probe the encoder
            output sizes are created.
    """

    def __init__(self, clip_model, num_classes, device):
        super().__init__()
        self.clip_model = clip_model
        # The backbone is a feature extractor only; keep it in inference mode.
        self.clip_model.eval()

        # Run one dummy sample through each encoder to discover the feature
        # sizes instead of hard-coding them.
        dummy_image = torch.randn(1, 3, 224, 224, device=device)
        dummy_text = clip.tokenize(["dummy text"]).to(device)
        with torch.no_grad():
            image_features = self.clip_model.encode_image(dummy_image)
            text_features = self.clip_model.encode_text(dummy_text)
        image_output_dim = image_features.shape[1]
        text_output_dim = text_features.shape[1]

        # Projection of the concatenated image + text features.
        self.proj_dim = 1024
        self.projector = nn.Linear(image_output_dim + text_output_dim, self.proj_dim)

        # Attention over the projected features. forward() feeds it a
        # sequence of length 1 (one fused token per sample).
        self.num_heads = 8
        self.attention = nn.MultiheadAttention(
            embed_dim=self.proj_dim, num_heads=self.num_heads, batch_first=True
        )

        # MLP head. Attribute names are part of the checkpoint format
        # (state_dict keys) and must not be renamed.
        self.fc1 = nn.Linear(self.proj_dim, 2048)
        self.ln1 = nn.LayerNorm(2048)
        self.fc2 = nn.Linear(2048, 1024)
        self.ln2 = nn.LayerNorm(1024)
        self.fc3 = nn.Linear(1024, 512)
        self.ln3 = nn.LayerNorm(512)
        self.fc4 = nn.Linear(512, 256)
        self.ln4 = nn.LayerNorm(256)
        self.fc5 = nn.Linear(256, num_classes)

        self.relu = nn.LeakyReLU(negative_slope=0.01)
        self.dropout = nn.Dropout(0.4)

    def train(self, mode=True):
        """Switch the head between train/eval while keeping CLIP in eval mode.

        ``nn.Module.train`` recurses into every child module, which would put
        the backbone's normalization layers into training mode; they would
        then use batch statistics and update their running statistics even
        though the encoders are called under ``torch.no_grad()``.
        """
        super().train(mode)
        self.clip_model.eval()
        return self

    def forward(self, image, text):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            image: Batch passed to ``clip_model.encode_image``.
            text: Batch of token ids passed to ``clip_model.encode_text``.
        """
        # No gradients flow into the backbone.
        with torch.no_grad():
            image_features = self.clip_model.encode_image(image)
            text_features = self.clip_model.encode_text(text)

        # Align dtypes before concatenating, then match the head's dtype
        # (the encoders may run in a different precision than the head).
        text_features = text_features.to(image_features.dtype)
        combined_features = torch.cat((image_features, text_features), dim=1)
        combined_features = combined_features.to(dtype=self.projector.weight.dtype)

        projected_features = self.projector(combined_features)

        # Self-attention over a single fused token per sample: (B, 1, proj_dim).
        tokens = projected_features.unsqueeze(1)
        attn_output, _ = self.attention(tokens, tokens, tokens)
        x = attn_output.squeeze(1)

        # Linear -> LayerNorm -> LeakyReLU -> Dropout, four times.
        stages = (
            (self.fc1, self.ln1),
            (self.fc2, self.ln2),
            (self.fc3, self.ln3),
            (self.fc4, self.ln4),
        )
        for linear, norm in stages:
            x = self.dropout(self.relu(norm(linear(x))))

        return self.fc5(x)

In [None]:
# Build the classifier with 4 output classes and place it on the selected
# device (Module.to returns the module itself); ';' hides the module repr.
num_classes = 4
model = CarDamageClassifier(clip_model, num_classes, device).to(device);

In [None]:
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Cross-entropy loss over the class logits.
criterion = nn.CrossEntropyLoss()

# Adam over everything model.parameters() yields.
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Multiply the learning rate by 0.1 once the value passed to scheduler.step()
# has stopped decreasing for more than `patience` epochs.
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=5,
    threshold=0.01,
)

In [None]:
from tqdm import tqdm

# Train for a fixed number of epochs, validate after each one, and keep the
# weights of the epoch with the highest validation accuracy.
best_val_accuracy = 0.0
num_epochs = 100

for epoch in range(num_epochs):
    # ---- Training phase ----
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    train_loop = tqdm(train_loader, leave=True)
    train_loop.set_description(f"Epoch {epoch+1}/{num_epochs} [Train]")
    for step, batch in enumerate(train_loop, start=1):
        images = batch['image'].to(device)
        texts = clip.tokenize(batch['text']).to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()
        outputs = model(images, texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        predicted = outputs.argmax(dim=1)
        correct_train += (predicted == labels).sum().item()
        total_train += labels.size(0)

        running_loss += loss.item()
        # Show the mean over the batches processed so far; dividing by the
        # total batch count would under-report the loss until the last step.
        train_loop.set_postfix(loss=running_loss / step)

    average_loss = running_loss / len(train_loader)
    train_accuracy = correct_train / total_train
    print(f'Epoch {epoch+1}, Training Loss: {average_loss}, Training Accuracy: {train_accuracy}')

    # ---- Validation phase ----
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    val_loop = tqdm(val_loader, leave=False)
    val_loop.set_description(f"Epoch {epoch+1}/{num_epochs} [Validation]")
    # No gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
        for batch in val_loop:
            images = batch['image'].to(device)
            texts = clip.tokenize(batch['text']).to(device)
            labels = batch['label'].to(device)

            outputs = model(images, texts)
            val_loss += criterion(outputs, labels).item()

            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    average_val_loss = val_loss / len(val_loader)
    val_accuracy = correct / total
    print(f'Epoch {epoch+1}, Loss: {average_val_loss}, Validation Accuracy: {val_accuracy}')

    # The plateau scheduler monitors the mean validation loss.
    scheduler.step(average_val_loss)

    # Checkpoint whenever validation accuracy improves.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), 'best_cost_prediction_model.pth')

print('Finished Training')