# Importing Necessary Libraries

In [1]:
import random
import copy
import json, os
from PIL import Image
from tqdm.autonotebook import tqdm, trange

import clip
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
from torchvision import transforms
from datasets import Dataset, load_dataset, load_from_disk, concatenate_datasets
from sklearn.metrics import classification_report, top_k_accuracy_score

from transformers import pipeline
from transformers import CLIPProcessor, CLIPModel, CLIPImageProcessor, CLIPTokenizer
from prompt import le_prompt, prompt, negative_prompt, description

import warnings
warnings.filterwarnings("ignore")

random.seed(24)

  from tqdm.autonotebook import tqdm, trange


# Generate Language Detailed Prompts

In [None]:
messages = [
    {"role": "system", "content": "You are a professional botanist with speciality in describing and picturising rare plants in great detail"},
    {"role": "user", "content": """
    Rephrase the following description of {} plant. Remove the camera details and 
    maintain the features of the plant. Add more detailed features if required to 
    describe the flower. Just provide the rephrased description of the plant in 1 
    paragraph.

    Description:
    {}"""},
]

pipe = pipeline("text-generation", model="TinyLlama/TinyLlama-1.1B-Chat-v1.0", torch_dtype=torch.bfloat16, device_map="auto")

tiny_prompts = []

for flower in prompt:
    for i in range(320):
        tiny_prompt = copy.deepcopy(messages)
        tiny_prompt[1]['content'] = tiny_prompt[1]['content'].format(flower, prompt[flower])
        tiny_prompts.append(tiny_prompt)

outputs = pipe(tiny_prompts, max_new_tokens=256, do_sample=True, temperature=0.7, top_k=50, top_p=0.95)

# Generate the label mapping to indexes and descriptions

In [2]:
# Label Maps
l2i = {}
i2l = []

# Descriptions
desc = []

for i, plant in enumerate(prompt):
    l2i[plant] = i
    i2l.append(plant)
    desc.append(description[plant])

print(i2l)

['Rufflesia-Arnoldii', 'Encephalartos-Woodii', 'Amorphophallus-Titanum', 'Ghost-Orchid', 'Dracaena-Cinnabari']


# Load the dataset

In [3]:
ds = load_dataset("imagefolder", data_dir="./data")
syn = load_dataset("imagefolder", data_dir="./data/synthetic/")

Resolving data files:   0%|          | 0/25 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/250 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/1600 [00:00<?, ?it/s]

# Apply processing over the data

In [4]:
image_size = (224, 224)

ds = ds.map(lambda batch: {'pixel_values': transforms.ToTensor()(batch['image'].resize(image_size))}, batch_size=32)
syn = syn.map(lambda batch: {'pixel_values': transforms.ToTensor()(batch['image'].resize(image_size))}, batch_size=32)

ds = ds.remove_columns(['image'])
syn = syn.remove_columns(['image'])

ds = ds.align_labels_with_mapping(l2i, "label")
syn = syn.align_labels_with_mapping(l2i, "label")

ds = ds.with_format('torch')
syn = syn.with_format('torch')

# Load the CLIP model and utilities

In [5]:
model_name = 'openai/clip-vit-large-patch14'

# model = CLIPModel.from_pretrained(model_name, device_map='auto')
processor = CLIPProcessor.from_pretrained(model_name)

# Utility for evaluating the model

In [6]:
def eval(eval_model, cls):
    eval_model.eval()
    
    test_labels = []
    test_probs = []
    
    test_loader = torch.utils.data.DataLoader(ds['test'], batch_size=32)
    for batch in tqdm(test_loader, total=len(test_loader)):
        with torch.no_grad():
            inputs = processor(text=cls, images=batch['pixel_values'], return_tensors="pt", padding=True)
            inputs = {k: v.to('cuda') for k, v in inputs.items()}
            
            outputs = eval_model(**inputs)
            logits_per_image = outputs.logits_per_image
            probs = logits_per_image.softmax(dim=1)
        
            del inputs
            torch.cuda.empty_cache()
        
            test_labels.append(batch['label'].cpu())
            test_probs.append(probs.cpu())
    
    test_labels = torch.cat(test_labels)
    test_probs = torch.cat(test_probs)

    return test_labels, test_probs

## Evaluate accuray with name only labels

In [7]:
y_true, y_pred = eval(model, [f'This is image of the {plant} plant' for plant in i2l])
print(classification_report(y_true, y_pred.argmax(dim=1, keepdim=True)))

  0%|          | 0/8 [00:00<?, ?it/s]

It looks like you are trying to rescale already rescaled images. If the input images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again.


              precision    recall  f1-score   support

           0       0.18      0.54      0.27        50
           1       0.27      0.08      0.12        50
           2       0.27      0.32      0.29        50
           3       0.58      0.28      0.38        50
           4       0.00      0.00      0.00        50

    accuracy                           0.24       250
   macro avg       0.26      0.24      0.21       250
