# Literacy in Visualization

## Related Functions

In [None]:
import json
import re
import requests
import base64
import os
from PIL import Image, ImageDraw

def run_local_vision_request(text, image_urls, temperature=0):
    """Send a prompt and local images to the OpenAI chat completions endpoint.

    Args:
        text: Prompt text; sent as the first content part of a single user
            message.
        image_urls: Iterable of local image file paths (despite the name,
            the files are opened from disk). Each one is attached to the
            message as a base64 data URL.
        temperature: Sampling temperature passed through in the payload.

    Returns:
        The response body parsed as JSON. The HTTP status code is not
        inspected, so an error payload from the API is returned unchanged.
    """
    # Imported locally so this function does not need a new file-level import.
    import mimetypes

    def encode_image(image_path):
        """Return the file's bytes as a base64 text string."""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

    # Prefer a key supplied through the environment; the literal fallback is
    # only a placeholder that has to be replaced by hand.
    api_key = os.environ.get("OPENAI_API_KEY", "OPENAI_KEY")

    content = [{"type": "text", "text": text}]
    for image_path in image_urls:
        # Label the data URL with the file's real media type (the inputs are
        # typically PNG); fall back to JPEG when the extension is unknown.
        mime_type = mimetypes.guess_type(image_path)[0] or "image/jpeg"
        base64_image = encode_image(image_path)
        # Each image part carries an explicit type and a nested "url" field,
        # which is the content-part shape the chat completions API documents.
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
        })

    messages = [{"role": "user", "content": content}]

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }

    payload = {
        "model": "gpt-4-vision-preview",
        "messages": messages,
        "max_tokens": 2048,
        "temperature": temperature,
    }

    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
    return response.json()

def run_test(name, description, prompt, image_urls, logger, times=1, max_attempts=None):
    """Query the vision model until `times` parseable answers have been logged.

    Each attempt sends `prompt` with `image_urls`, extracts the first
    ```json ... ``` block from the reply, parses it, and records the attempt
    through `logger.log`. Any exception raised during an attempt (request
    failure, unexpected response shape, missing or invalid JSON block,
    logging error) is printed and the attempt is retried.

    Args:
        name: Experiment name stored with every log record.
        description: Item description stored with every log record.
        prompt: Prompt text sent to the model.
        image_urls: Image paths forwarded to `run_local_vision_request`.
        logger: Object exposing `log(name, description, prompt, image_urls,
            response, answer)`.
        times: Number of successful responses to collect.
        max_attempts: Upper bound on the total number of attempts. Defaults
            to ten attempts per requested response, so a persistent failure
            (bad API key, unwritable log file) cannot loop forever.

    Returns:
        The raw API responses that were successfully parsed and logged. The
        list is shorter than `times` when the attempt budget ran out.
    """
    if max_attempts is None:
        max_attempts = max(times, 0) * 10

    response_list = []
    attempts = 0
    while len(response_list) < times and attempts < max_attempts:
        attempts += 1
        try:
            response = run_local_vision_request(
                text=prompt,
                image_urls=image_urls,
            )
            content = response["choices"][0]["message"]["content"]
            matched_content = re.search(r'```json([\s\S]*?)```', content)
            if matched_content is None:
                # Name the real problem instead of failing on None.group().
                raise ValueError("no ```json ...``` block found in the model response")
            answer = json.loads(matched_content.group(1))
            logger.log(name, description, prompt, image_urls, content, answer)
            response_list.append(response)
        except Exception as e:
            # Deliberate best-effort: report the failure and try again.
            print(e)

    if len(response_list) < times:
        print(f"run_test: stopped after {attempts} attempts with "
              f"{len(response_list)}/{times} responses for {description!r}")
    return response_list

