Kernel: huggingface1

### Imports

In [None]:
import torch
from transformers import AutoProcessor, AutoModelForCausalLM  
from PIL import Image
import requests
import copy

%matplotlib inline

In [None]:
import io
import sys
import os
import re
import json
import torch
import html
import base64
import itertools

import numpy as np
# import supervision as sv

from IPython.core.display import display, HTML #DeprecationWarning    
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AdamW,
    AutoModelForCausalLM,
    AutoProcessor,
    get_scheduler
)
from tqdm import tqdm
from typing import List, Dict, Any, Tuple, Generator
from peft import LoraConfig, get_peft_model
from PIL import Image

In [None]:
# Make the local dataset package importable by registering its directory on sys.path

dataset_path = '../dataset'

# Work with the absolute form so the membership test below compares like with like
child_dir = os.path.abspath(dataset_path)

# Register the directory once; re-running this cell must not add duplicates
if child_dir in sys.path:
    print('child_dir already in sys.path')
else:
    sys.path.append(child_dir)
    print('child_dir added to sys.path')

In [None]:
from sklearn.model_selection import train_test_split  
from dataset import Region2DescDataset

### Config

In [None]:
# Directory that holds the image files; it is passed to the dataset objects and used to
# locate the test image further down.
# NOTE(review): absolute path tied to one specific compute instance — adjust when running elsewhere.
images_base_path = '/mnt/batch/tasks/shared/LS_root/mounts/clusters/computeinstance10-gpu/code/datasets/face_mask/images'
# COCO-format annotation file (downloaded from AzureML) that is converted to JSONL below.
annotations_coco_path = '../annotations/face_bbox_annotations_sample.json' # File downloaded from AzureML as COCO file

# Directory in which the generated train and test JSONL files are created.
annotations_jsonl_base_path = '../annotations' # Train and test files will be created at this path

In [None]:
# Checkpoint to fine-tune. The alternatives are kept commented out for quick switching;
# note that the hyperparameter cell only defines settings for some of these ids.
# model_id = 'microsoft/Florence-2-base'

model_id = 'microsoft/Florence-2-large' # <-- Testing

# model_id = 'microsoft/Florence-2-large-ft'

# model_id = 'microsoft/Florence-2-base-ft' # <-- Tested
# Optional revision to load; when falsy, the base model is loaded without a revision argument.
revision = None #'refs/pr/6'

print(f'model_id:{model_id}, revision:{revision}')

In [None]:
# Training hyperparameters per checkpoint.
# A lookup table (instead of an if/elif chain without a fallback) makes an unsupported
# model_id fail right here with a clear message, rather than later as a NameError on
# test_size / BATCH_SIZE / NUM_WORKERS / EPOCHS / LR.
HYPERPARAMS_BY_MODEL = {
    'microsoft/Florence-2-base-ft': {
        'test_size': 0.1,
        'BATCH_SIZE': 4,
        'NUM_WORKERS': 0,
        'EPOCHS': 201,  # previously tried: 200, 400, 100
        'LR': 2e-6,     # previously tried: 1e-6, 5e-6
    },
    'microsoft/Florence-2-large': {
        'test_size': 0.1,
        'BATCH_SIZE': 2,  # Smaller batch size for the bigger model
        'NUM_WORKERS': 0,
        'EPOCHS': 50,   # previously tried: 200, 400, 100
        'LR': 4e-6,     # previously tried: 1e-6, 5e-6
    },
}

if model_id not in HYPERPARAMS_BY_MODEL:
    raise ValueError(
        f'No hyperparameters configured for model_id {model_id!r}; '
        f'add an entry to HYPERPARAMS_BY_MODEL (known: {sorted(HYPERPARAMS_BY_MODEL)})'
    )

_selected_hyperparams = HYPERPARAMS_BY_MODEL[model_id]
test_size = _selected_hyperparams['test_size']
BATCH_SIZE = _selected_hyperparams['BATCH_SIZE']
NUM_WORKERS = _selected_hyperparams['NUM_WORKERS']
EPOCHS = _selected_hyperparams['EPOCHS']
LR = _selected_hyperparams['LR']

