In [None]:
import os

# Step from the notebook's folder up to its parent, presumably the project
# root, so that the project-local `src` package imported by a later cell can
# be found from the working directory.
# The guard keeps this cell idempotent: an unconditional os.chdir('../')
# climbs one more level every time the cell is re-run in the same kernel.
# NOTE(review): assumes the notebook folder itself contains no `src` folder.
if not os.path.isdir('src'):
    os.chdir('../')

In [35]:
import re
import json
import time
import shutil
import requests
from datetime import datetime

import numpy as np
import pandas as pd

import torch
from torch import dtype

import transformers
from PIL import Image
from tqdm import tqdm
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt

from transformers import AutoProcessor, AutoModelForCausalLM

from src.general_utils import util

In [None]:
# !pip install flash_attn einops timm

In [None]:
from unittest.mock import patch
from transformers.dynamic_module_utils import get_imports

def fixed_get_imports(filename: str | os.PathLike) -> list[str]:
    if not str(filename).endswith("modeling_florence2.py"):
        return get_imports(filename)
    imports = get_imports(filename)
    imports.remove("flash_attn")
    return imports


In [None]:
# Hub id of the checkpoint and the local folder its weights are cached in.
model_name = 'microsoft/Florence-2-large'
path_cache_dir = '/Users/manish.sahu/Downloads/tiler/scrap-bookmyshow/experiment/pretrained_model/florence-2'

# Workaround for the unnecessary flash_attn requirement: while the model is
# being loaded, transformers' import scan is swapped for fixed_get_imports.
with patch("transformers.dynamic_module_utils.get_imports", fixed_get_imports):
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        attn_implementation="sdpa",
        trust_remote_code=True,
        cache_dir=path_cache_dir,
    )

# NOTE(review): the processor is loaded without cache_dir, unlike the model
# above — confirm whether it should share path_cache_dir.
processor = AutoProcessor.from_pretrained(
    model_name,
    trust_remote_code=True,
)

In [None]:
def florence2(image, task_prompt, text_input=None, device='cpu'):
    """
    Run one Florence-2 task on a single image and return the parsed result.

    Uses the notebook-level ``processor`` and ``model`` objects. Only the
    input tensors are moved to ``device``; the model itself is not moved
    here, so it must already live on that device.

    Args:
        image: Image passed to the processor; must expose ``width`` and
            ``height`` (the notebook passes PIL images).
        task_prompt: Task token, e.g. ``'<MORE_DETAILED_CAPTION>'``.
        text_input: Optional text appended directly after the task token.
        device: Device the input tensors are sent to, e.g. ``'cpu'``,
            ``'cuda'`` or ``'cuda:0'``.

    Returns:
        Whatever ``processor.post_process_generation`` produces for the
        decoded text; the notebook indexes it by ``task_prompt``.
    """
    prompt = task_prompt if text_input is None else task_prompt + text_input

    inputs = processor(text=prompt, images=image, return_tensors="pt")

    # A single generate call for every device: .to(device) is a no-op for
    # tensors already on that device and also honours names such as
    # 'cuda:0', which an exact comparison with 'cuda' would leave on the CPU.
    generated_ids = model.generate(
        input_ids=inputs["input_ids"].to(device),
        pixel_values=inputs["pixel_values"].to(device),
        max_new_tokens=1024,
        early_stopping=False,
        do_sample=False,
        num_beams=3,
    )

    # Special tokens are kept in the decoded text that is handed to
    # post_process_generation.
    generated_text = processor.batch_decode(generated_ids,
                                            skip_special_tokens=False)[0]
    parsed_answer = processor.post_process_generation(
        generated_text,
        task=task_prompt,
        image_size=(image.width, image.height))

    return parsed_answer

In [None]:
# Florence-2 task prompt tags used for captioning.
caption_tags = ['<MORE_DETAILED_CAPTION>']

In [None]:
# Root folder of the scraped dataset and the file extensions of interest.
path_image_dir = '/Users/manish.sahu/Downloads/tiler/scrap-bookmyshow/data/wickedweasel'
image_extensions = ('jpg', 'jpeg', 'png')

# util.list_list is a project helper; presumably it returns the paths under
# the folder that carry one of these extensions — TODO confirm.
path_image_list = util.list_list(path_image_dir, image_extensions)

n = len(path_image_list)
print(f'Numnber of images: {n}')

In [None]:
# Caption every listed image with Florence-2 and collect one record per image.
data_caption_list = []
task_prompt = '<MORE_DETAILED_CAPTION>'

for path_image in tqdm(path_image_list):
    basename = util.get_file_name(path_image)

    # The context manager closes the underlying file as soon as the pixels
    # are loaded; convert() hands back an independent in-memory image.
    with Image.open(path_image) as image_file:
        image = image_file.convert('RGB')

    # The full-size image is captioned. No down-scaled copy is built, because
    # nothing passed one to the model.
    parsed_answer = florence2(image, task_prompt, text_input=None, device='cpu')

    data_caption_list.append({
        'title': basename,
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "path_image": path_image,
        # Newlines are stripped so each caption stays on a single line.
        'more_detail_caption': parsed_answer[task_prompt].replace('\n', ''),
    })