class Logger:
    """Append experiment records to a JSON file that holds a single list."""

    def __init__(self, file_path):
        """Remember the path of the JSON log file; no I/O happens here."""
        self.file_path = file_path

    def _load(self):
        """Return the records stored so far, or [] if the file is missing or empty."""
        if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
            return []
        with open(self.file_path, 'r') as file:
            return json.load(file)

    def log(self, name, description, prompt, image_urls, response, answer):
        """Append one record to the log file.

        Args:
            name: Experiment name.
            description: Item description.
            prompt: Prompt text that was sent.
            image_urls: Image paths that were sent.
            response: Raw text returned by the model.
            answer: JSON value parsed out of `response`.

        The whole file is read, extended and rewritten on every call. A log
        file that does not exist yet is created with this record as its
        first entry.
        """
        json_data_to_add = {"name": name, "description": description, "prompt": prompt, "image_urls": image_urls, "response": response, "answer": answer}

        data = self._load()
        data.append(json_data_to_add)

        # Serialize before opening for writing so that a serialization error
        # cannot leave a truncated log file behind.
        serialized = json.dumps(data, indent=4)
        with open(self.file_path, 'w') as file:
            file.write(serialized)


logger = Logger("./calvi.json")


def resize_image(image_path, output_path):
    """Write a blurred copy of a PNG by halving its size and scaling it back up.

    Args:
        image_path: Path of the source image.
        output_path: Path the degraded image is saved to.

    Files whose first eight bytes are not the PNG signature are ignored:
    nothing is written and the function returns None.
    """
    with open(image_path, 'rb') as file:
        signature = file.read(8)
    if signature != b'\x89PNG\r\n\x1a\n':
        return

    # The context manager releases the underlying file handle once the
    # reduced copy has been produced.
    with Image.open(image_path) as original_image:
        width, height = original_image.size
        # Keep each side at least one pixel so that very small images do not
        # produce a zero-sized (invalid) resize target.
        reduced_size = (max(1, width // 2), max(1, height // 2))
        reduced_image = original_image.resize(reduced_size, Image.Resampling.LANCZOS)

    # Scaling back to the original dimensions leaves the detail lost above.
    resized_image = reduced_image.resize((width, height), Image.Resampling.BILINEAR)
    resized_image.save(output_path)

def mask_image(cover_height_up=100, cover_height_down=520):
    """Paint white bands over the top and bottom of each question image.

    Reads images from ./calvi/question/, covers the top `cover_height_up`
    pixels and the bottom `cover_height_down` pixels with white, and saves
    the result under the same file name in ./calvi/visualization/. Only
    files whose name contains "-<number>" with number < 48 are processed.

    Args:
        cover_height_up: Height in pixels of the band painted at the top.
        cover_height_down: Height in pixels of the band painted at the bottom.

    NOTE(review): an earlier per-item bottom height (540 for items with four
    options, 340 otherwise) was always overwritten by a fixed 520 before it
    was used. The fixed value is kept as the default so the produced images
    do not change; confirm whether per-item heights were intended.
    """
    directory_path = "./calvi/question/"
    target_path = "./calvi/visualization/"
    file_names = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]

    # NOTE(review): the 13th directory entry is skipped on purpose or by
    # accident; os.listdir order is arbitrary, so which file that is depends
    # on the machine. Kept as-is pending confirmation.
    for file in file_names[0:12] + file_names[13:]:
        index_match = re.search(r"-(\d+)", file)
        # Ignore stray files without an item number instead of crashing.
        if index_match is None or int(index_match.group(1)) >= 48:
            continue

        with Image.open(directory_path + file) as img:
            width, height = img.size
            draw = ImageDraw.Draw(img)
            draw.rectangle([0, 0, width, cover_height_up], fill="white")  # top band
            draw.rectangle([0, height - cover_height_down, width, height], fill="white")  # bottom band
            img.save(target_path + file)

## Experiment 1

### Prompt Preparation

In [None]:
# Prompt fragments for Experiment 1 (persona, task, required output format).
ROLE = "You are an average user. "
TASK = (
    "Please answer the question in the image "
    "and give your reason for the answer."
)
RATING_JSON_FORMAT = (
    "Please give an additional result in json format at the end of your answer, like \n"
    '```json{"answer": answer, "description": description}```.\n'
)

### Data Preparation

In [None]:
# Folder of images to query, and the item metadata that the execution cell
# indexes by the number parsed from each file name.
directory_path = "./calvi/visualization/"
with open("./calvi/items_data_revised.json", "r") as f:
    answers = json.loads(f.read())

### Task Execution

In [None]:
# Experiment 1: ask the model each question (items numbered below 48),
# collecting five logged answers per image.
name = "experiment1"
file_names = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]

