In [None]:
!unzip /content/normal_roof.zip

In [None]:
from transformers import CLIPProcessor, CLIPModel
from peft import LoraConfig, get_peft_model, TaskType

# Checkpoint to fine-tune; any other CLIP variant can be substituted here.
MODEL_NAME = "openai/clip-vit-base-patch32"

# Base CLIP network plus the matching processor (image preprocessing + tokenizer).
base_model = CLIPModel.from_pretrained(MODEL_NAME)
processor = CLIPProcessor.from_pretrained(MODEL_NAME)

# LoRA hyper-parameters.
# target_modules names the layers that receive adapters. NOTE(review): PEFT is
# expected to match these names against module-name suffixes, so every attention
# block exposing q_proj / v_proj should be adapted - confirm the resulting count
# with print_trainable_parameters() below, or spell out full module paths to
# restrict adaptation to a single tower.
lora_config = LoraConfig(
    task_type=TaskType.FEATURE_EXTRACTION,  # the model is used as a feature extractor (no classification head)
    target_modules=["q_proj", "v_proj"],
    r=16,                # LoRA rank
    lora_alpha=32,       # LoRA scaling factor
    lora_dropout=0.05,
    bias="none",         # bias terms stay frozen
)

# Wrap the base model with the adapters and report trainable vs. total parameters.
lora_model = get_peft_model(base_model, lora_config)
lora_model.print_trainable_parameters()


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
from torchvision import transforms

