In [4]:
import torch
import torch.nn as nn

class PromptLearner(nn.Module):
    def __init__(self, class_names: list, n_context: int, model):
        super().__init__()

        self.class_names = class_names
        self.number_of_classes = len(self.class_names)
        self.positive_tokens = n_context
        self.negative_tokens = n_context
        self.token_dimension = 768 #model.ln_final.weight.shape[0]

        print(self.token_dimension)

        self.normal_prompt = [ "object" ]
        self.abnormal_prompt = [ f"{cls} object" for cls in class_names ]

        self.normal_prompt_length = len(self.normal_prompt)
        self.abnormal_prompt_length = len(self.abnormal_prompt)

        print('initilizing contexts')

        positive_context_vectors = torch.empty(1, self.normal_prompt_length, self.positive_tokens, self.token_dimension)
        negative_context_vectors = torch.empty(1, self.abnormal_prompt_length, self.negative_tokens, self.token_dimension)

        nn.init.normal_(positive_context_vectors, std=0.02)
        nn.init.normal_(negative_context_vectors, std=0.02)
        placeholder = " ".join(["X"] * self.positive_tokens)
        self.compount_prompts_depth = 4
        self.compound_prompts_text = nn.ParameterList([nn.Parameter(torch.empty(4, self.token_dimension))
                                                       for _ in range(9)])
        
        for single_token in self.compound_prompts_text:
            print(single_token.shape)
            nn.init.normal_(single_token, std =0.02)

        positive_tokens = nn.Parameter(positive_context_vectors)
        negative_tokens = nn.Parameter(negative_context_vectors)

        normal_prompt = [ placeholder + " " + template for template in self.normal_prompt ]
        anomaly_prompt = [ placeholder + " " + template for template in self.abnormal_prompt]

        print(normal_prompt, anomaly_prompt)








In [7]:
prompt  = PromptLearner(['bent', 'cut', 'scratch'], 12, 0)
prompt

768
initilizing contexts
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
torch.Size([4, 768])
['X X X X X X X X X X X X object'] ['X X X X X X X X X X X X bent object', 'X X X X X X X X X X X X cut object', 'X X X X X X X X X X X X scratch object']


PromptLearner(
  (compound_prompts_text): ParameterList(
      (0): Parameter containing: [torch.float32 of size 4x768]
      (1): Parameter containing: [torch.float32 of size 4x768]
      (2): Parameter containing: [torch.float32 of size 4x768]
      (3): Parameter containing: [torch.float32 of size 4x768]
      (4): Parameter containing: [torch.float32 of size 4x768]
      (5): Parameter containing: [torch.float32 of size 4x768]
      (6): Parameter containing: [torch.float32 of size 4x768]
      (7): Parameter containing: [torch.float32 of size 4x768]
      (8): Parameter containing: [torch.float32 of size 4x768]
  )
)

In [1]:
!nvidia-smi

Thu Feb 27 13:39:58 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 570.86.15              Driver Version: 570.86.15      CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA A40                     On  |   00000000:A3:00.0 Off |                    0 |
|  0%   29C    P8             21W /  300W |       4MiB /  46068MiB |      0%   E. Process |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [1]:
import torch
import torch.nn as nn

def create_task_prompt_attention_mask(num_tasks, num_patches):
    """
    Creates an attention mask to block preceding task prompts from 
    attending to subsequent task prompts.
    
    Args:
        num_tasks (int): Number of task prompts (t).
        num_patches (int): Number of image patches (N).
    
    Returns:
        mask (torch.Tensor): Attention mask of shape [t + N + 1, t + N + 1].
                            `False` means "block attention".
    """
    total_tokens = num_tasks + num_patches + 1  # +1 for [CLS]
    
    # Initialize mask as all True (no blocking)
    mask = torch.ones((total_tokens, total_tokens), dtype=torch.bool)
    
    # Block preceding tasks from attending to subsequent tasks (lower triangular)
    for i in range(num_tasks):
        for j in range(i + 1, num_tasks):
            mask[i, j] = False  # Prevents task i from attending to task j
    
    # Allow all prompts to attend to patches + [CLS]
    mask[:num_tasks, num_tasks:] = True
    
    # Allow patches + [CLS] to attend to everything (standard ViT behavior)
    mask[num_tasks:, :] = True
    
    return mask

# Example usage
num_tasks = 3  # Number of task prompts (e.g., p1, p2, p3)
num_patches = 196  # For a 14x14 patch grid (224x224 image)
mask = create_task_prompt_attention_mask(num_tasks, num_patches)