for file in file_names:
    # Skip stray files (e.g. hidden OS metadata) whose name has no
    # "-<number>" part; previously they crashed the whole run.
    index_match = re.search(r"-(\d+)", file)
    if index_match is None:
        continue
    description = "answer_question_" + file
    image_urls = [directory_path + file]
    index = index_match.group(1)
    if int(index) < 48:
        answer = answers[index]
        QUESTION_OPTIONS = answer["Question"] + " Your answer should be one of " + str(answer["Option"]) + ". "
        prompt = ROLE + \
            QUESTION_OPTIONS + \
            TASK + \
            RATING_JSON_FORMAT + \
            "DON'T say sorry or you cannot. YOU CAN."
        run_test(name, description, prompt, image_urls, logger, times=5)

## Experiment 2

### Prompt Preparation

In [None]:
# Prompt fragments for Experiment 2 (expert persona, relevance-rating task,
# required output format).
# NOTE: these strings are sent to the model verbatim, so their wording
# (including the "answe" misspelling) is left untouched to keep runs comparable.
ROLE = "You are a visualization expert. "
TASK = (
    "Please rate the item on a scale of 1 (not relevant) to 4 (highly relevant) "
    "based on the image and give your reason. "
    "The relevance meaning that differences in ability "
    "(i.e., the ability to read, interpret, and reason about erroneous or "
    "potentially misleading visualizations) "
    "should lead to differences in measurement outcomes (i.e., correctness). "
    "High relevance indicates that this vis and task contain high-related "
    "misleading elements. "
    "Also give your answer to the question."
)
RATING_JSON_FORMAT = (
    "Please give an additional result in json format at the end of your answe, like \n"
    '```json{"score": score, "reason": reason, "answer": answer}```.\n'
)

### Data Preparation

In [None]:
# Folder of images to rate, and the item metadata that the next cell indexes
# by the number parsed from each file name.
directory_path = "./calvi/visualization/"
with open("./calvi/items_data_revised.json", "r") as f:
    answers = json.loads(f.read())

In [None]:
# Experiment 2: have the model rate and answer each item numbered below 48
# or above 71, collecting five logged answers per image.
name = "experiment2"

file_names = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]

for file in file_names:
    # Skip stray files (e.g. hidden OS metadata) whose name has no
    # "-<number>" part; previously they crashed the whole run.
    index_match = re.search(r"-(\d+)", file)
    if index_match is None:
        continue
    description = "expert_" + file
    image_urls = [directory_path + file]
    index = index_match.group(1)
    if int(index) < 48 or int(index) > 71:
        answer = answers[index]
        QUESTION_OPTIONS = answer["Question"] + " Your answer should be one of " + str(answer["Option"]) + ". "
        prompt = ROLE + \
            QUESTION_OPTIONS + \
            TASK + \
            RATING_JSON_FORMAT + \
            "DON'T say sorry or you cannot. YOU CAN."
        run_test(name, description, prompt, image_urls, logger, times=5)

### Correctness Judgment

In [None]:
def get_result_by_description(data, name, description):
    """Return the records of `data` whose "name" and "description" both match.

    Matching records are returned in their original order; the records
    themselves are not copied.
    """
    return [
        record
        for record in data
        if name == record["name"] and description == record["description"]
    ]

