In [1]:
!pip install torch pillow



In [2]:
import json
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import clip
from torchvision.datasets import ImageFolder
from torchvision import transforms

# Path to your dataset
dataset_path = 'garbage classification/Garbage classification'

# Load the CLIP model and its preprocessing tools
device = "cuda:0" if torch.cuda.is_available() else "cpu" 
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

# Custom dataset using ImageFolder
dataset = ImageFolder(root=dataset_path, transform=preprocess)

# DataLoader
train_dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Function to convert model's parameters to FP32 format
def convert_models_to_fp32(model): 
    for p in model.parameters(): 
        p.data = p.data.float() 
        p.grad.data = p.grad.data.float() 

# Prepare the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=5e-6, betas=(0.9, 0.98), eps=1e-6, weight_decay=0.2)

# Specify the loss function
loss_fn = nn.CrossEntropyLoss()

# Train the model
num_epochs = 10
for epoch in range(num_epochs):
    pbar = tqdm(train_dataloader, total=len(train_dataloader))
    for images, labels in pbar:
        optimizer.zero_grad()

        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        image_features = model.encode_image(images)
        logits_per_image = image_features @ model.encode_text(clip.tokenize(["glass", "cardboard", "metal", "paper", "plastic"] * (labels.shape[0] // 5 + 1)).to(device)[:labels.shape[0]].T)

        # Compute loss
        loss = loss_fn(logits_per_image, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        pbar.set_description(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}")

        if device != "cpu":
            convert_models_to_fp32(model)

# Note: Adjust the tokenization part to your specific needs, it assumes each batch is evenly divisible by number of classes which may not be the case.


  0%|          | 0/79 [00:00<?, ?it/s]


RuntimeError: The size of tensor a (32) must match the size of tensor b (77) at non-singleton dimension 1

In [4]:
import os
from PIL import Image

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import clip

# Path to your dataset
dataset_path = 'garbage classification/Garbage classification'

# Choose computation device
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Load pre-trained CLIP model
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

# Prepare image paths and labels
list_image_path = []
list_txt = []
for class_name in os.listdir(dataset_path):
    class_dir = os.path.join(dataset_path, class_name)
    if os.path.isdir(class_dir):
        for img_filename in os.listdir(class_dir):
            img_path = os.path.join(class_dir, img_filename)
            list_image_path.append(img_path)
            list_txt.append(class_name)  # Use the folder name as the label

# Define a custom dataset
class image_title_dataset():
    def __init__(self, list_image_path, list_txt):
        # Initialize image paths and corresponding texts
        self.image_path = list_image_path
        # Tokenize text using CLIP's tokenizer
        self.title = clip.tokenize(list_txt)

    def __len__(self):
        return len(self.title)

    def __getitem__(self, idx):
        # Preprocess image using CLIP's preprocessing function
        image = preprocess(Image.open(self.image_path[idx]))
        title = self.title[idx]
        return image, title

# Create the dataset and DataLoader
dataset = image_title_dataset(list_image_path, list_txt)
train_dataloader = DataLoader(dataset, batch_size=32, shuffle=True)  # Adjust batch size as needed

# Function to convert model's parameters to FP32 format
def convert_models_to_fp32(model): 
    for p in model.parameters(): 
        p.data = p.data.float() 
        p.grad.data = p.grad.data.float() 


if device == "cpu":
  model.float()

# Prepare the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5,betas=(0.9,0.98),eps=1e-6,weight_decay=0.2) # the lr is smaller, more safe for fine tuning to new dataset


# Specify the loss function
loss_img = nn.CrossEntropyLoss()
loss_txt = nn.CrossEntropyLoss()

# Train the model
num_epochs = 30
for epoch in range(num_epochs):
    pbar = tqdm(train_dataloader, total=len(train_dataloader))
    for batch in pbar:
        optimizer.zero_grad()

        images,texts = batch 
        
        images= images.to(device)
        texts = texts.to(device)

        # Forward pass
        logits_per_image, logits_per_text = model(images, texts)

        # Compute loss
        ground_truth = torch.arange(len(images),dtype=torch.long,device=device)
        total_loss = (loss_img(logits_per_image,ground_truth) + loss_txt(logits_per_text,ground_truth))/2

        # Backward pass
        total_loss.backward()
        if device == "cpu":
            optimizer.step()
        else : 
            convert_models_to_fp32(model)
            optimizer.step()
            clip.model.convert_weights(model)

        pbar.set_description(f"Epoch {epoch}/{num_epochs}, Loss: {total_loss.item():.4f}")

Epoch 0/30, Loss: 2.6543: 100%|██████████| 79/79 [00:21<00:00,  3.72it/s]
Epoch 1/30, Loss: 2.2734: 100%|██████████| 79/79 [00:20<00:00,  3.81it/s]
Epoch 2/30, Loss: 3.1055: 100%|██████████| 79/79 [00:20<00:00,  3.81it/s]
Epoch 3/30, Loss: 2.5547: 100%|██████████| 79/79 [00:20<00:00,  3.83it/s]
Epoch 4/30, Loss: 2.7461: 100%|██████████| 79/79 [00:20<00:00,  3.92it/s]
Epoch 5/30, Loss: 2.7539: 100%|██████████| 79/79 [00:20<00:00,  3.85it/s]
Epoch 6/30, Loss: 2.6719: 100%|██████████| 79/79 [00:20<00:00,  3.79it/s]
Epoch 7/30, Loss: 2.9922: 100%|██████████| 79/79 [00:20<00:00,  3.92it/s]
Epoch 8/30, Loss: 2.6992: 100%|██████████| 79/79 [00:20<00:00,  3.80it/s]
Epoch 9/30, Loss: 2.6855: 100%|██████████| 79/79 [00:20<00:00,  3.81it/s]
Epoch 10/30, Loss: 2.7656: 100%|██████████| 79/79 [00:21<00:00,  3.75it/s]
Epoch 11/30, Loss: 2.4688: 100%|██████████| 79/79 [00:20<00:00,  3.78it/s]
Epoch 12/30, Loss: 3.1699: 100%|██████████| 79/79 [00:20<00:00,  3.82it/s]
Epoch 13/30, Loss: 2.8047: 100%|███

In [None]:
import torch
import clip
from PIL import Image

# Load pre-trained CLIP model (make sure it includes your custom trained weights if applicable)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

# Ensure your model is in evaluation mode
model.eval()

def prepare_image(image_path):
    # Load image
    image = Image.open(image_path).convert("RGB")
    # Preprocess the image
    image_preprocessed = preprocess(image).unsqueeze(0)  # Add batch dimension
    return image_preprocessed

def predict_image_class(image_path, model, device):
    # Prepare the image
    image_tensor = prepare_image(image_path).to(device)

    # Define class names (ensure these are the same as used during training)
    class_names = ["This is a picture of glass garbage", "This is a picture of cardboard garbage", "This is a picture of metal garbage", "This is a picture of paper garbage", "This is a picture of plastic garbage"]
    text_tokens = clip.tokenize(class_names).to(device)

    # Generate image and text features
    with torch.no_grad():
        image_features = model.encode_image(image_tensor)
        text_features = model.encode_text(text_tokens)

    # Calculate the similarity (dot product) between image features and text features
    logits = image_features @ text_features.T
    probabilities = logits.softmax(dim=1).cpu().numpy()

    # Get the top prediction
    top_class_index = probabilities.argmax()
    return class_names[top_class_index], probabilities[0, top_class_index]


image_path = '/home/mingwei/Desktop/CLIP/garbage classification/Garbage classification/glass/glass1.jpg'
predicted_class, confidence = predict_image_class(image_path, model, device)
print(f"Predicted class: {predicted_class} with confidence {confidence:.4f}")


In [None]:
# Prepare image paths and labels for the test set
test_dataset_path = 'garbage classification/Garbage classification'
list_test_image_path = []
list_test_txt = []
for class_name in os.listdir(test_dataset_path):
    class_dir = os.path.join(test_dataset_path, class_name)
    if os.path.isdir(class_dir):
        for img_filename in os.listdir(class_dir):
            img_path = os.path.join(class_dir, img_filename)
            list_test_image_path.append(img_path)
            list_test_txt.append(class_name)  # Use the folder name as the label

# Create the test dataset
test_dataset = image_title_dataset(list_test_image_path, list_test_txt)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # Use non-shuffling loader for testing

def calculate_accuracy(model, dataloader, device):
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # No need to track gradients for testing
        for images, titles in dataloader:
            images = images.to(device)
            titles = titles.to(device)

            # Forward pass
            logits_per_image, _ = model(images, titles)
            
            # Get predictions from the maximum value
            predicted = logits_per_image.argmax(dim=1)
            
            # Compare with ground truth
            ground_truth = torch.arange(len(images), dtype=torch.long, device=device)
            correct += (predicted == ground_truth).sum().item()
            total += images.size(0)

    accuracy = 100 * correct / total
    return accuracy


# Calculate the accuracy on the test dataset
test_accuracy = calculate_accuracy(model, test_dataloader, device)
print(f"Test Accuracy: {test_accuracy:.2f}%")

