In [None]:
# Install OpenCLIP plus the libraries this notebook imports (no versions are pinned here).
!pip install open_clip_torch torch torchvision pillow


# Base Model

In [None]:
# Mount Google Drive at /content/drive so the dataset folders below can be read.
from google.colab import drive
drive.mount('/content/drive')

# One folder per ground-truth class; they are passed to process_folder with the labels "yes" and "no".
yes_folder = '/content/drive/MyDrive/Group-Publications/VLM - Safety/Guardrail/Yes'  # Update this path
no_folder = '/content/drive/MyDrive/Group-Publications/VLM - Safety/Guardrail/No'    # Update this path


In [None]:
import torch
import open_clip
from PIL import Image
from google.colab import files
from torchvision import transforms

# Load the OpenCLIP ViT-L/14 model with the OpenAI weights
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess, _ = open_clip.create_model_and_transforms('ViT-L-14', pretrained='openai')
# The text and image tensors are moved to `device`, so the weights must live there as
# well; without this, inference on a GPU runtime fails with a device mismatch.
model = model.to(device)
# This section only runs inference, so put the model in eval mode.
model.eval()
tokenizer = open_clip.get_tokenizer('ViT-L-14')

# Define the text prompts (index 0 = guardrail present, index 1 = no guardrail)
text_labels = ["A highway with a sturdy metal guardrail for vehicle safety.", "An open road without a guardrail, exposing vehicles to roadside hazards."]
text_inputs = tokenizer(text_labels).to(device)

def classify_image(image_path):
    """
    Classifies whether an image contains a guardrail or not using OpenCLIP ViT-L/14.

    Args:
        image_path: Path to an image file that PIL can open.

    Returns:
        A tuple ``(model_label, prob_yes, prob_no)``. ``model_label`` is "yes" when the
        first prompt in ``text_labels`` scores higher than the second, otherwise "no".
        The two probabilities are the softmax over the scaled image/text similarities.
    """
    # Follow the model's own device so inputs and weights can never end up on
    # different devices, regardless of how the model was loaded.
    model_device = next(model.parameters()).device

    # Load and preprocess the image; the context manager releases the file handle.
    with Image.open(image_path) as img:
        image_input = preprocess(img.convert("RGB")).unsqueeze(0).to(model_device)

    # Run model inference
    with torch.no_grad():
        image_features = model.encode_image(image_input)
        text_features = model.encode_text(text_inputs.to(model_device))

        # Normalize features so the dot product below is a cosine similarity
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Cosine similarities lie in [-1, 1]; a softmax over them directly is always
        # close to uniform. CLIP multiplies by its learned temperature first. The scale
        # is positive, so the predicted label is the same as before; only the reported
        # probabilities become meaningful.
        logit_scale = model.logit_scale.exp()
        logits = (logit_scale * image_features @ text_features.T).squeeze(0)
        probs = logits.softmax(dim=-1).cpu().numpy()

    model_label = "yes" if probs[0] > probs[1] else "no"
    return model_label, probs[0], probs[1]




In [None]:
import os
# One record per classified image, accumulated across every processed folder.
results = []

