In [None]:
import torch
import json
import clip
from PIL import Image
import os
from diffusers import StableDiffusionPipeline, EulerDiscreteScheduler
import torch
import numpy as np
from transformers import CLIPProcessor, CLIPModel
from torchmetrics.functional.multimodal import clip_score
from functools import partial
from pipeline_rf import RectifiedFlowPipeline
import random
import hpsv2

### Load models

In [None]:
# Query the CLIP model registry; the returned list is neither stored nor
# displayed because this is not the last expression of the cell.
clip.available_models()

# Load the ViT-L/14 (336px) checkpoint together with its preprocessing
# transform, then place the model on the GPU.
_clip_bundle = clip.load('ViT-L/14@336px')
clip_preprocess = _clip_bundle[1]
clip_model = _clip_bundle[0].to("cuda")

In [None]:
# Stable Diffusion 2.1 baseline: half-precision pipeline driven by an Euler
# discrete scheduler that is loaded from the same checkpoint directory.
model_id = "/data/model/stable-diffusion-2-1"
scheduler = EulerDiscreteScheduler.from_pretrained(
    model_id,
    subfolder="scheduler",
)
sd_pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    scheduler=scheduler,
    torch_dtype=torch.float16,
)
sd_pipe = sd_pipe.to("cuda")
# Silence the per-call progress bar; the pipelines are called in long loops.
sd_pipe.set_progress_bar_config(disable=True)

In [None]:
# InstaFlow rectified-flow pipeline, loaded in half precision with the
# safety checker disabled.
### switch to torch.float32 for higher quality
instaflow_pipe = RectifiedFlowPipeline.from_pretrained(
    "/data/model/instaflow_0_9B_from_sd_1_5",
    torch_dtype=torch.float16,
    safety_checker=None,
    requires_safety_checker=False,
)

instaflow_pipe.to("cuda")  ### if GPU is not available, comment this line
# Silence the per-call progress bar; the pipeline is called in long loops.
instaflow_pipe.set_progress_bar_config(disable=True)

### CLIP score

In [None]:
def get_clip_score(image, text):
    """Return the CLIP cosine similarity between an image and a text prompt.

    Relies on the notebook-level ``clip_model`` and ``clip_preprocess``
    objects created in the model-loading cell.

    Args:
        image: Image accepted by ``clip_preprocess`` (the notebook passes PIL
            images produced by the pipelines or opened from disk).
        text: Prompt string; it is tokenized with ``clip.tokenize``.

    Returns:
        float: Cosine similarity between the L2-normalised image embedding
        and the L2-normalised text embedding.
    """
    # Put the inputs on whatever device the model actually lives on instead of
    # re-deriving it from torch.cuda.is_available(), so model and inputs can
    # never end up on different devices.
    device = next(clip_model.parameters()).device

    # Preprocess the image (adding a batch dimension) and tokenize the text.
    image_input = clip_preprocess(image).unsqueeze(0).to(device)
    text_input = clip.tokenize([text]).to(device)

    # Pure inference: no gradients are needed for any step below.
    with torch.no_grad():
        image_features = clip_model.encode_image(image_input)
        text_features = clip_model.encode_text(text_input)

        # Normalise so that the dot product is the cosine similarity.
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Named `similarity` so the imported torchmetrics `clip_score` is not shadowed.
        similarity = torch.matmul(image_features, text_features.T).item()

    return similarity

In [None]:
# Sanity check: generate one Stable Diffusion sample (25 steps) and score it.
prompt = "a photo of an astronaut riding a horse on mars"
# The Stable Diffusion pipeline is bound to `sd_pipe`; no `pipe` name is
# defined anywhere in this notebook, so calling it raised NameError.
image = sd_pipe(prompt, num_inference_steps=25).images[0]
print(get_clip_score(image, prompt))  # CLIP score: 0.347
image

In [None]:
# Same sanity check with InstaFlow: a single inference step and guidance
# disabled, reusing the `prompt` defined in the Stable Diffusion sample cell.
image = instaflow_pipe(
    prompt=prompt,
    num_inference_steps=1,
    guidance_scale=0.0,
).images[0]
# Recorded note from the original cell: "CLIP score: 0.347". It is identical
# to the note on the Stable Diffusion cell, so it may be a copy-paste -- TODO confirm.
print(get_clip_score(image, prompt))
image

In [None]:
# Average CLIP score over a folder of images, using each file name as the
# text prompt for that image.
folder_path = "/home/liutao/workspace/distill/swift_photo_with_text"
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')

# image_list holds fully loaded PIL images; image_name_list holds the
# corresponding file names (same order).
image_list = []
image_name_list = []