In [None]:
# Run on the GPU when PyTorch reports one as available, otherwise fall back to the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(f'device:{device}')

In [None]:
# JSONL files produced by the COCO conversion, one per split (train and test)
annotations_jsonl_path_train = os.path.join(annotations_jsonl_base_path, 'face_bbox_annotations_sample_train.jsonl')
annotations_jsonl_path_test = os.path.join(annotations_jsonl_base_path, 'face_bbox_annotations_sample_test.jsonl')

### Common

In [None]:
def load_json(json_file_path):
    """Read a JSON file and return the decoded Python object.

    Args:
        json_file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file content (dict, list, ...).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Pin the encoding: without it open() decodes with the platform's locale encoding,
    # which can corrupt or reject non-ASCII text in a UTF-8 JSON file.
    with open(json_file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

In [None]:
def lookup_category(category_id, categories_list):
    """Return the name of the first category whose ``"id"`` equals ``category_id``.

    Args:
        category_id: Identifier to search for.
        categories_list: Iterable of dicts that each carry ``"id"`` and ``"name"`` keys.

    Returns:
        The ``"name"`` of the first matching entry, or ``None`` when nothing matches.
    """
    # Lazy generator: scanning stops at the first hit, like an explicit loop with break
    matching_names = (entry["name"] for entry in categories_list if entry["id"] == category_id)
    return next(matching_names, None)

In [None]:
def format_bbox(single_bbox):
    """Render each coordinate as a ``<loc_N>`` token and concatenate them without separators.

    Args:
        single_bbox: Iterable of coordinate values.

    Returns:
        A string such as ``'<loc_1><loc_2><loc_3><loc_4>'``; empty for an empty input.
    """
    return ''.join(f'<loc_{value}>' for value in single_bbox)

In [None]:
# Task specific conversion
def _bbox_norm_xywh_to_loc_bins(bbox, bins=1000):
    """Turn a normalized [x_min, y_min, width, height] box into integer [x1, y1, x2, y2] bins."""
    x_min_norm, y_min_norm, width_norm, height_norm = bbox

    # Scale each component first and add afterwards (same arithmetic order as before the
    # refactor, so existing outputs do not shift by a rounding step)
    x_min_abs = x_min_norm * bins
    y_min_abs = y_min_norm * bins
    x_max_abs = x_min_abs + width_norm * bins
    y_max_abs = y_min_abs + height_norm * bins

    # Truncate to int and clamp into [0, bins - 1]: a box touching the right/bottom edge would
    # otherwise yield <loc_1000>.
    # NOTE(review): Florence-2 location tokens are understood to span <loc_0>..<loc_999> —
    # confirm against the processor's vocabulary.
    return [
        min(max(int(value), 0), bins - 1)
        for value in (x_min_abs, y_min_abs, x_max_abs, y_max_abs)
    ]


def _write_jsonl(jsonl_path, records):
    """Write one JSON document per line to ``jsonl_path``, replacing any existing file."""
    # Mode 'w' truncates an existing file, so no separate delete step is needed
    with open(jsonl_path, 'w', encoding='utf-8') as file:
        for record in records:
            file.write(json.dumps(record) + '\n')


def coco_to_jsonl_reg2desc(annotations_coco_path, annotations_jsonl_path_train, annotations_jsonl_path_test, test_size, random_state=42):
    """Convert a COCO annotation file into train/test JSONL files for '<REGION_TO_DESCRIPTION>'.

    One record is produced per entry in ``images`` with the keys ``image`` (file name without
    directories), ``prefix`` (the task token) and ``suffix`` (one description followed by its
    ``<loc_..>`` box tokens per annotation, concatenated in annotation order). Annotations whose
    category is neither 'Mask' nor 'No_Mask' contribute nothing to the suffix; images without
    annotations get an empty suffix.

    Bounding boxes are treated as normalized COCO ``[x_min, y_min, width, height]`` values and
    are rescaled to ``x1, y1, x2, y2`` on a 0..999 integer grid.

    Args:
        annotations_coco_path: Path of the COCO JSON file to read.
        annotations_jsonl_path_train: Output path of the training split (overwritten).
        annotations_jsonl_path_test: Output path of the test split (overwritten).
        test_size: Forwarded to ``train_test_split``.
        random_state: Forwarded to ``train_test_split`` for a reproducible split.

    Returns:
        None. The two JSONL files are written and their paths are printed.
    """
    prefix_str = '<REGION_TO_DESCRIPTION>'
    # Description text emitted for each supported category label
    description_by_category = {
        'No_Mask': 'Not wearing a mask',
        'Mask': 'Wearing a mask',
    }

    annotations_json = load_json(annotations_coco_path)
    categories = annotations_json["categories"]

    # Group annotations by image once instead of rescanning the full list for every image;
    # the per-image order is the same as the order in the file.
    annotations_by_image = {}
    for annotation in annotations_json["annotations"]:
        annotations_by_image.setdefault(annotation["image_id"], []).append(annotation)

    # One dict per image; each dict becomes one JSONL line
    line_dict_list = []
    for image in annotations_json["images"]:
        # e.g. "Wearing a mask<loc_138><loc_100><loc_470><loc_448>Not wearing a mask<loc_388>..."
        suffix_str = ''
        for annotation in annotations_by_image.get(image["id"], []):
            bbox_formatted = format_bbox(_bbox_norm_xywh_to_loc_bins(annotation["bbox"]))
            annotation_category = lookup_category(annotation["category_id"], categories)
            description = description_by_category.get(annotation_category)
            if description is not None:
                suffix_str += description + bbox_formatted

        line_dict_list.append({
            # Keep only the file name with extension. Taking the last path component also
            # works when file_name has no directory part or more than one.
            "image": image["file_name"].split('/')[-1],
            "prefix": prefix_str,
            "suffix": suffix_str,
        })

    # Split the records into train and test sets
    line_dict_list_train, line_dict_list_test = train_test_split(
        line_dict_list, test_size=test_size, random_state=random_state
    )

    _write_jsonl(annotations_jsonl_path_train, line_dict_list_train)
    _write_jsonl(annotations_jsonl_path_test, line_dict_list_test)

    print(f'Files created: \n{annotations_jsonl_path_train}, \n{annotations_jsonl_path_test}')

In [None]:
def run_example(model, task_prompt, text_input=None):
    """Generate and post-process a prediction for the notebook-level ``image``.

    Relies on the notebook globals ``processor``, ``image`` and ``device``.

    Args:
        model: Model whose ``generate`` method is called.
        task_prompt: Task token; also passed as ``task`` to the post-processing step.
        text_input: Optional text appended directly to ``task_prompt``.

    Returns:
        The result of ``processor.post_process_generation`` for the first decoded sequence.
    """
    prompt = task_prompt if text_input is None else task_prompt + text_input

    inputs = processor(text=prompt, images=image, return_tensors="pt")

    # MK: when running on CUDA, move every input tensor onto the GPU
    if device == 'cuda':
        inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    # Deterministic beam search
    generation_options = dict(
        max_new_tokens=1024,
        early_stopping=False,
        do_sample=False,
        num_beams=3,
    )
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        **generation_options,
    )

    # Special tokens are kept in the decoded text that is handed to post-processing
    decoded_texts = processor.batch_decode(generated_ids, skip_special_tokens=False)

    return processor.post_process_generation(
        decoded_texts[0],
        task=task_prompt,
        image_size=(image.width, image.height),
    )

In [None]:
# MK
from PIL import Image  
import matplotlib.pyplot as plt  
import matplotlib.patches as patches  
import re  
  
def plot_normalized_bbox(image, bbox_data):
    """Show ``image`` with a red rectangle for every ``<loc_..>`` box found in ``bbox_data``.

    Args:
        image: Image object exposing ``.size`` as ``(width, height)`` and accepted by ``imshow``.
        bbox_data: String containing zero or more groups of four consecutive
            ``<loc_N>`` tokens, read as x1, y1, x2, y2 on a 0..1000 scale.

    Returns:
        None. The figure is displayed via ``plt.show()``.
    """
    # Create a figure and axes, then draw the image
    fig, ax = plt.subplots()
    ax.imshow(image)

    # Pixel dimensions used to scale the normalized coordinates
    img_width, img_height = image.size

    # Each match is a 4-tuple of digit strings: (x1, y1, x2, y2)
    bboxes = re.findall(r"<loc_(\d+)><loc_(\d+)><loc_(\d+)><loc_(\d+)>", bbox_data)

    for bbox in bboxes:
        # Bring the 0..1000 values to fractions of the image size
        x1, y1, x2, y2 = [float(coord) / 1000 for coord in bbox]

        # Convert to pixel coordinates
        abs_x1, abs_y1 = x1 * img_width, y1 * img_height
        abs_x2, abs_y2 = x2 * img_width, y2 * img_height

        # Rectangle takes the top-left corner plus width and height
        rect = patches.Rectangle(
            (abs_x1, abs_y1),
            abs_x2 - abs_x1,
            abs_y2 - abs_y1,
            linewidth=1,
            edgecolor='r',
            facecolor='none',
        )
        ax.add_patch(rect)

    # Remove the axis ticks and labels
    ax.axis('off')

    # Actually call show(); the bare attribute reference `plt.show` did nothing
    plt.show()

In [None]:
def plot_loss(avg_train_loss_list, avg_val_loss_list, size=(10, 5), title='Training vs Validation Loss',   
              x_label='Epochs', y_label='Loss', train_legend='Training Loss', val_legend='Validation Loss'):  
    """Plot the training and validation loss curves on one set of axes.

    Args:
        avg_train_loss_list: Sequence of training-loss values.
        avg_val_loss_list: Sequence of validation-loss values.
        size: Figure size passed as ``figsize``.
        title: Plot title.
        x_label: Label of the x axis.
        y_label: Label of the y axis.
        train_legend: Legend entry for the training curve.
        val_legend: Legend entry for the validation curve.

    Returns:
        None. The figure is displayed via ``plt.show()``.
    """
    _, ax = plt.subplots(figsize=size)

    # Training curve first, then validation, so colours and legend order stay the same
    ax.plot(avg_train_loss_list, label=train_legend)
    ax.plot(avg_val_loss_list, label=val_legend)

    ax.set_title(title)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.legend()

    plt.show()

### Temporary checks

In [None]:
# # Check json loading
# annotations_json =load_json(annotations_coco_path)
# print(f'annotations_json:{json.dumps(annotations_json, indent=4)}')

#### Convert to JSONL
Convert the COCO annotations into train and test JSONL files.

In [None]:
# Write the train/test JSONL files from the COCO annotations
coco_to_jsonl_reg2desc(
    annotations_coco_path,
    annotations_jsonl_path_train,
    annotations_jsonl_path_test,
    test_size=test_size,
    random_state=42,
)

### Model

In [None]:
# Reset first so a failed load does not leave objects from a previous run behind
model = None
processor = None

# Forward `revision` only when one is configured; otherwise it is left out entirely
_pretrained_kwargs = {'trust_remote_code': True}
if revision:
    _pretrained_kwargs['revision'] = revision

model = AutoModelForCausalLM.from_pretrained(model_id, **_pretrained_kwargs).to(device)
processor = AutoProcessor.from_pretrained(model_id, **_pretrained_kwargs)

print('Model loaded')

### Dataset

In [None]:
# Initiate Dataset and DataLoader for train and validation subsets

def collate_fn(batch):
    """Collate dataset items into processor inputs plus the raw target strings.

    Each item of ``batch`` is unpacked as ``(question, answer, image)``. Uses the notebook
    globals ``processor`` and ``device``.

    Args:
        batch: Sequence of 3-tuples as described above.

    Returns:
        Tuple ``(inputs, answers)``: the processor output moved to ``device`` and the
        tuple of answers, left untokenized.
    """
    prompts, targets, pictures = zip(*batch)
    encoded = processor(text=list(prompts), images=list(pictures), return_tensors="pt", padding=True)
    return encoded.to(device), targets

# Both splits read their images from the same directory
train_dataset = Region2DescDataset(jsonl_file_path=annotations_jsonl_path_train, image_directory_path=images_base_path)
val_dataset = Region2DescDataset(jsonl_file_path=annotations_jsonl_path_test, image_directory_path=images_base_path)

# Settings shared by both loaders; only the training loader shuffles
_loader_options = dict(batch_size=BATCH_SIZE, collate_fn=collate_fn, num_workers=NUM_WORKERS)

train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, **_loader_options)

In [None]:
# LoRA Florence-2 model

# Module names handed to LoraConfig as adapter targets
lora_target_modules = [
    "q_proj", "o_proj", "k_proj", "v_proj",
    "linear", "Conv2d", "lm_head", "fc2",
]

config = LoraConfig(
    task_type="CAUSAL_LM",
    inference_mode=False,
    target_modules=lora_target_modules,
    r=16,  # 8
    lora_alpha=8,
    lora_dropout=0.05,
    use_rslora=True,
    init_lora_weights="gaussian",
    bias="none",
    revision=revision,
)

# Wrap the base model and report how many parameters remain trainable
peft_model = get_peft_model(model, config)
peft_model.print_trainable_parameters()

In [None]:
# Ask PyTorch to release cached, currently unused GPU memory before the training run
torch.cuda.empty_cache()

### Fine-tune

In [None]:
# training loop
def _compute_batch_loss(model, processor, inputs, answers):
    """Tokenize the target answers and return the model's loss for one collated batch."""
    # NOTE(review): padded label positions are passed through unmasked; whether the model
    # excludes them from the loss is not visible in this notebook — confirm.
    labels = processor.tokenizer(
        text=answers,
        return_tensors="pt",
        padding=True,
        return_token_type_ids=False
    ).input_ids.to(device)

    outputs = model(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        labels=labels,
    )
    return outputs.loss