def process_folder(folder_path, true_label):
    """Classify every image in ``folder_path`` and append one record per image to ``results``.

    Args:
        folder_path: Directory whose .png/.jpg/.jpeg files (any letter case) are classified.
        true_label: Ground-truth label stored with every record from this folder.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    for entry in os.listdir(folder_path):
        # Skip anything that is not an image file.
        if not entry.lower().endswith(image_extensions):
            continue
        stem, _extension = os.path.splitext(entry)
        predicted, p_yes, p_no = classify_image(os.path.join(folder_path, entry))
        record = {
            "ID": stem,
            "label": true_label,
            "model label": predicted,
            "prob. yes": p_yes,
            "prob. no": p_no,
        }
        results.append(record)

# Process both folders
# The second argument is the ground-truth label recorded for every image of that folder.
process_folder(yes_folder, "yes")
process_folder(no_folder, "no")


In [None]:
from datetime import datetime
import pandas as pd
from zoneinfo import ZoneInfo

# Flag every record: 1 when the prediction matches the ground truth, else 0.
for record in results:
    record["correct"] = int(record["label"] == record["model label"])

# Tabulate the per-image records
df = pd.DataFrame(results)

# Overall accuracy = share of correct predictions
accuracy = df["correct"].sum() / len(df)

# Trailing summary row: blank everywhere except the 'label' column, which carries the accuracy text
summary_row = dict.fromkeys(df.columns, "")
summary_row["label"] = f"Accuracy: {accuracy:.4f}"
df = pd.concat([df, pd.DataFrame([summary_row])], ignore_index=True)

# Restore numeric dtypes; the blank summary cells (and non-numeric IDs) become missing values
for int_column in ("ID", "correct"):
    df[int_column] = pd.to_numeric(df[int_column], errors='coerce').astype('Int64')
for float_column in ("prob. yes", "prob. no"):
    df[float_column] = pd.to_numeric(df[float_column], errors='coerce').astype(float)

# Timestamp the output file using New York local time
est_time = datetime.now(ZoneInfo("America/New_York"))
timestamp = est_time.strftime("%Y-%m-%d_%H-%M")
filename = f"/content/drive/MyDrive/Group-Publications/VLM - Safety/results_{timestamp}.xlsx"

# Write the workbook and report where it went
df.to_excel(filename, index=False)
print(f"Classification complete and results saved to: {filename}")

In [None]:
# Upload multiple images from local PC
uploaded = files.upload()  # Opens file selector, allowing multiple selections

# Loop through all uploaded images and classify each one.
# classify_image only returns its result, so it has to be printed here to be visible.
for image_path in uploaded.keys():
    print(f"\nProcessing: {image_path}")
    model_label, prob_yes, prob_no = classify_image(image_path)
    print(f"Guardrail: {model_label} (prob. yes = {prob_yes:.4f}, prob. no = {prob_no:.4f})")

# CoOp (Context Optimization)

In [None]:
# Drive mounting is disabled in this section: the folders below are local Windows paths.
#from google.colab import drive
#drive.mount('/content/drive')

# Few-shot image folders; these reassign the yes_folder / no_folder names set in the Base Model section.
yes_folder = r'C:/Users/alimanso/anaconda_projects/VLM/Few_Shot_Yes/'
no_folder = r'C:/Users/alimanso/anaconda_projects/VLM/Few_Shot_No/'

In [None]:
import torch
# Quick check of whether this kernel can see a CUDA device (shown as the cell output).
torch.cuda.is_available()

In [None]:
# Install PyTorch builds from the cu118 wheel index.
# NOTE(review): torch is already imported in the cell above; a reinstall presumably only
# takes effect after a kernel restart — confirm.
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import open_clip
from tqdm import tqdm
import pandas as pd
from datetime import datetime
from zoneinfo import ZoneInfo
import random

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load model and tokenizer
# `preprocess` is the image transform later handed to the train/test datasets.
model, preprocess, _ = open_clip.create_model_and_transforms('ViT-L-14', pretrained='openai')
# Move the weights to the selected device.
model = model.to(device)
tokenizer = open_clip.get_tokenizer('ViT-L-14')


## Define Class Names

In [None]:
# Class names in label order: the dataset split assigns label 0 to "yes" images and 1 to "no" images.
classnames = ["guardrail", "no guardrail"]
# Sanity check: print the shape of the token-embedding weight matrix.
print("token_embedding shape:", model.token_embedding.weight.shape)

## CoOp Prompt Learner --> Unified Context (Option 1)
(Shared learned context vectors for all classes) (End-Position Class Token)

In [None]:
# ----- CoOp Prompt Learner -----Unified Context version
class PromptLearner(nn.Module):
    """CoOp-style prompt learner with one context shared by every class.

    For each class the module emits the embedding sequence
    ``[SOS] + ctx + class-name tokens (+ padding) + [EOS]``, where ``ctx`` is the only
    trainable tensor. EOS is always the last position.

    Args:
        classnames: Class-name strings; one prompt is produced per entry.
        ctx_len: Number of learnable context vectors.
        tokenizer: Callable mapping a string to a ``[1, seq_len]`` tensor of token ids
            laid out as ``[SOS, tokens..., EOS, 0-padding]`` (the open_clip convention).
        clip_model: Model exposing ``token_embedding`` (an ``nn.Embedding``).
    """

    def __init__(self, classnames, ctx_len=4, tokenizer=None, clip_model=None):
        super().__init__()
        self.ctx_len = ctx_len
        self.classnames = classnames
        self.tokenizer = tokenizer
        self.clip_model = clip_model
        self.device = next(clip_model.parameters()).device

        # Get token embedding dimension
        self.embed_dim = clip_model.token_embedding.weight.shape[1]

        # Init learnable context shared by all classes: (ctx_len, embed_dim)
        self.ctx = nn.Parameter(torch.randn(ctx_len, self.embed_dim) * 0.02)

        # Special token ids: the tokenizer puts SOS first in every row.
        self.sos_id = tokenizer("X")[0][0].item()
        self.eos_id = 49407 if clip_model.token_embedding.num_embeddings >= 49408 else 0  # depends on tokenizer vocab size

        # The tokenizer returns a full fixed-length row that already holds SOS, EOS and
        # padding. Embedding that row whole put duplicate special tokens and dozens of
        # pad positions inside the prompt and made it longer than the text context.
        # Keep only the class-name tokens; SOS and EOS are added once in forward().
        pad_id = 0
        class_ids = [self._class_token_ids(tokenizer(f"{name}")) for name in classnames]
        max_len = max((len(ids) for ids in class_ids), default=0)
        tokenized = torch.full((len(classnames), max_len), pad_id, dtype=torch.long)
        for row, ids in enumerate(class_ids):
            # Shorter names are right-padded, so the padding sits before the final EOS.
            tokenized[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)
        self.tokenized = tokenized.to(self.device)  # shape: [num_classes, max_class_len]

        self.total_prompt_len = ctx_len + self.tokenized.shape[1] + 2  # [SOS] + ctx + class + [EOS]

    def _class_token_ids(self, token_row):
        """Return the class-name token ids of one tokenizer row, without SOS/EOS/padding."""
        ids = token_row.reshape(-1).tolist()
        if ids and ids[0] == self.sos_id:
            ids = ids[1:]
        if self.eos_id in ids:
            ids = ids[:ids.index(self.eos_id)]
        else:
            # No EOS marker found: at least drop the trailing zero padding.
            while ids and ids[-1] == 0:
                ids.pop()
        return ids

    def forward(self):
        """Return the prompt embeddings, shape ``[num_classes, total_prompt_len, embed_dim]``."""
        B = len(self.classnames)
        ctx = self.ctx.unsqueeze(0).expand(B, -1, -1)  # shape: [B, ctx_len, D]

        # Token embeddings
        embedding = self.clip_model.token_embedding
        class_embeds = embedding(self.tokenized)  # [B, max_class_len, D]
        sos = embedding.weight[self.sos_id].unsqueeze(0).unsqueeze(0).expand(B, 1, -1)
        eos = embedding.weight[self.eos_id].unsqueeze(0).unsqueeze(0).expand(B, 1, -1)

        # Final prompt: [SOS] + [ctx] + [class tokens] + [EOS]
        prompt_embeds = torch.cat([sos, ctx, class_embeds, eos], dim=1)  # [B, total_len, D]
        return prompt_embeds


# Build the unified-context learner with 8 context vectors.
# The Option 2 and Option 3 cells assign the same `prompt_learner` name, so whichever
# option cell runs last is the one used for training and evaluation.
prompt_learner = PromptLearner(
    classnames=classnames,
    ctx_len=8,
    tokenizer=tokenizer,
    clip_model=model
).to(device)


## CoOp Prompt Learner --> Class-Specific Context (Option 2)
(Distinct learned vectors for each class) (End-Position Class Token)

In [None]:
# ----- CoOp Prompt Learner -----Class-Specific Context
class PromptLearner(nn.Module):
    """CoOp-style prompt learner with a separate context per class (class tokens at the end).

    For class ``i`` the module emits the embedding sequence
    ``[SOS] + ctx[i] + class-name tokens (+ padding) + [EOS]``, where ``ctx`` is the only
    trainable tensor. EOS is always the last position.

    Args:
        classnames: Class-name strings; one prompt is produced per entry.
        ctx_len: Number of learnable context vectors per class.
        tokenizer: Callable mapping a string to a ``[1, seq_len]`` tensor of token ids
            laid out as ``[SOS, tokens..., EOS, 0-padding]`` (the open_clip convention).
        clip_model: Model exposing ``token_embedding`` (an ``nn.Embedding``).
    """

    def __init__(self, classnames, ctx_len=4, tokenizer=None, clip_model=None):
        super().__init__()
        self.ctx_len = ctx_len
        self.classnames = classnames
        self.tokenizer = tokenizer
        self.clip_model = clip_model
        self.device = next(clip_model.parameters()).device

        self.embed_dim = clip_model.token_embedding.weight.shape[1]

        # Class-specific learnable context: [num_classes, ctx_len, embed_dim]
        self.ctx = nn.Parameter(torch.randn(len(classnames), ctx_len, self.embed_dim) * 0.02)

        # Special token ids: the tokenizer puts SOS first in every row.
        self.sos_id = tokenizer("X")[0][0].item()
        self.eos_id = 49407 if clip_model.token_embedding.num_embeddings >= 49408 else 0

        # The tokenizer returns a full fixed-length row that already holds SOS, EOS and
        # padding. Embedding that row whole put duplicate special tokens and dozens of
        # pad positions inside the prompt and made it longer than the text context.
        # Keep only the class-name tokens; SOS and EOS are added once in forward().
        pad_id = 0
        class_ids = [self._class_token_ids(tokenizer(f"{name}")) for name in classnames]
        max_len = max((len(ids) for ids in class_ids), default=0)
        tokenized = torch.full((len(classnames), max_len), pad_id, dtype=torch.long)
        for row, ids in enumerate(class_ids):
            # Shorter names are right-padded, so the padding sits before the final EOS.
            tokenized[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)
        self.tokenized = tokenized.to(self.device)  # [num_classes, max_class_len]

        self.total_prompt_len = ctx_len + self.tokenized.shape[1] + 2

    def _class_token_ids(self, token_row):
        """Return the class-name token ids of one tokenizer row, without SOS/EOS/padding."""
        ids = token_row.reshape(-1).tolist()
        if ids and ids[0] == self.sos_id:
            ids = ids[1:]
        if self.eos_id in ids:
            ids = ids[:ids.index(self.eos_id)]
        else:
            # No EOS marker found: at least drop the trailing zero padding.
            while ids and ids[-1] == 0:
                ids.pop()
        return ids

    def forward(self):
        """Return the prompt embeddings, shape ``[num_classes, total_prompt_len, embed_dim]``."""
        B = len(self.classnames)

        # Token embeddings for each class
        embedding = self.clip_model.token_embedding
        class_embeds = embedding(self.tokenized)  # [B, max_class_len, D]
        sos = embedding.weight[self.sos_id].unsqueeze(0).unsqueeze(0).expand(B, 1, -1)
        eos = embedding.weight[self.eos_id].unsqueeze(0).unsqueeze(0).expand(B, 1, -1)

        # Use class-specific context
        ctx = self.ctx  # shape: [B, ctx_len, D]

        # Final prompt: [SOS] + [ctx] + [class tokens] + [EOS]
        prompt_embeds = torch.cat([sos, ctx, class_embeds, eos], dim=1)  # [B, total_len, D]
        return prompt_embeds


# Build the class-specific-context learner with 8 context vectors per class.
# Every option cell assigns the same `prompt_learner` name, so whichever option cell
# runs last is the one used for training and evaluation.
prompt_learner = PromptLearner(
    classnames=classnames,
    ctx_len=8,
    tokenizer=tokenizer,
    clip_model=model
).to(device)

## CoOp Prompt Learner --> Class-Specific Context (Option 3)
(Distinct learned vectors for each class) (Mid-Position Class Token)


In [None]:
# ----- CoOp Prompt Learner -----Class-Specific Context class in middle
class PromptLearner(nn.Module):
    """CoOp-style prompt learner with a per-class context split around the class tokens.

    For class ``i`` the module emits the embedding sequence
    ``[SOS] + ctx[i][:half] + class-name tokens (+ padding) + ctx[i][half:] + [EOS]``
    with ``half = ctx_len // 2``. ``ctx`` is the only trainable tensor and EOS is always
    the last position.

    Args:
        classnames: Class-name strings; one prompt is produced per entry.
        ctx_len: Number of learnable context vectors per class.
        tokenizer: Callable mapping a string to a ``[1, seq_len]`` tensor of token ids
            laid out as ``[SOS, tokens..., EOS, 0-padding]`` (the open_clip convention).
        clip_model: Model exposing ``token_embedding`` (an ``nn.Embedding``).
    """

    def __init__(self, classnames, ctx_len=4, tokenizer=None, clip_model=None):
        super().__init__()
        self.ctx_len = ctx_len
        self.classnames = classnames
        self.tokenizer = tokenizer
        self.clip_model = clip_model
        self.device = next(clip_model.parameters()).device

        self.embed_dim = clip_model.token_embedding.weight.shape[1]

        # Class-specific learnable context: [num_classes, ctx_len, embed_dim]
        self.ctx = nn.Parameter(torch.randn(len(classnames), ctx_len, self.embed_dim) * 0.02)

        # Special token ids: the tokenizer puts SOS first in every row.
        self.sos_id = tokenizer("X")[0][0].item()
        self.eos_id = 49407 if clip_model.token_embedding.num_embeddings >= 49408 else 0

        # The tokenizer returns a full fixed-length row that already holds SOS, EOS and
        # padding. Embedding that row whole put duplicate special tokens and dozens of
        # pad positions inside the prompt and made it longer than the text context.
        # Keep only the class-name tokens; SOS and EOS are added once in forward().
        pad_id = 0
        class_ids = [self._class_token_ids(tokenizer(f"{name}")) for name in classnames]
        max_len = max((len(ids) for ids in class_ids), default=0)
        tokenized = torch.full((len(classnames), max_len), pad_id, dtype=torch.long)
        for row, ids in enumerate(class_ids):
            # Shorter names are right-padded up to the longest class name.
            tokenized[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)
        self.tokenized = tokenized.to(self.device)  # [num_classes, max_class_len]

        self.total_prompt_len = ctx_len + self.tokenized.shape[1] + 2

    def _class_token_ids(self, token_row):
        """Return the class-name token ids of one tokenizer row, without SOS/EOS/padding."""
        ids = token_row.reshape(-1).tolist()
        if ids and ids[0] == self.sos_id:
            ids = ids[1:]
        if self.eos_id in ids:
            ids = ids[:ids.index(self.eos_id)]
        else:
            # No EOS marker found: at least drop the trailing zero padding.
            while ids and ids[-1] == 0:
                ids.pop()
        return ids

    def forward(self):
        """Return the prompt embeddings, shape ``[num_classes, total_prompt_len, embed_dim]``."""
        B = len(self.classnames)
        ctx_half = self.ctx_len // 2

        # Split each class-specific context into prefix/suffix
        ctx_prefix = self.ctx[:, :ctx_half, :]  # [B, ctx_half, D]
        ctx_suffix = self.ctx[:, ctx_half:, :]  # [B, ctx_len - ctx_half, D]

        # Class token embeddings: [B, max_class_len, D]
        embedding = self.clip_model.token_embedding
        class_embeds = embedding(self.tokenized)

        # Special tokens
        sos = embedding.weight[self.sos_id].unsqueeze(0).unsqueeze(0).expand(B, 1, -1)
        eos = embedding.weight[self.eos_id].unsqueeze(0).unsqueeze(0).expand(B, 1, -1)

        # Combine: [SOS] + ctx_prefix + [CLASS] + ctx_suffix + [EOS]
        prompt_embeds = torch.cat([sos, ctx_prefix, class_embeds, ctx_suffix, eos], dim=1)
        return prompt_embeds



# Build the mid-position class-token learner with 8 context vectors per class.
# Every option cell assigns the same `prompt_learner` name, so whichever option cell
# runs last is the one used for training and evaluation.
prompt_learner = PromptLearner(
    classnames=classnames,
    ctx_len=8,
    tokenizer=tokenizer,
    clip_model=model
).to(device)

## Define Few-Shot Limit and Split Train/Test Data

In [None]:
# === Dataset with Manual Split ===
def split_guardrail_dataset(yes_dir, no_dir, shot_limit, seed=42):
    """Split the two image folders into a few-shot training set and a test set.

    ``shot_limit`` images are sampled per class for training; every remaining image
    goes to the test set. "Yes" images get label 0 and "no" images label 1.

    The directory listings are sorted and the test lists are built in listing order,
    so a given ``seed`` yields the same split on every run and every file system.
    Extensions are matched case-insensitively and include ``.jpeg``.

    Args:
        yes_dir: Folder with guardrail images.
        no_dir: Folder with no-guardrail images.
        shot_limit: Number of training images to sample from each folder.
        seed: Seed for the global ``random`` generator.

    Returns:
        ``(train_samples, test_samples)``: two shuffled lists of ``(path, label)`` tuples.

    Raises:
        ValueError: If a folder holds fewer than ``shot_limit`` images.
    """
    image_extensions = ('.jpg', '.jpeg', '.png')

    def list_images(folder):
        # Sorted listing: os.listdir order is arbitrary and would defeat the seed.
        return [
            os.path.join(folder, name)
            for name in sorted(os.listdir(folder))
            if name.lower().endswith(image_extensions)
        ]

    random.seed(seed)
    yes_imgs = list_images(yes_dir)
    no_imgs = list_images(no_dir)

    for folder, images in ((yes_dir, yes_imgs), (no_dir, no_imgs)):
        if shot_limit > len(images):
            raise ValueError(
                f"shot_limit={shot_limit} exceeds the {len(images)} image(s) found in {folder}"
            )

    yes_train = random.sample(yes_imgs, shot_limit)
    no_train = random.sample(no_imgs, shot_limit)

    # Keep listing order for the held-out images; a set difference has no stable order.
    yes_picked, no_picked = set(yes_train), set(no_train)
    yes_test = [p for p in yes_imgs if p not in yes_picked]
    no_test = [p for p in no_imgs if p not in no_picked]

    train_samples = [(p, 0) for p in yes_train] + [(p, 1) for p in no_train]
    test_samples = [(p, 0) for p in yes_test] + [(p, 1) for p in no_test]
    random.shuffle(train_samples)
    random.shuffle(test_samples)
    return train_samples, test_samples

class GuardrailDataset(Dataset):
    """Dataset over ``(image_path, label)`` pairs that loads images on access.

    Args:
        samples: Sequence of ``(path, label)`` tuples.
        transform: Callable applied to the RGB PIL image before it is returned.
    """

    def __init__(self, samples, transform):
        self.samples = samples
        self.transform = transform

    def __len__(self):
        """Number of ``(path, label)`` pairs."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for the sample at ``idx``."""
        image_path, target = self.samples[idx]
        # Force three channels so every image has the same mode before the transform.
        rgb_image = Image.open(image_path).convert("RGB")
        transformed = self.transform(rgb_image)
        return transformed, target