print("Attention mask shape:", mask.shape)
print(mask) 

Attention mask shape: torch.Size([200, 200])
tensor([[ True, False, False,  ...,  True,  True,  True],
        [ True,  True, False,  ...,  True,  True,  True],
        [ True,  True,  True,  ...,  True,  True,  True],
        ...,
        [ True,  True,  True,  ...,  True,  True,  True],
        [ True,  True,  True,  ...,  True,  True,  True],
        [ True,  True,  True,  ...,  True,  True,  True]])


In [12]:
import os
import shutil
import csv

# Base path setup
obj = "pipe_fryum"
base_dir = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_v2"
image_base = os.path.join(base_dir, f"{obj}/Data/Images/Anomaly")
mask_base = os.path.join(base_dir, f"{obj}/Data/Masks/Anomaly")
output_base = os.path.join(base_dir, f"{obj}/Data/Images/Anomaly")

# Path to the annotation file
annotation_path = os.path.join(base_dir, f"{obj}/image_anno.csv" ) # Replace with the actual path if different

# Create folders and copy files
with open(annotation_path, "r") as f:
    reader = csv.reader(f)
    next(reader)

    for row in reader:

        if len(row) < 3 or not row[2].strip():
            continue
        print(row)
        image_path, labels, mask_path = row
        labels = [label.strip() for label in labels.split(",")]

        for label in labels:
            img_target_dir = os.path.join(output_base, label)
            mask_target_dir = os.path.join(output_base, "Masks", label)
            os.makedirs(img_target_dir, exist_ok=True)
            os.makedirs(mask_target_dir, exist_ok=True)

            # Copy image
            image_filename = os.path.basename(image_path)
            shutil.copy2(os.path.join(base_dir, image_path), os.path.join(img_target_dir, image_filename))

            # Copy mask
            mask_filename = os.path.basename(mask_path)
            shutil.copy2(os.path.join(base_dir, mask_path), os.path.join(mask_target_dir, mask_filename))

print("✅ Grouping complete!")