weighted avg       0.26      0.24      0.21       250



## Evaluate accuray with detailed labels

In [8]:
y_true, y_pred = eval(model, desc)
print(classification_report(y_true, y_pred.argmax(dim=1, keepdim=True)))

  0%|          | 0/8 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0       0.20      1.00      0.34        50
           1       0.00      0.00      0.00        50
           2       0.00      0.00      0.00        50
           3       1.00      0.04      0.08        50
           4       0.00      0.00      0.00        50

    accuracy                           0.21       250
   macro avg       0.24      0.21      0.08       250
weighted avg       0.24      0.21      0.08       250



# Train CLIP Model with synthetic data

In [7]:
def compute_loss_and_accuracy(outputs, labels):
    logits = outputs.logits_per_image  # Shape: (batch_size, num_labels)
    labels = labels.to(logits.device)
    
    # Compute the cross-entropy loss
    loss = F.cross_entropy(logits, labels)
    
    # Compute the accuracy
    _, predicted_labels = torch.max(logits, dim=1)
    accuracy = torch.sum(predicted_labels == labels) / labels.size(0)
    
    return loss, accuracy

In [8]:
def train_clip_model(
    model, 
    processor, 
    train_dataset, 
    prompts, 
    num_epochs=10, 
    batch_size=32, 
    learning_rate=5e-6, 
    device='cuda', 
    warmup_steps=100, 
    accumulation_steps=4, 
    max_grad_norm=1.0
):
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.01)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
    model.to(device)

    progress_bar = tqdm(total=num_epochs * len(train_dataloader), unit='batch')
    global_step = 0
    
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        epoch_accuracy = 0.0
        num_batch = len(train_dataloader)

        for step, batch in enumerate(train_dataloader):
            if isinstance(prompts, dict):
                cls = [random.sample(prompts[plant], k=1)[0] for plant in i2l]
            else:
                cls = prompts
            
            inputs = processor(
                images=batch["pixel_values"], 
                text=cls, 
                return_tensors="pt", 
                padding=True,
                max_length=77,
                truncation=True,
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}

            outputs = model(**inputs)
            loss, accuracy = compute_loss_and_accuracy(outputs, batch['label'])

            loss = loss / accumulation_steps
            loss.backward()

            if (step + 1) % accumulation_steps == 0:
                # Gradient Clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_grad_norm)
                
                optimizer.step()
                optimizer.zero_grad()

            epoch_loss += loss.item()
            epoch_accuracy += accuracy.item()

            progress_bar.set_postfix(epoch=(epoch+1)+(step/num_batch), loss=loss.item(), accuracy=accuracy.item())
            progress_bar.update(1)

            global_step += 1

            # Warmup Steps
            if global_step < warmup_steps:
                warmup_percent = global_step / warmup_steps
                curr_lr = learning_rate * warmup_percent
                optimizer.param_groups[0]['lr'] = curr_lr

        scheduler.step()

        epoch_loss /= len(train_dataloader)
        epoch_accuracy /= len(train_dataloader)

        # Print epoch summary
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

    progress_bar.close()
    return model

def freeze_layers(model, num_frozen_layers):
    for i, param in enumerate(model.parameters()):
        if i < num_frozen_layers:
            param.requires_grad = False
        else:
            param.requires_grad = True

### Freeze Layers for stable learning convergence

In [39]:
# Load the pre-trained CLIP model
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

# Freeze the first few layers of the model
num_frozen_layers = 100
freeze_layers(model, num_frozen_layers)

In [12]:
num_epochs = 5
batch_size = 8
learning_rate = 1e-5
device = 'cuda'

# Train the model
gen_prompts = json.load(open('./generated_text.json', 'r'))
fine_tuned_model = train_clip_model(model, processor, syn['train'], gen_prompts, num_epochs, batch_size, learning_rate, device)

# Save the fine-tuned model
fine_tuned_model.save_pretrained("./models/zsl-desc-tuned")

  0%|          | 0/1000 [00:00<?, ?batch/s]

Epoch 1/5 - Loss: 0.3641, Accuracy: 0.4950
Epoch 2/5 - Loss: 0.0536, Accuracy: 0.9231
Epoch 3/5 - Loss: 0.0361, Accuracy: 0.9513
Epoch 4/5 - Loss: 0.0265, Accuracy: 0.9688
Epoch 5/5 - Loss: 0.0025, Accuracy: 0.9988


## Evaluate accuray with name only labels