# Number of training images sampled from each class folder.
shot_limit = 8  # Try 4, 8, or 16
train_samples, test_samples = split_guardrail_dataset(yes_folder, no_folder, shot_limit)

# Both splits use the `preprocess` transform returned with the model.
train_dataset = GuardrailDataset(train_samples, transform=preprocess)
test_dataset = GuardrailDataset(test_samples, transform=preprocess)

# Only the training split gets a DataLoader here; it is reshuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)


## Training

In [None]:
# === Training ===
def encode_custom_prompt(prompts):
    """Turn prompt embeddings into projected text features.

    Runs ``prompts`` through ``model.transformer``, keeps the output at the last
    sequence position of every prompt, and multiplies it by ``model.text_projection``.

    NOTE(review): this path adds no positional embedding, passes no attention mask and
    applies no final layer norm before the projection. open_clip's own ``encode_text``
    presumably does all three — confirm against the installed version. Without
    positional information, where the class tokens sit inside the prompt may have no
    effect on the result.

    Args:
        prompts: Prompt embeddings, one sequence per class.

    Returns:
        One projected feature vector per prompt.
    """
    hidden_states = model.transformer(prompts)
    # Keep only the final position of each sequence.
    final_position = hidden_states[:, -1]
    return final_position @ model.text_projection

