# **I. Init & Setup**

In [None]:
import json

INPUT_FOLDER = "." # TODO: Fill the input path
test_file = f"{INPUT_FOLDER}/test.json" # TODO: Change the test file if needed

# The test file is parsed as JSON Lines: one JSON object per line.
test_data = []

# Explicit UTF-8 so non-ASCII captions decode identically on every platform
# (the default text encoding of open() depends on the locale).
with open(test_file, 'r', encoding='utf-8') as file:
    for line in file:
        # Blank lines (e.g. a trailing newline) are not records; skip them
        # quietly instead of reporting them as invalid JSON.
        if not line.strip():
            continue
        try:
            test_data.append(json.loads(line))
        except json.JSONDecodeError:
            print(f"Skipping invalid JSON: {line.strip()}")

## **1. Pre-processing test data**

### **Delete unrelated data**

In [None]:
# Keys that are removed from every record before it is used.
UNUSED_KEYS = (
    # For task 1
    'article_url',
    'entity_list',
    'caption1_modified',
    'caption1_entities',
    'caption2_modified',
    'caption2_entities',
    'maskrcnn_bboxes',
    'bert_base_score',
    'bert_large_score',
    # For task 2
    'caption_modified',
    'caption_entities',
)

# Build new dicts instead of deleting keys from the loaded ones: `test_data`
# keeps the raw records, and re-running this cell always yields the same result.
modified_data = [
    {key: value for key, value in json_obj.items() if key not in UNUSED_KEYS}
    for json_obj in test_data
]

### **Normalize data**

In [None]:
import re

# Remove every character that is not an ASCII letter, a digit, a period or
# whitespace from the caption fields, updating each record in place.
for record in modified_data:
    # 'caption1' / 'caption2' are the task 1 fields, 'caption' is the task 2 field.
    for field in ('caption1', 'caption2', 'caption'):
        if field not in record:
            continue
        record[field] = re.sub(r'[^a-zA-Z0-9.\s]', '', record[field])

## **2. Setup Function**

In [None]:
# Alias (not a copy) of the pre-processed records; `pair` is what the cells in
# the Execute section pass to test_task1 / test_task2.
pair = modified_data

### **Stable Diffusion**

In [None]:
!pip install transformers torch diffusers accelerate

In [None]:
import torch

# Use the GPU when CUDA is available, otherwise fall back to the CPU. Every
# model and input tensor in the cells below is moved to this device.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
from diffusers import StableDiffusionImg2ImgPipeline

# Half precision is only usable on the GPU; when the notebook falls back to the
# CPU the pipeline has to be loaded in float32, otherwise inference fails or is
# unusably slow.
sd_dtype = torch.float16 if device == "cuda" else torch.float32

# Image-to-image Stable Diffusion pipeline, loaded with the safety checker disabled.
sd_pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
    "nitrosocke/Ghibli-Diffusion",
    torch_dtype=sd_dtype,
    use_safetensors=True,
    safety_checker=None,
    requires_safety_checker=False,
).to(device)

### **Compute Deviation**

In [None]:
from transformers import CLIPModel, CLIPProcessor

# The CLIP model and its processor are both loaded from this checkpoint.
clip_checkpoint = "openai/clip-vit-base-patch16"

clip_processor = CLIPProcessor.from_pretrained(clip_checkpoint)
clip_model = CLIPModel.from_pretrained(clip_checkpoint)
clip_model = clip_model.to(device)

In [None]:
def compute_deviation(image1, image2):
    """Return the CLIP-space cosine distance between two images.

    Both images are pre-processed with ``clip_processor``, embedded with
    ``clip_model.get_image_features`` and compared with cosine similarity.

    Args:
        image1: First image, in any form accepted by ``clip_processor``.
        image2: Second image, same requirements as ``image1``.
            Each argument must be a single image: ``.item()`` below requires
            a one-element similarity tensor.

    Returns:
        float: ``1 - cosine_similarity``; 0 means identical embeddings.
    """
    # Inference only: without no_grad() every call records an autograd graph
    # for the CLIP forward pass, which wastes memory in the evaluation loop.
    with torch.no_grad():
        pixels1 = clip_processor(images=image1, return_tensors="pt").to(device).pixel_values
        pixels2 = clip_processor(images=image2, return_tensors="pt").to(device).pixel_values

        image_feature1 = clip_model.get_image_features(pixels1)
        image_feature2 = clip_model.get_image_features(pixels2)

        similarity = torch.nn.functional.cosine_similarity(image_feature1, image_feature2, dim=-1)

    return 1 - similarity.item()

### **NLI**

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Checkpoint used for natural language inference; the tokenizer and the
# sequence-classification model are both loaded from it.
model_name = "MoritzLaurer/mDeBERTa-v3-base-xnli-multilingual-nli-2mil7"
nli_model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