def train_model(train_loader, val_loader, model, processor, epochs=10, lr=1e-6):
    """Fine-tune ``model`` and save the final state under ``../model_checkpoints``.

    Every epoch runs one pass over ``train_loader`` (optimizer and linear LR schedule stepped
    per batch) followed by a gradient-free pass over ``val_loader``. After the last epoch the
    model and processor are saved to ``../model_checkpoints/epoch_{epochs}``. Uses the
    notebook global ``device`` for the label tensors.

    Args:
        train_loader: Iterable of ``(inputs, answers)`` training batches; must support ``len``.
        val_loader: Iterable of ``(inputs, answers)`` validation batches; must support ``len``.
        model: Model to train; called with ``input_ids``, ``pixel_values`` and ``labels``.
        processor: Processor whose ``tokenizer`` encodes the answers; saved with the model.
        epochs: Number of epochs to run.
        lr: Learning rate passed to the optimizer.

    Returns:
        Tuple ``(avg_train_loss_list, avg_val_loss_list)`` with one average loss per epoch.
    """
    avg_train_loss_list = []
    avg_val_loss_list = []

    optimizer = AdamW(model.parameters(), lr=lr)
    num_training_steps = epochs * len(train_loader)
    lr_scheduler = get_scheduler(
        name="linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps,
    )

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for inputs, answers in tqdm(train_loader, desc=f"Training Epoch {epoch + 1}/{epochs}"):
            loss = _compute_batch_loss(model, processor, inputs, answers)

            # One optimisation step per batch; gradients are cleared afterwards
            loss.backward()
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        print(f"Average Training Loss: {avg_train_loss}")
        avg_train_loss_list.append(avg_train_loss)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, answers in tqdm(val_loader, desc=f"Validation Epoch {epoch + 1}/{epochs}"):
                val_loss += _compute_batch_loss(model, processor, inputs, answers).item()

        avg_val_loss = val_loss / len(val_loader)
        print(f"Average Validation Loss: {avg_val_loss}")
        avg_val_loss_list.append(avg_val_loss)

    # MK
    # Save the state after the last epoch. The directory is named from `epochs` rather than
    # the leaked loop variable, so the path is the same as before for epochs >= 1 and the
    # function no longer raises NameError when epochs == 0.
    output_dir = f"../model_checkpoints/epoch_{epochs}"
    os.makedirs(output_dir, exist_ok=True)
    model.save_pretrained(output_dir)
    processor.save_pretrained(output_dir)

    return avg_train_loss_list, avg_val_loss_list