def train(train_loader, prompt_learner, epochs=100):
    """Optimise the learnable context of ``prompt_learner`` with a frozen CLIP model.

    Only ``prompt_learner.ctx`` is handed to the optimizer. Each step encodes the image
    batch without gradients, encodes the current prompts, and minimises cross-entropy
    over the temperature-scaled cosine similarities.

    Args:
        train_loader: Iterable of ``(images, labels)`` batches.
        prompt_learner: Module whose call returns the prompt embeddings and which
            exposes the trainable ``ctx`` parameter.
        epochs: Number of passes over ``train_loader``.
    """
    optimizer = torch.optim.AdamW([prompt_learner.ctx], lr=5e-4, weight_decay=0.01)
    model.eval()

    # Freeze CLIP: only the context is optimised. Otherwise backward() fills .grad on
    # every text-encoder weight and nothing ever clears it.
    for param in model.parameters():
        param.requires_grad_(False)

    # CLIP's learned temperature. Raw cosine similarities lie in [-1, 1], which makes
    # the softmax in the loss nearly flat and the training signal very weak.
    logit_scale = model.logit_scale.exp().detach()

    for epoch in range(epochs):
        total_loss = 0
        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            images = images.to(device)
            labels = labels.to(device)
            # Image features need no gradient: the image tower is not being trained.
            with torch.no_grad():
                image_features = model.encode_image(images)
                image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            prompts = prompt_learner()
            text_features = encode_custom_prompt(prompts)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)
            logits = logit_scale * image_features @ text_features.T
            loss = F.cross_entropy(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        # Sum of the per-batch losses for this epoch.
        print(f"Epoch {epoch+1} Loss: {total_loss:.4f}")

# Train the currently selected prompt learner for 50 epochs (overrides the default of 100).
train(train_loader, prompt_learner, epochs=50)

## Evaluation

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# === Evaluation ===
def classify_tensor_image(image_tensor):
    """Classify one preprocessed image with the learned prompts.

    Args:
        image_tensor: A single preprocessed image without a batch dimension.

    Returns:
        A tuple ``(model_label, prob_yes, prob_no)``. ``model_label`` is "yes" when class 0
        scores higher than class 1, otherwise "no". The probabilities are the softmax over
        the temperature-scaled cosine similarities.
    """
    image_tensor = image_tensor.to(device).unsqueeze(0)
    with torch.no_grad():
        image_features = model.encode_image(image_tensor)
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)

        # Reuse the training-time text encoder so training and evaluation cannot drift apart.
        text_features = encode_custom_prompt(prompt_learner())
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Scale by CLIP's learned temperature before the softmax; unscaled cosine
        # similarities give probabilities that are always close to uniform. The scale is
        # positive, so the predicted label does not change.
        logits = (model.logit_scale.exp() * image_features @ text_features.T).squeeze(0)
        probs = logits.softmax(dim=-1).cpu().numpy()
    model_label = "yes" if probs[0] > probs[1] else "no"
    return model_label, probs[0], probs[1]