def judge_correctness():
    """Flag every logged Experiment 2 answer as correct and/or misled.

    Loads the item metadata from ./calvi/items_data_revised.json and the
    logged results from ./calvi.json. For each .png file in
    ./calvi/visualization/ whose item number is below 48 or above 64, every
    "experiment2" record logged for that file gets:

    * "Best Answer": True if the model's answer equals the item's
      "Best Answer" (or any element of it, when that is a list).
    * "Wrong-Due-To-Misleader Answer": the same comparison against the
      item's "Wrong-Due-To-Misleader Answer", only when the item has one.

    Comparisons are made on the string form of both values. A record whose
    parsed answer has no "answer" field is flagged False rather than
    aborting the run. The annotated results are written back to ./calvi.json.
    """

    def _matches(given, expected):
        """Return True if `given` equals `expected`, or any element of it, as text."""
        if given is None:
            return False
        candidates = expected if isinstance(expected, list) else [expected]
        return any(str(given) == str(candidate) for candidate in candidates)

    with open("./calvi/items_data_revised.json", "r") as f:
        answers = json.load(f)
    with open("./calvi.json", "r") as f:
        results = json.load(f)

    directory_path = "./calvi/visualization/"

    # Collect the names of all regular files in the directory.
    file_names = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
    for file in file_names:
        if os.path.splitext(file)[1] != '.png':
            continue
        index_match = re.search(r"-(\d+)", file)
        # Ignore .png files without an item number instead of crashing.
        if index_match is None:
            continue
        index = index_match.group(1)
        # NOTE(review): the Experiment 2 cell only queries items > 71, so
        # items 65-71 normally have no records here; bounds kept unchanged.
        if 48 <= int(index) <= 64:
            continue

        # The Experiment 2 cell logs "expert_" + file. The longer
        # "expert_visualization_" form is still accepted so that logs
        # written under that description keep being judged.
        descriptions = ("expert_" + file, "expert_visualization_" + file)

        for result in results:
            if result["name"] != "experiment2" or result["description"] not in descriptions:
                continue
            answer = answers[index]
            parsed = result["answer"]
            given = parsed.get("answer") if isinstance(parsed, dict) else None

            result["Best Answer"] = _matches(given, answer["Best Answer"])
            if "Wrong-Due-To-Misleader Answer" in answer:
                result["Wrong-Due-To-Misleader Answer"] = _matches(
                    given, answer["Wrong-Due-To-Misleader Answer"]
                )

    with open("./calvi.json", 'w') as file:
        json.dump(results, file, indent=4)

## Additional Experiment

### Visualization with Text

In [None]:
# Extra experiment: the image already contains the question and options, so
# the prompt carries no question text. Five logged answers per image.
name = "extra_experiment"
# image with question and options
directory_path = "./calvi/question/"
file_names = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]

for file in file_names:
    # Skip stray files (e.g. hidden OS metadata) whose name has no
    # "-<number>" part; previously they crashed the whole run.
    index_match = re.search(r"-(\d+)", file)
    if index_match is None:
        continue
    description = "visualization_with_text_" + file
    image_urls = [directory_path + file]
    index = index_match.group(1)
    if int(index) < 48:
        prompt = ROLE + \
            TASK + \
            RATING_JSON_FORMAT + \
            "DON'T say sorry or you cannot. YOU CAN."
        run_test(name, description, prompt, image_urls, logger, times=5)

### Vague Visualization

In [None]:
# Extra experiment: same question prompt, but run against the images in the
# "visualization_vague" folder. Five logged answers per image.
name = "extra_experiment"
# vague image
directory_path = "./calvi/visualization_vague/"
file_names = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]

for file in file_names:
    # Skip stray files (e.g. hidden OS metadata) whose name has no
    # "-<number>" part; previously they crashed the whole run.
    index_match = re.search(r"-(\d+)", file)
    if index_match is None:
        continue
    description = "vague_" + file
    image_urls = [directory_path + file]
    index = index_match.group(1)
    if int(index) < 48:
        answer = answers[index]
        QUESTION_OPTIONS = answer["Question"] + " Your answer should be one of " + str(answer["Option"]) + ". "
        prompt = ROLE + \
            QUESTION_OPTIONS + \
            TASK + \
            RATING_JSON_FORMAT + \
            "DON'T say sorry or you cannot. YOU CAN."
        run_test(name, description, prompt, image_urls, logger, times=5)