In [None]:
%%time
avg_train_loss_list, avg_val_loss_list = train_model(
    train_loader,
    val_loader,
    peft_model,
    processor,
    epochs=EPOCHS,
    lr=LR,
)

# Compact figure with the per-epoch averages returned by the training run
plot_loss(
    avg_train_loss_list,
    avg_val_loss_list,
    size=(6, 3),
    title='Training vs Validation Loss',
    x_label='Epochs',
    y_label='Loss',
    train_legend='Training Loss',
    val_legend='Validation Loss',
)

### Load fine-tuned model

In [None]:
# Alternative naming that also encodes the model id:
# checkpoint_path = '../model_checkpoints/' + model_id.replace('/','_').replace('-','_').replace(' ','_') + '_epoch_' + str(EPOCHS)
checkpoint_path = f'../model_checkpoints/epoch_{EPOCHS}'

# The same loading options are used for the model and for the processor
_checkpoint_kwargs = dict(trust_remote_code=True, revision=revision)

model_ft = AutoModelForCausalLM.from_pretrained(checkpoint_path, **_checkpoint_kwargs).to(device)
processor = AutoProcessor.from_pretrained(checkpoint_path, **_checkpoint_kwargs)

print(f'Loaded checkpoint_path:{checkpoint_path}')

In [None]:
use_own_image = True
# image_path = '../test_images/maksssksksss0.png'
image_path = os.path.join(images_base_path, 'maksssksksss19.png')