In [None]:
# Write the collected caption records to a CSV file.
# NOTE(review): `type_name` is not assigned by any cell above this one. In the
# visible notebook it is only set as a loop variable in the sample-copy cell
# further down, so a fresh top-to-bottom run raises NameError here. Confirm
# the intended value and define it explicitly.
path_csv = f'/workspace/jupyter_notebooks/manish_sahu/clipeus/training/mystique/mystique-visuals/data/assets/captions/{type_name}/caption.csv'

# DataFrame.to_csv does not create missing parent folders, so make sure the
# target directory exists before writing.
os.makedirs(os.path.dirname(path_csv), exist_ok=True)

data_csv = pd.DataFrame(data_caption_list)
data_csv.to_csv(path_csv, index=False)


## Sample: pick a subset of images per clothes class and type

In [36]:
# Folder holding the images to sample from, and the file extensions of interest.
path_image_dir = '/Users/manish.sahu/Downloads/tiler/scrap-bookmyshow/data/wickedweasel/images'
image_extensions = ('jpg', 'jpeg', 'png')

# util.list_list is a project helper; presumably it returns the paths under
# the folder that carry one of these extensions — TODO confirm.
path_image_list = util.list_list(path_image_dir, image_extensions)

n = len(path_image_list)
print(f'Numnber of images: {n}')

Numnber of images: 9241


In [37]:
# Build one metadata record per image. The clothes class and type are read
# from the folder names in the path: the 4th-from-last component is the
# class and the 3rd-from-last is the type.
image_records = []

for path_image in tqdm(path_image_list):
    path_parts = path_image.split('/')

    image_records.append({
        'path_image': path_image,
        'clothes_class': path_parts[-4],
        'clothes_type': path_parts[-3],
    })

# Explicit columns keep the frame well-formed even when no image was listed,
# so later column lookups do not fail with KeyError.
data_df = pd.DataFrame(
    image_records,
    columns=['path_image', 'clothes_class', 'clothes_type'],
)
data_df.head(2)


100%|██████████| 9241/9241 [00:00<00:00, 620321.74it/s]


Unnamed: 0,path_image,clothes_class,clothes_type
0,/Users/manish.sahu/Downloads/tiler/scrap-bookm...,activewear,swimwear
1,/Users/manish.sahu/Downloads/tiler/scrap-bookm...,activewear,swimwear


In [38]:
# Draw at most `sample` images for every (clothes_class, clothes_type) pair.
sample = 15
sample_seed = 42  # fixed seed so a re-run selects the same images
clothes_class_list = data_df['clothes_class'].unique()
clothes_type_list = data_df['clothes_type'].unique()
sampled_parts = []

for clothes_class in clothes_class_list:
    for clothes_type in clothes_type_list:
        is_class = data_df['clothes_class'] == clothes_class
        is_type = data_df['clothes_type'] == clothes_type
        temp_df = data_df[is_class & is_type]
        n = len(temp_df)
        if n == 0:
            # This class/type combination has no images; nothing to sample.
            continue

        # Groups smaller than `sample` are taken whole.
        sampled_parts.append(temp_df.sample(min(n, sample), random_state=sample_seed))

# pd.concat raises on an empty list, so fall back to an empty frame that
# keeps the same columns when nothing was sampled.
sample_df = pd.concat(sampled_parts) if sampled_parts else data_df.iloc[0:0]

In [42]:
# Copy every sampled image into one flat folder, naming each file
# '<class>-<type>-<original name><extension>'.
path_output_dir = '/Users/manish.sahu/Downloads/tiler/scrap-bookmyshow/data/wickedweasel/sample_images'
# util.check_dir is a project helper; presumably it creates the folder when
# it is missing — TODO confirm.
util.check_dir(path_output_dir)

# NOTE: this rebinds path_image_list to the sampled subset.
path_image_list = sample_df['path_image'].tolist()
class_list = sample_df['clothes_class'].tolist()
type_list = sample_df['clothes_type'].tolist()
n = len(path_image_list)

# zip has no length, so tqdm is given the total explicitly.
for path_image, class_name, type_name in tqdm(zip(path_image_list, class_list, type_list), total=n):
    basename = util.getNamenoExt(path_image)

    # shutil.copy duplicates the bytes without converting the format, so the
    # copy keeps the source extension instead of being renamed to '.jpg'.
    # '.jpg' remains the fallback for a path that has no extension.
    extension = os.path.splitext(path_image)[1] or '.jpg'

    path_output = os.path.join(path_output_dir, f'{class_name}-{type_name}-{basename}{extension}')
    shutil.copy(path_image, path_output)

100%|██████████| 390/390 [00:00<00:00, 1525.18it/s]