# sorted() makes the processing order independent of the filesystem.
for filename in sorted(os.listdir(folder_path)):
    if not filename.lower().endswith(image_extensions):
        continue
    # Image.open is lazy and keeps the file open; copy the pixel data and let
    # the context manager close the handle so large folders do not leak
    # file descriptors.
    with Image.open(os.path.join(folder_path, filename)) as image_file:
        image_list.append(image_file.copy())
    image_name_list.append(filename)

# NOTE: despite its name, avg_score holds the running sum; the average is
# only computed when printing.
avg_score = 0
for image, filename in zip(image_list, image_name_list):
    # Drop the extension so ".png"/".jpg" is not fed to CLIP as part of the prompt.
    text = os.path.splitext(filename)[0]
    avg_score += get_clip_score(image, text)

if image_list:
    # Previously recorded value: 0.300 (measured while the file extension was
    # still part of the text, so a re-run may differ slightly).
    print(f"AVG CLIP Score: {avg_score/len(image_list)}")
else:
    # Avoid ZeroDivisionError when the folder contains no matching images.
    print(f"No images found in {folder_path}")


In [None]:
# Load the .npz file
# data = np.load('/data/20231212/SwiftBrush_reproduce_final20231227/val2014_captions.npz')
# captions = data['captions'][()]
# print(len(captions),captions[0],captions[1])
# data.close()

In [None]:
# Read every caption string from the COCO 2014 validation annotations.
# The context manager closes the file even if json.load raises, and the
# explicit encoding keeps the read independent of the system locale.
with open('/data/dataset/coco2014-val/annotations/captions_val2014.json', encoding='utf-8') as coco_f:
    coco_annotations = json.load(coco_f)

# Each entry of coco_annotations['annotations'] carries its text under 'caption'.
captions = [annotation['caption'] for annotation in coco_annotations['annotations']]
print(len(captions),captions[0],captions[1])

In [None]:
# Draw the evaluation subset of captions.
# - random.sample draws WITHOUT replacement, so no caption is repeated
#   (random.choices draws with replacement and produced duplicates).
# - A dedicated, seeded generator makes the subset identical on every
#   Restart & Run All without touching the global random state.
# - min() keeps the cell usable when fewer than 30000 captions are loaded.
captions_30k = random.Random(0).sample(captions, k=min(30000, len(captions)))
print(len(captions_30k),captions_30k[0],captions_30k[1])

In [None]:
# Reference numbers noted from earlier runs:
#   small test set -> instaflow: 0.26, sd_1_step: 0.138, sd_25_step: 0.22
#   instaflow on the COCO 30k sample -> CLIP score: 0.2580452107747396
total_score = 0
count = 0
# `count` is the 1-based number of captions processed so far.
for count, caption in enumerate(captions_30k, start=1):
    # One-step InstaFlow generation with guidance disabled.
    image = instaflow_pipe(
        prompt=caption,
        num_inference_steps=1,
        guidance_scale=0.0,
    ).images[0]
    # Alternative baseline:
    # image = sd_pipe(prompt=caption, num_inference_steps=25, guidance_scale=0.0).images[0]
    total_score += get_clip_score(image, caption)
    # Report the running average every 1000 captions.
    if count % 1000 == 0:
        print("current num:",count,"current avg clip score:",total_score/count)
print(f"AVG CLIP Score: {total_score/count}")

### HPSv2 score

In [None]:
# Generate one InstaFlow image per HPSv2 benchmark prompt and store it as
# <output root>/<style>/<index>.jpg, the layout later passed to hpsv2.evaluate().

# Get benchmark prompts (<style> = all, anime, concept-art, paintings, photo)
all_prompts = hpsv2.benchmark_prompts('all')

hpsv2_output_root = "/home/liutao/workspace/data/instaflow_hpsv2"

# Iterate over the benchmark prompts to generate images
for style, prompts in all_prompts.items():
    style_dir = os.path.join(hpsv2_output_root, style)
    # image.save() does not create missing directories, so make sure the
    # per-style folder exists before writing into it.
    os.makedirs(style_dir, exist_ok=True)
    # The loop variable is deliberately not called `prompt`, so the
    # notebook-level `prompt` used by the sample cells is not overwritten.
    for idx, benchmark_prompt in enumerate(prompts):
        # One-step InstaFlow generation with guidance disabled.
        image = instaflow_pipe(prompt=benchmark_prompt, num_inference_steps=1, guidance_scale=0.0).images[0]
        image.save(os.path.join(style_dir, f"{idx:05d}.jpg"))


: 

In [None]:
# Score the images written by the generation cell; the argument is the
# folder that contains one sub-folder per style.
hpsv2.evaluate(
    "/home/liutao/workspace/data/instaflow_hpsv2"
)