print(f'use_own_image: {use_own_image}')

if use_own_image:
    # Local file, converted to RGB
    image = Image.open(image_path).convert('RGB')
else:
    # Sample picture downloaded from the Hugging Face documentation images
    url = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/transformers/tasks/car.jpg?download=true"
    image = Image.open(requests.get(url, stream=True).raw)

# See input image
image

In [None]:
# Reference samples copied from the generated JSONL file. They are bare string literals used
# as notes (never assigned or printed); the suffix is broken across lines here for readability.
'''{"image": "maksssksksss7.png", "prefix": "<REGION_TO_DESCRIPTION>", "suffix": "
Wearing a mask<loc_614><loc_232><loc_882><loc_706>
Wearing a mask<loc_362><loc_197><loc_573><loc_588>
Wearing a mask<loc_221><loc_382><loc_432><loc_645>
Not wearing a mask<loc_275><loc_161><loc_350><loc_307>"}
'''

'''
{"image": "maksssksksss2.png", "prefix": "<REGION_TO_DESCRIPTION>", "suffix": "
Wearing a mask<loc_591><loc_52><loc_707><loc_238>
Wearing a mask<loc_824><loc_15><loc_954><loc_234>
Wearing a mask<loc_366><loc_82><loc_473><loc_272>
Wearing a mask<loc_161><loc_49><loc_275><loc_247>"}
'''