def nli(string1, string2):
    """Score the NLI relation between two strings.

    Args:
        string1: First text of the pair passed to the tokenizer.
        string2: Second text of the pair passed to the tokenizer.

    Returns:
        dict: Keys ``"entailment"``, ``"neutral"`` and ``"contradiction"``
        mapped to softmax probabilities expressed as percentages rounded to
        one decimal place.
    """
    inputs = tokenizer(string1, string2, truncation=True, return_tensors="pt").to(device)

    # Inference only: no_grad() avoids building an autograd graph on each call.
    with torch.no_grad():
        logits = nli_model(**inputs).logits[0]
    probabilities = torch.softmax(logits, -1).tolist()

    # NOTE(review): assumes the checkpoint's class indices 0/1/2 are in this
    # order -- confirm against nli_model.config.id2label.
    label_names = ["entailment", "neutral", "contradiction"]
    return {
        name: round(float(probability) * 100, 1)
        for name, probability in zip(label_names, probabilities)
    }

### **Load pre-trained Context-Matching model**

In [None]:
import joblib
import numpy as np

# Load the pre-trained context-matching model; the path is relative to the
# current working directory.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code -- only load a model file that comes from a trusted source.
context_matching = joblib.load('models/context_matching_model.pkl')

def deviation_predict(X):
    """Return the context-matching model's predictions for ``X``.

    ``X`` is forwarded unchanged to ``context_matching.predict``; the callers
    in this notebook pass deviation values reshaped with ``reshape(-1, 1)``.
    """
    predictions = context_matching.predict(X)
    return predictions

### **Save File**

In [None]:
def save_file(data_list, file_path):
    """Write the ``str()`` of every element of ``data_list`` to ``file_path``.

    Each element goes on its own line. The file is opened in write mode, so
    existing content is replaced. A confirmation message is printed afterwards.
    """
    with open(file_path, 'w') as out:
        out.writelines(str(entry) + '\n' for entry in data_list)

    print(f'The list has been saved to {file_path}')

## **3. Main Function**

#### **Task 1 Function**

In [None]:
from torchvision import transforms
from PIL import Image
from IPython.display import display

def task1(item, strength=0.75, guidance_scale=7.5, seed=1024):
    """Predict the task 1 label for one record (an image with two captions).

    For each caption an image is generated from the original image with the
    img2img pipeline, the CLIP deviation between the original and the
    generated image is computed, and the context-matching model turns that
    deviation into a label.

    Args:
        item: Record providing ``'img_local_path'``, ``'caption1'`` and
            ``'caption2'``.
        strength: ``strength`` argument forwarded to ``sd_pipe``.
        guidance_scale: ``guidance_scale`` argument forwarded to ``sd_pipe``.
        seed: Seed of the random generator shared by both generations.

    Returns:
        int: 0 when the two predicted labels sum to zero, otherwise 1.
        NOTE(review): presumably 0 = in context and 1 = out of context,
        matching 'context_label' -- confirm.
    """
    generator = torch.Generator(device=device).manual_seed(seed)

    # Image.open is lazy and keeps the file open; the context manager closes it
    # once convert()/resize() have produced an in-memory copy.
    with Image.open(item['img_local_path']) as source_image:
        # Force 3-channel RGB so grayscale / palette / RGBA files reach the
        # pipeline with the channel layout it expects.
        original_image = source_image.convert("RGB").resize((512, 512))

    labels = []
    # The generator is shared on purpose: the second generation continues the
    # random stream of the first, exactly as two consecutive calls would.
    for caption in (item['caption1'], item['caption2']):
        generated_image = sd_pipe(
            prompt=caption,
            image=original_image,
            strength=strength,
            guidance_scale=guidance_scale,
            generator=generator,
        ).images[0]
        deviation_value = compute_deviation(original_image, generated_image)
        # predict() is given a 2-D array holding the single deviation value.
        deviation_value = np.array(deviation_value).reshape(-1, 1)
        labels.append(deviation_predict(deviation_value)[0])

    # Label 0 only when both captions were individually labelled 0.
    return 0 if labels[0] + labels[1] == 0 else 1


#### **Task 2 Function**

In [None]:
from torchvision import transforms
from PIL import Image
from IPython.display import display