class MaterialClassificationDataset(Dataset):
    """Folder-per-class image dataset that also carries one text prompt per class.

    Images are collected from ``image_dir/<class_name>/`` for every entry of
    ``class_names``; the label of an image is the index of its class in that list.
    Class folders that do not exist are reported with a warning and skipped.

    Args:
        image_dir (str): Root directory containing one sub-directory per class.
        class_names (list[str]): Class names; their order defines the label indices.
        processor: Callable used both for the text prompts
            (``processor(text=..., return_tensors="pt", ...)``) and for each image
            (``processor(images=..., return_tensors="pt")``).
        image_transforms (callable, optional): Applied to the loaded RGB PIL image
            before it is handed to ``processor``.

    Each item is a dict with keys ``pixel_values``, ``input_ids``,
    ``attention_mask`` (the prompt tensors for *all* classes, identical for every
    item) and ``label``. ``__getitem__`` returns ``None`` when an image cannot be
    read, so the DataLoader's collate function must drop ``None`` entries.
    """

    # File extensions (compared case-insensitively) that are treated as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    def __init__(self, image_dir, class_names, processor, image_transforms=None):
        self.image_paths = []
        self.labels = []
        self.class_names = class_names
        self.class_to_idx = {name: i for i, name in enumerate(class_names)}
        self.idx_to_class = {i: name for i, name in enumerate(class_names)}
        self.processor = processor
        self.image_transforms = image_transforms

        for class_name in class_names:
            class_dir = os.path.join(image_dir, class_name)
            if not os.path.isdir(class_dir):
                print(f"Warning: directory {class_dir} does not exist.")
                continue
            # os.listdir returns entries in arbitrary, filesystem-dependent order;
            # sorting makes sample indices reproducible across runs and machines.
            for img_name in sorted(os.listdir(class_dir)):
                if img_name.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.image_paths.append(os.path.join(class_dir, img_name))
                    self.labels.append(self.class_to_idx[class_name])

        # Text prompts (for image-text contrastive models like CLIP), tokenized once
        # up front because they are the same for every sample.
        self.text_prompts = [f"a photo of {name}" for name in self.class_names]
        self.text_inputs = self.processor(
            text=self.text_prompts,
            return_tensors="pt",
            padding=True,
            truncation=True
        )

    def __len__(self):
        """Return the number of image files discovered at construction time."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, transform and preprocess sample ``idx``; return ``None`` if it cannot be read."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        try:
            # The context manager closes the underlying file once convert() has
            # produced an independent in-memory copy, so handles are not leaked
            # (relevant for formats such as GIF that keep the file open).
            with Image.open(img_path) as raw_image:
                image = raw_image.convert("RGB")
        except Exception as e:
            # Best effort: report the unreadable file and let the collate function skip it.
            print(f"Error: failed to read image {img_path}. {e}")
            return None

        if self.image_transforms:
            image = self.image_transforms(image)

        image_inputs = self.processor(images=image, return_tensors="pt")

        return {
            # The processor output has a leading batch axis of size 1; drop it so
            # the DataLoader can stack samples itself.
            "pixel_values": image_inputs.pixel_values.squeeze(0),
            "input_ids": self.text_inputs.input_ids,
            "attention_mask": self.text_inputs.attention_mask,
            "label": torch.tensor(label, dtype=torch.long)
        }

# === Image size helper ===
def resolve_image_size(processor):
    """Return the ``(height, width)`` the processor's image pipeline is configured for.

    The size is read from ``processor.image_processor.size``; when the processor
    has no ``image_processor`` attribute, the legacy ``processor.feature_extractor``
    is used instead, so both old and new processor objects are supported.

    Args:
        processor: Object exposing ``image_processor`` and/or ``feature_extractor``,
            each carrying a ``size`` attribute.

    Returns:
        tuple: ``(height, width)``. ``size`` may be a dict (``height``/``width`` or
        ``shortest_edge`` keys), a single int (square), or a 2-element list/tuple.
        Anything else - including a missing ``size`` - falls back to ``(224, 224)``.
    """
    default_edge = 224

    # Prefer the current attribute name; fall back to the legacy alias only if needed.
    image_processor = getattr(processor, "image_processor", None)
    if image_processor is None:
        image_processor = getattr(processor, "feature_extractor", None)
    size = getattr(image_processor, "size", None)

    if isinstance(size, dict):
        # Use height/width if present (e.g., {"height": 224, "width": 224});
        # otherwise treat "shortest_edge" as the side of a square.
        h = size.get("height") or size.get("shortest_edge") or default_edge
        w = size.get("width") or size.get("shortest_edge") or default_edge
        return (h, w)
    if isinstance(size, int):
        return (size, size)
    if isinstance(size, (list, tuple)) and len(size) == 2:
        return tuple(size)
    return (default_edge, default_edge)  # fallback

# Get the model's preferred input size
image_size = resolve_image_size(processor)

# === Data augmentation / preprocessing ===
# Both pipelines deliberately stop at a PIL image. MaterialClassificationDataset
# hands the transformed image to the CLIP processor, which performs the tensor
# conversion, rescaling and normalization itself. Adding ToTensor()/Normalize()
# here would normalize the training images twice (and with ImageNet statistics
# rather than the processor's own), leaving them inconsistent with validation
# and inference inputs.
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(image_size),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
])
val_transforms = transforms.Compose([
    transforms.Resize(image_size),
    transforms.CenterCrop(image_size),
])

# === Example usage ===
class_names = ["flat", "gabled", "gambrel", "hipped", "mansard", "pyramidal", "saltbox", "skillion"]
# NOTE(review): train and validation currently read the SAME folder, so the
# reported validation loss/accuracy is measured on training images and says
# nothing about generalization. Point val_image_dir at a held-out split.
train_image_dir = "/content/sorted_images"
val_image_dir = "/content/sorted_images"

train_dataset = MaterialClassificationDataset(train_image_dir, class_names, processor, image_transforms=train_transforms)
val_dataset = MaterialClassificationDataset(val_image_dir, class_names, processor, image_transforms=val_transforms)

def collate_fn(batch):
    """Collate a batch while tolerating samples that failed to load.

    Entries equal to ``None`` are discarded before the remaining samples are
    stacked with PyTorch's default collation. If nothing is left, ``None`` is
    returned so the caller can skip the batch.
    """
    usable_samples = [sample for sample in batch if sample is not None]
    if len(usable_samples) == 0:
        return None
    return torch.utils.data.dataloader.default_collate(usable_samples)

# Both loaders share the batch size and the None-tolerant collate function;
# only the training loader reshuffles its samples every epoch.
_loader_kwargs = dict(batch_size=4, collate_fn=collate_fn)
train_dataloader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_dataloader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)


In [None]:
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lora_model.to(device)

# Hand the optimizer only the parameters that are actually trainable. Frozen
# parameters never receive gradients, so listing them adds nothing but overhead.
trainable_parameters = [p for p in lora_model.parameters() if p.requires_grad]
optimizer = optim.AdamW(trainable_parameters, lr=1e-4)  # You may tune LR; 1e-4 or 5e-5 are common starting points
num_epochs = 20  # Adjust based on dataset size and convergence; with only dozens of images, more epochs may be needed (watch for overfitting)


def _encode_images(model, pixel_values):
    """Return L2-normalized image embeddings, one row per image in ``pixel_values``."""
    features = model.get_image_features(pixel_values=pixel_values)
    return features / features.norm(p=2, dim=-1, keepdim=True)


def _encode_class_prompts(model, batch, device):
    """Return L2-normalized text embeddings for the class prompts carried in ``batch``."""
    # Every sample carries the same (num_classes, seq_len) prompt tensors, so the
    # DataLoader stacks them to (batch_size, num_classes, seq_len); one copy is enough.
    input_ids = batch["input_ids"][0].to(device)
    attention_mask = batch["attention_mask"][0].to(device)
    features = model.get_text_features(input_ids=input_ids, attention_mask=attention_mask)
    return features / features.norm(p=2, dim=-1, keepdim=True)


def _class_logits(model, image_features, text_features):
    """Return temperature-scaled image-to-prompt similarities, shape (batch_size, num_classes)."""
    # logit_scale is stored in log space; exp() turns it into the multiplier.
    return model.logit_scale.exp() * image_features @ text_features.t()


for epoch in range(num_epochs):
    lora_model.train()
    total_train_loss = 0.0
    train_batches_seen = 0  # batches that actually contributed a loss value

    progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{num_epochs} [Training]")
    for batch in progress_bar:
        if batch is None:
            continue  # Skip empty batches (collate_fn returns None when every sample failed to load)

        pixel_values = batch["pixel_values"].to(device)
        labels = batch["label"].to(device)  # (batch_size,)

        image_features = _encode_images(lora_model, pixel_values)
        # Re-encoded every step: the text tower may carry LoRA adapters as well
        # (target modules are matched by name), so gradients must flow through it.
        text_features = _encode_class_prompts(lora_model, batch, device)
        logits_per_image = _class_logits(lora_model, image_features, text_features)

        loss = F.cross_entropy(logits_per_image, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        total_train_loss += batch_loss
        train_batches_seen += 1
        progress_bar.set_postfix({"loss": batch_loss})

    # Average over the batches that were really processed. Dividing by
    # len(train_dataloader) under-reports the loss when batches were skipped
    # and raises ZeroDivisionError for an empty loader.
    avg_train_loss = total_train_loss / train_batches_seen if train_batches_seen else float("nan")
    print(f"Epoch {epoch+1} - Training Loss: {avg_train_loss:.4f}")

    # --- Validation (optional but strongly recommended) ---
    if 'val_dataloader' in locals() and val_dataloader is not None:
        lora_model.eval()
        total_val_loss = 0.0
        val_batches_seen = 0
        correct_predictions = 0
        total_samples = 0
        # Weights are fixed and dropout is off during evaluation, and every batch
        # carries the same prompts, so the text features are encoded only once
        # per validation pass (lazily, from the first usable batch).
        val_text_features = None

        val_progress_bar = tqdm(val_dataloader, desc=f"Epoch {epoch+1}/{num_epochs} [Validation]")
        with torch.no_grad():
            for batch in val_progress_bar:
                if batch is None:
                    continue

                pixel_values = batch["pixel_values"].to(device)
                labels = batch["label"].to(device)

                if val_text_features is None:
                    val_text_features = _encode_class_prompts(lora_model, batch, device)

                image_features = _encode_images(lora_model, pixel_values)
                logits_per_image = _class_logits(lora_model, image_features, val_text_features)

                batch_val_loss = F.cross_entropy(logits_per_image, labels).item()
                total_val_loss += batch_val_loss
                val_batches_seen += 1

                preds = torch.argmax(logits_per_image, dim=1)
                correct_predictions += (preds == labels).sum().item()
                total_samples += labels.size(0)
                val_progress_bar.set_postfix({"val_loss": batch_val_loss})

        avg_val_loss = total_val_loss / val_batches_seen if val_batches_seen else float("nan")
        accuracy = correct_predictions / total_samples if total_samples > 0 else 0.0
        print(f"Epoch {epoch+1} - Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {accuracy:.4f}")

# Save LoRA weights
save_path = "my_lora_clip_material_classifier"
lora_model.save_pretrained(save_path)

# Also save the processor and class names for future loading.
# Imported here so this cell does not depend on imports made by earlier cells.
import json
import os
with open(os.path.join(save_path, "class_names.json"), "w") as f:
    json.dump(class_names, f)  # assumes class_names is in scope
processor.save_pretrained(save_path)

print(f"LoRA model saved to {save_path}")


In [None]:
import os
import json
import torch
from peft import PeftModel
from PIL import Image
from transformers import CLIPModel, CLIPProcessor

# --- Load model and related assets ---
base_model_name = "openai/clip-vit-base-patch32"   # Base CLIP model
lora_adapter_path = "my_lora_clip_material_classifier"  # Path to LoRA adapter

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# The processor and the class list are read from the adapter directory.
processor_for_inference = CLIPProcessor.from_pretrained(lora_adapter_path)
with open(os.path.join(lora_adapter_path, "class_names.json"), "r") as f:
    class_names_for_inference = json.load(f)

# Rebuild the network: base CLIP weights first, then the LoRA adapter on top,
# moved to the target device and switched to evaluation mode.
base_model_for_inference = CLIPModel.from_pretrained(base_model_name)
inference_model = PeftModel.from_pretrained(base_model_for_inference, lora_adapter_path)
inference_model = inference_model.to(device).eval()

# One prompt per class; the text features are computed a single time and reused
# for every image classified afterwards.
text_prompts_inference = [f"a photo of {name}" for name in class_names_for_inference]
text_inputs_inference = processor_for_inference(
    text=text_prompts_inference,
    padding=True,
    truncation=True,
    return_tensors="pt",
).to(device)

with torch.no_grad():
    raw_text_features = inference_model.get_text_features(
        input_ids=text_inputs_inference.input_ids,
        attention_mask=text_inputs_inference.attention_mask,
    )
    # Scale every row to unit L2 norm so dot products become cosine similarities.
    text_feature_norms = raw_text_features.norm(p=2, dim=-1, keepdim=True)
    text_features_inference = raw_text_features / text_feature_norms


def classify_single_image(image_path, model, processor, text_features_all_classes, class_names_list):
    """
    Classify a single image using CLIP+LoRA by comparing image features to precomputed text features.

    The computation runs on the device that holds ``text_features_all_classes``,
    so the function does not depend on any notebook-level ``device`` variable.
    ``model`` is expected to live on that same device.

    Args:
        image_path (str): Path to the image file.
        model (nn.Module): CLIP model with LoRA adapter applied.
        processor (CLIPProcessor): Preprocessing utility.
        text_features_all_classes (Tensor): (num_classes, embed_dim) normalized features.
        class_names_list (List[str]): Class names aligned with text features.

    Returns:
        (predicted_class, confidence) if success, or (error_message, 0.0) if failed.
        The error message always starts with "Failed to read image:".
    """
    try:
        # The context manager closes the file handle once convert() has produced
        # an independent in-memory copy of the image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
    except Exception as e:
        # Best effort by contract: report the problem instead of raising.
        return f"Failed to read image: {e}", 0.0

    # Preprocess the image and move it next to the text features.
    target_device = text_features_all_classes.device
    image_inputs = processor(images=image, return_tensors="pt").to(target_device)

    with torch.no_grad():
        # Encode image and normalize
        image_features = model.get_image_features(pixel_values=image_inputs.pixel_values)
        image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)

        # Compute similarity logits and softmax to get class probabilities
        logit_scale = model.logit_scale.exp()
        logits = logit_scale * image_features @ text_features_all_classes.t()
        # squeeze(0) removes only the batch axis. A bare squeeze() would also
        # collapse the class axis when there is exactly one class, and indexing
        # the resulting 0-dim tensor below would fail.
        probabilities = logits.softmax(dim=-1).squeeze(0)  # (num_classes,)

        predicted_idx = torch.argmax(probabilities).item()
        predicted_class = class_names_list[predicted_idx]
        confidence = probabilities[predicted_idx].item()

    return predicted_class, confidence


# --- Inference example ---
# Classify one image and report the winning class together with its softmax
# probability. If the file cannot be read, the "class" slot holds the error
# message and the confidence is 0.0.
test_image_path = "/content/17062472.png"
predicted_material, confidence = classify_single_image(
    image_path=test_image_path,
    model=inference_model,
    processor=processor_for_inference,
    text_features_all_classes=text_features_inference,
    class_names_list=class_names_for_inference,
)
print(f"Image: {test_image_path}")
print(f"Predicted class: {predicted_material}, Confidence: {confidence:.4f}")