In [16]:
y_true, y_pred = eval(fine_tuned_model, i2l)
print(classification_report(y_true, y_pred.argmax(dim=1, keepdim=True)))

  0%|          | 0/8 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0       0.89      0.50      0.64        50
           1       1.00      0.16      0.28        50
           2       0.35      0.24      0.29        50
           3       0.34      0.98      0.51        50
           4       0.83      0.60      0.70        50

    accuracy                           0.50       250
   macro avg       0.68      0.50      0.48       250
weighted avg       0.68      0.50      0.48       250



## Evaluate accuray with detailed description of labels

In [15]:
y_true, y_pred = eval(fine_tuned_model, desc)
print(classification_report(y_true, y_pred.argmax(dim=1, keepdim=True)))

  0%|          | 0/8 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0       0.91      0.58      0.71        50
           1       0.85      0.34      0.49        50
           2       0.50      0.24      0.32        50
           3       0.39      0.98      0.56        50
           4       0.73      0.72      0.73        50

    accuracy                           0.57       250
   macro avg       0.68      0.57      0.56       250
weighted avg       0.68      0.57      0.56       250



In [17]:
model = CLIPModel.from_pretrained("./models/zsl-desc-tuned/")

# Few shot Learning with CLIP

In [46]:
def minmax_scaler(tensor):
    b, n, h, w = tensor.size()
    min_val = tensor.view(b, n, -1).min(dim=-1, keepdims=True).values.view([b, n, 1, 1])
    max_val = tensor.view(b, n, -1).max(dim=-1, keepdims=True).values.view([b, n, 1, 1])
    scaled_tensor = (tensor - min_val) / (max_val - min_val + 1e-6)
    return scaled_tensor

transform = transforms.Compose([
    transforms.RandomResizedCrop(size=224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomGrayscale(p=0.1),
    transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
    transforms.Lambda(minmax_scaler)  # Apply channel-wise min-max scaling
])

In [47]:
def transform_sample(sample):
    pixel_values = sample["pixel_values"]
    transformed_pixel_values = transform(pixel_values)
    sample["pixel_values"] = transformed_pixel_values
    return sample

def create_extended_dataset(dataset, num_times=2):
    extended_datasets = []
    for _ in range(num_times - 1):
        transformed_dataset = dataset.map(transform_sample, batched=True, batch_size=4)
        extended_datasets.append(transformed_dataset)  
    extended_datasets.append(dataset)
    
    extended_dataset = concatenate_datasets(extended_datasets)
    return extended_dataset

## Real Image Only Learning

In [48]:
num_times = 10
train_dataset = create_extended_dataset(ds["train"], num_times)

Map:   0%|          | 0/25 [00:00<?, ? examples/s]

In [49]:
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32", device_map='auto')

num_frozen_layers = 100
freeze_layers(model, num_frozen_layers)

num_epochs = 5
batch_size = 8
learning_rate = 1e-5
device = 'cuda'

gen_prompts = json.load(open('./generated_text.json', 'r'))
fine_tuned_model = train_clip_model(model, processor, train_dataset, gen_prompts, num_epochs, batch_size, learning_rate, device)

fine_tuned_model.save_pretrained("./models/real-desc-tuned")

  0%|          | 0/160 [00:00<?, ?batch/s]

It looks like you are trying to rescale already rescaled images. If the input images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again.


Epoch 1/5 - Loss: 0.7910, Accuracy: 0.1602
Epoch 2/5 - Loss: 0.4883, Accuracy: 0.2539
Epoch 3/5 - Loss: 0.4315, Accuracy: 0.2188
Epoch 4/5 - Loss: 0.4111, Accuracy: 0.3359
Epoch 5/5 - Loss: 0.3156, Accuracy: 0.5078


In [50]:
y_true, y_pred = eval(fine_tuned_model, i2l)
print(classification_report(y_true, y_pred.argmax(dim=1, keepdim=True)))

y_true, y_pred = eval(fine_tuned_model, desc)
print(classification_report(y_true, y_pred.argmax(dim=1, keepdim=True)))

  0%|          | 0/8 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0       0.20      0.98      0.33        50
           1       0.00      0.00      0.00        50
           2       0.00      0.00      0.00        50
           3       0.00      0.00      0.00        50
           4       0.00      0.00      0.00        50

    accuracy                           0.20       250
   macro avg       0.04      0.20      0.07       250
weighted avg       0.04      0.20      0.07       250



  0%|          | 0/8 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0       0.22      0.90      0.36        50
           1       0.00      0.00      0.00        50
           2       0.38      0.10      0.16        50
           3       0.69      0.50      0.58        50
           4       0.00      0.00      0.00        50

    accuracy                           0.30       250
   macro avg       0.26      0.30      0.22       250
weighted avg       0.26      0.30      0.22       250



## Phase Wise (Real -> Synthetic)