def task2(item, strength=0.75, guidance_scale=7.5, seed=1024):
    """Predict the task 2 label for one record (an image with one caption).

    An image is generated from the original image and the caption with the
    img2img pipeline; the CLIP deviation between the original and the
    generated image is fed to the context-matching model.

    Args:
        item: Record providing ``'img_local_path'`` and ``'caption'``.
        strength: ``strength`` argument forwarded to ``sd_pipe``.
        guidance_scale: ``guidance_scale`` argument forwarded to ``sd_pipe``.
        seed: Seed of the random generator used for the generation.

    Returns:
        The first element of the context-matching model's prediction.
    """
    generator = torch.Generator(device=device).manual_seed(seed)

    # Image.open is lazy and keeps the file open; the context manager closes it
    # once convert()/resize() have produced an in-memory copy.
    with Image.open(item['img_local_path']) as source_image:
        # Force 3-channel RGB so grayscale / palette / RGBA files reach the
        # pipeline with the channel layout it expects.
        original_image = source_image.convert("RGB").resize((512, 512))

    generated_image = sd_pipe(
        prompt=item['caption'],
        image=original_image,
        strength=strength,
        guidance_scale=guidance_scale,
        generator=generator,
    ).images[0]
    deviation_value = compute_deviation(original_image, generated_image)
    # predict() is given a 2-D array holding the single deviation value.
    deviation_value = np.array(deviation_value).reshape(-1, 1)

    return deviation_predict(deviation_value)[0]

#### **Test Function**

In [None]:
from sklearn.metrics import accuracy_score, average_precision_score, f1_score
import time

def test_task1(pair, entailment_threshold=80):
    """Run task 1 on every record, print evaluation metrics, return predictions.

    Args:
        pair: Iterable of records providing ``'caption1'``, ``'caption2'`` and
            ``'context_label'`` (plus whatever ``task1`` reads).
        entailment_threshold: Entailment percentage (0-100) at or above which
            a caption pair is labelled 0 without calling ``task1``.

    Returns:
        list: One prediction per record, in input order.
    """
    prediction_list = []
    true_labels = []

    start_time = time.time()
    for item in pair:
        # Read the ground truth first so a record without it fails before any
        # model is run.
        context_label = item['context_label']

        nli_result = nli(item['caption1'], item['caption2'])

        # Captions that entail each other strongly enough are labelled 0
        # directly; only the remaining pairs go through the image pipeline.
        if nli_result['entailment'] >= entailment_threshold:
            prediction = 0
        else:
            prediction = task1(item)

        prediction_list.append(prediction)
        true_labels.append(context_label)

    execution_time = time.time() - start_time

    accuracy = accuracy_score(true_labels, prediction_list)
    # NOTE: average precision is computed from the hard predictions above, not
    # from continuous scores.
    average_precision = average_precision_score(true_labels, prediction_list)
    f1 = f1_score(true_labels, prediction_list)

    print("Execution time:", execution_time)
    print("Accuracy:", accuracy)
    print("Average Precision:", average_precision)
    print("F1-Score:", f1)

    return prediction_list

In [None]:
from sklearn.metrics import accuracy_score, average_precision_score, f1_score
import time

def test_task2(pair):
    """Run task 2 on every record, print evaluation metrics, return predictions.

    Args:
        pair: Iterable of records providing ``'context_label'`` (plus whatever
            ``task2`` reads).

    Returns:
        list: One prediction per record, in input order.
    """
    prediction_list = []
    true_labels = []

    start_time = time.time()
    for item in pair:
        # Read the ground truth first so a record without it fails before the
        # image pipeline is run.
        context_label = item['context_label']

        prediction_list.append(task2(item))
        true_labels.append(context_label)

    execution_time = time.time() - start_time

    accuracy = accuracy_score(true_labels, prediction_list)
    # NOTE: average precision is computed from the predicted labels, not from
    # continuous scores.
    average_precision = average_precision_score(true_labels, prediction_list)
    f1 = f1_score(true_labels, prediction_list)

    print("Execution time:", execution_time)
    print("Accuracy:", accuracy)
    print("Average Precision:", average_precision)
    print("F1-Score:", f1)

    return prediction_list

# **II. Execute**

*TODO: Execute here for Task 1 (run these cells only when the test file provides `caption1` and `caption2`)*

In [None]:
# Run the Task 1 evaluation over all records; test_task1 reads 'caption1',
# 'caption2' and 'context_label' from each record and prints the metrics.
prediction_list = test_task1(pair)

In [None]:
# Write the Task 1 predictions, one per line, to a file in the current
# working directory.
output_file = "task1.txt"

save_file(prediction_list, output_file)

*TODO: Execute here for Task 2 (run these cells only when the test file provides `caption`)*

In [None]:
# Run the Task 2 evaluation over all records; this rebinds `prediction_list`,
# replacing any Task 1 predictions held under that name.
prediction_list = test_task2(pair)

In [None]:
# Write the Task 2 predictions, one per line, to a file in the current
# working directory.
output_file = "task2.txt"

save_file(prediction_list, output_file)