'''{"image": "maksssksksss19.png", "prefix": "<REGION_TO_DESCRIPTION>", "suffix": "
Wearing a mask<loc_215><loc_90><loc_296><loc_263>
Wearing a mask<loc_745><loc_56><loc_979><loc_582>
Wearing a mask<loc_238><loc_180><loc_499><loc_739>
Not wearing a mask<loc_662><loc_17><loc_716><loc_136>
Not wearing a mask<loc_0><loc_141><loc_53><loc_287>"}'''

# MK
# Region to query, as four <loc_..> tokens (x1, y1, x2, y2); adjust to the area of interest
# in the selected image. The active value equals the last box of the maksssksksss19.png sample.
# custom_region = "<loc_320><loc_200><loc_450><loc_400>"
custom_region = "<loc_0><loc_141><loc_53><loc_287>"
# Draw the chosen region on the image as a visual check before running inference
plot_normalized_bbox(image, custom_region)

In [None]:
%%time

task_prompt = '<REGION_TO_DESCRIPTION>'
print(f'use_own_image:{use_own_image}')

# Own image: query the custom region; downloaded sample image: query a fixed region
region_text = custom_region if use_own_image else "<loc_52><loc_332><loc_932><loc_774>"

results = run_example(model_ft, task_prompt, text_input=region_text)
print(results)