def evaluate(test_dataset):
    """Score the learned prompts on ``test_dataset``, save an Excel report, plot a confusion matrix.

    Each image is classified with ``classify_tensor_image``. One row per image is written
    to a timestamped ``results_*.xlsx`` file, followed by a summary row carrying the
    accuracy in the 'label' column.

    Args:
        test_dataset: Dataset yielding ``(image_tensor, label)`` and exposing ``samples``,
            a list of ``(path, label)`` in the same order. Label 0 means "yes".

    Raises:
        ValueError: If ``test_dataset`` is empty.
    """
    if len(test_dataset) == 0:
        raise ValueError("evaluate() received an empty test dataset; nothing to score.")

    results = []
    y_true = []
    y_pred = []

    # shuffle=False keeps the loader aligned with test_dataset.samples, which supplies the paths.
    loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
    for (image_tensor, label), (img_path, _) in zip(loader, test_dataset.samples):
        model_label, prob_yes, prob_no = classify_tensor_image(image_tensor.squeeze(0))
        gt = "yes" if label.item() == 0 else "no"
        image_id = os.path.splitext(os.path.basename(img_path))[0]

        results.append({
            "ID": image_id,
            "label": gt,
            "model label": model_label,
            "prob. yes": prob_yes,
            "prob. no": prob_no,
            "correct": int(model_label == gt)
        })

        y_true.append(0 if gt == "yes" else 1)
        y_pred.append(0 if model_label == "yes" else 1)

    df = pd.DataFrame(results)
    accuracy = df["correct"].sum() / len(df)

    # Summary row: blank everywhere except the 'label' column.
    summary_row = {col: "" for col in df.columns}
    summary_row["label"] = f"Accuracy: {accuracy:.4f}"
    df = pd.concat([df, pd.DataFrame([summary_row])], ignore_index=True)

    # Restore numeric dtypes; blank summary cells and non-numeric IDs become missing values.
    df["ID"] = pd.to_numeric(df["ID"], errors='coerce').astype('Int64')
    df["correct"] = pd.to_numeric(df["correct"], errors='coerce').astype('Int64')
    df["prob. yes"] = pd.to_numeric(df["prob. yes"], errors='coerce').astype(float)
    df["prob. no"] = pd.to_numeric(df["prob. no"], errors='coerce').astype(float)

    timestamp = datetime.now(ZoneInfo("America/New_York")).strftime("%Y-%m-%d_%H-%M")
    filename = 'C:/Users/alimanso/anaconda_projects/VLM/results_' + timestamp + '.xlsx'
    df.to_excel(filename, index=False)

    print(f"\n📊 Final Accuracy: {accuracy:.4f}")
    print(f"✅ Results saved to: {filename}")

    # Plot Confusion Matrix
    # Fix the label set so the matrix is always 2x2. Without it, a run in which only one
    # class appears yields a 1x1 matrix that does not match the two display labels.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["yes", "no"])
    disp.plot(cmap=plt.cm.Blues, values_format='d')
    plt.title("Confusion Matrix")
    plt.show()

# Score the held-out images, write the Excel report and show the confusion matrix.
evaluate(test_dataset)