['pipe_fryum/Data/Images/Anomaly/000.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/000.png']
['pipe_fryum/Data/Images/Anomaly/001.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/001.png']
['pipe_fryum/Data/Images/Anomaly/002.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/002.png']
['pipe_fryum/Data/Images/Anomaly/003.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/003.png']
['pipe_fryum/Data/Images/Anomaly/004.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/004.png']
['pipe_fryum/Data/Images/Anomaly/005.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/005.png']
['pipe_fryum/Data/Images/Anomaly/006.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/006.png']
['pipe_fryum/Data/Images/Anomaly/007.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/007.png']
['pipe_fryum/Data/Images/Anomaly/008.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/008.png']
['pipe_fryum/Data/Images/Anomaly/009.JPG', 'burnt', 'pipe_fryum/Data/Masks/Anomaly/009.png']
['pipe_fryum/Data/Images/Anomaly/010.JPG', 'burnt', 'pipe_fryum/Data/M

In [1]:
import os
import shutil
import csv

# Paths
base_dir = "/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_v2/capsules"
annotation_path = "annotations.csv"
binary_img_dir = os.path.join(base_dir, "Data/Binary/Images")
binary_mask_dir = os.path.join(base_dir, "Data/Binary/Masks")

# Create binary folders if they don't exist
os.makedirs(binary_img_dir, exist_ok=True)
os.makedirs(binary_mask_dir, exist_ok=True)

# Move images and masks
with open(annotation_path, "r") as f:
    reader = csv.reader(f)
    next(reader)  # Skip header

    for row in reader:
        if len(row) < 3 or not row[2].strip():
            continue  # Skip normal cases (no mask)

        image_path, _, mask_path = row

        full_image_path = os.path.join(base_dir, image_path)
        full_mask_path = os.path.join(base_dir, mask_path)

        # Move image
        shutil.move(full_image_path, os.path.join(binary_img_dir, os.path.basename(image_path)))

        # Move mask
        shutil.move(full_mask_path, os.path.join(binary_mask_dir, os.path.basename(mask_path)))

print("✅ All anomaly images and masks moved to Binary folder.")


FileNotFoundError: [Errno 2] No such file or directory: 'annotations.csv'

In [22]:
import os
import shutil

from collections import defaultdict
cls_name = "pipe_fryum"
img_root = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype/{cls_name}/Data/Anomaly"
mask_root = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype/{cls_name}/Data/Anomaly/Masks"

output_root = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype/{cls_name}/Data/Anomaly_grouped"
os.makedirs(output_root, exist_ok=True)

image_defects = defaultdict(list)
for defect_type in os.listdir(img_root):
    if defect_type == "Masks":
        continue
    defect_path = os.path.join(img_root, defect_type)
    if not os.path.isdir(defect_path):
        continue
    for img_name in os.listdir(defect_path):
        if not img_name.lower().endswith(('.jpg', '.png', '.jpeg')):
            continue
        base_name = os.path.splitext(img_name)[0]
        image_folder = os.path.join(output_root, base_name)
        os.makedirs(os.path.join(image_folder, 'masks'), exist_ok=True)

        dest_img_path = os.path.join(image_folder, "image.jpg")
        if not os.path.exists(dest_img_path):
            shutil.copy(os.path.join(defect_path, img_name), dest_img_path)
        mask_path = os.path.join(mask_root, defect_type, base_name + ".png")
        if os.path.exists(mask_path):
            dst_mask_path = os.path.join(image_folder, "masks", defect_type.replace(" ", "_") + ".png")
            shutil.copy(mask_path, dst_mask_path)

        image_defects[base_name].append(defect_type)

# for img_name, defects in image_defects.items():
#     defect_file = os.path.join(output_root, img_name, "defects.txt")
#     with open(defect_file, "w") as f:
#         f.write("\n".join(defects))

In [7]:
import os

object = "pcb4"
mask_root = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype/{object}/Data/Anomaly_grouped"
file_names = []
for img in os.listdir(mask_root):
    img_path = os.path.join(mask_root, img)
    mask_path = os.path.join(img_path, "masks")
    if len(os.listdir(mask_path)) > 1:
        file_names.append(img)

print((sorted(file_names)))


['000', '001', '002', '003', '004', '007', '009', '010', '014', '016', '017', '019', '022', '029', '031', '033', '034', '035', '036', '039', '040', '042', '043', '045', '047', '050', '054', '056', '058', '060', '064', '070', '071', '074', '075', '078', '079', '080', '081', '082', '084', '085', '086', '087', '088', '092', '095', '097', '098']


In [None]:
import cv2
import numpy as np
import os
from PIL import Image
object = "fryu"
img_no = "008"
mask_root = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype/{object}/Data/Anomaly_grouped/{img_no}/masks"
realtive_opath = "data/visa_multitiype/chewinggum/Data/Anomaly_grouped/008/masks/missing.png"
# Load a binary image (0 and 1) – if needed, multiply by 255
img_mask = cv2.imread(os.path.join(mask_root, "missing.png"), cv2.IMREAD_GRAYSCALE)
img_np = np.array(img_mask).astype(np.uint8)
#img = (np.ones((400, 400), dtype=np.uint8)) * 255  # Example 1s image
 
# Mouse callback to draw
def draw_mask(event, x, y, flags, param):
    if event == cv2.EVENT_LBUTTONDOWN:
        cv2.circle(img_np, (x, y), 20, 0, -1)  # Draw a black (0) circle
 
cv2.namedWindow('Mask')
cv2.setMouseCallback('Mask', draw_mask)
 
while True:
    cv2.imshow('Mask', img_np)
    key = cv2.waitKey(1)
    if key == 27:  # ESC to exit
        break
 
cv2.destroyAllWindows()
 
# Convert to binary 0/1 again if needed
cv2.imwrite(os.path.join(mask_root, "crack.png"), img_np)
#img = img // 255

In [5]:
import os
import json
from sklearn.model_selection import train_test_split
object = "candle"

base_dir = f"/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype/{object}/Data"
relative_path = f"{object}/Data"
normal_dir = os.path.join(base_dir, "Normal")
anomaly_dir = os.path.join(base_dir, "Anomaly_grouped")

# Output dictionary
dataset_json = {
    "train": [],
    "test": []
}

# Gather all normal images
normal_images = [
    os.path.join(normal_dir, fname)
    for fname in os.listdir(normal_dir)
    if fname.lower().endswith((".jpg", ".png"))
]

# Split normal images into train (80%) and test (20%)
train_normals, test_normals = train_test_split(normal_images, test_size=0.2, random_state=42)

# Add to train
for img_path in train_normals:
    rel_path = os.path.relpath(img_path, base_dir)
    dataset_json["train"].append({
        "image_path": os.path.join(relative_path, rel_path),
        "defects": {}
    })

# Add 20% normal to test
for img_path in test_normals:
    dataset_json["test"].append({
        "image_path": os.path.join(relative_path, rel_path),
        "defects": {}
    })

# Add all anomaly samples to test
for sample_id in os.listdir(anomaly_dir):
    sample_path = os.path.join(anomaly_dir, sample_id)
    image_path = os.path.join(sample_path, "image.jpg")
    masks_dir = os.path.join(sample_path, "masks")

    # Skip if image or mask folder is missing
    if not os.path.exists(image_path):
        continue

    defects = {}
    if os.path.isdir(masks_dir):
        for mask_file in os.listdir(masks_dir):
            if mask_file.lower().endswith(".png"):
                defect_name = os.path.splitext(mask_file)[0]  # e.g., "missing"
                mask_path = os.path.join(masks_dir, mask_file)
                rel_mask_path = os.path.join(relative_path, os.path.relpath(mask_path, base_dir))
                defects[defect_name] = rel_mask_path
    rel_image_path = os.path.join(relative_path, os.path.relpath(image_path, base_dir))

    dataset_json["test"].append({
        "image_path": rel_image_path,
        "defects": defects
    })

# Save the final JSON
output_path = "visa_candle_dataset.json"
with open(output_path, "w") as f:
    json.dump(dataset_json, f, indent=2)

print(f"JSON saved to {output_path}")

JSON saved to visa_candle_dataset.json


In [9]:
import os
import json
from sklearn.model_selection import train_test_split

# ---- CONFIGURATION ----
root_dir = "/fs/scratch/rb_bd_dlp_rng_dl01_cr_ICT_employees/students/nan1rng/test/data/visa_multitiype"  # Top-level directory containing class folders (e.g., candle, capsule)
json_relative_root = "visa_multitiype"  # Prefix to preserve in saved paths
output_json_path = "visa_dataset_split_by_phase.json"
# ------------------------
strip_prefix = os.path.abspath("data/visa_multitiype")
# Final JSON structure
dataset_json = {
    "train": {},
    "test": {}
}

# Iterate over each class directory
for cls_name in os.listdir(root_dir):
    cls_path = os.path.join(root_dir, cls_name)
    if not os.path.isdir(cls_path):
        continue

    data_path = os.path.join(cls_path, "Data")
    normal_dir = os.path.join(data_path, "Normal")
    anomaly_dir = os.path.join(data_path, "Anomaly_grouped")

    train_entries = []
    test_entries = []

    # --- Normal Images ---
    if os.path.isdir(normal_dir):
        normal_images = [
            os.path.join(normal_dir, fname)
            for fname in os.listdir(normal_dir)
            if fname.lower().endswith((".jpg", ".jpeg", ".png"))
        ]

        # Split: 80% train, 20% test
        train_normals, test_normals = train_test_split(normal_images, test_size=0.2, random_state=42)

        for img_path in train_normals:
            rel_path = os.path.relpath(img_path, strip_prefix).replace("\\", "/")
            train_entries.append({
                "image_path": rel_path,
                "defects": {},
                "Anomaly": 0
            })

        for img_path in test_normals:
            rel_path = os.path.relpath(img_path, strip_prefix).replace("\\", "/")
            test_entries.append({
                "image_path": rel_path,
                "defects": {},
                "Anomaly": 0
            })

    # --- Anomalous Images ---
    if os.path.isdir(anomaly_dir):
        for sample_id in os.listdir(anomaly_dir):
            sample_path = os.path.join(anomaly_dir, sample_id)
            image_path = os.path.join(sample_path, "image.jpg")
            masks_dir = os.path.join(sample_path, "masks")

            if not os.path.exists(image_path):
                continue

            defects = {}
            if os.path.isdir(masks_dir):
                for mask_file in os.listdir(masks_dir):
                    if mask_file.lower().endswith(".png"):
                        defect_name = os.path.splitext(mask_file)[0]
                        mask_path = os.path.join(masks_dir, mask_file)
                        rel_mask_path = os.path.relpath(mask_path, strip_prefix).replace("\\", "/")
                        defects[defect_name] = rel_mask_path

            rel_image_path = os.path.relpath(image_path, strip_prefix).replace("\\", "/")
            test_entries.append({
                "image_path": rel_image_path,
                "defects": defects,
                "Anomaly": 1
            })

    # Store in global dictionary
    if train_entries:
        dataset_json["train"][cls_name] = train_entries
    if test_entries:
        dataset_json["test"][cls_name] = test_entries

# Save to JSON file
with open(output_json_path, "w") as f:
    json.dump(dataset_json, f, indent=2)

print(f"Saved JSON to {output_json_path}")


Saved JSON to visa_dataset_split_by_phase.json
