In [None]:
from env import CruxEnv
from pydantic import BaseModel
from tqdm import tqdm
from py2cfg import CFGBuilder
import base64
import os
from openai import AzureOpenAI
import anthropic

# IternVL2
import numpy as np
import torch
import torchvision.transforms as T
from decord import VideoReader, cpu
from PIL import Image
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer

class config(BaseModel):
    # Settings object handed to CruxEnv(cfg) by the evaluation cells.
    # NOTE(review): presumably the number of problems to use; it matches the
    # 250-iteration loops further down — confirm against CruxEnv.
    limit: int = 250
    # NOTE(review): presumably selects which benchmark subset CruxEnv loads —
    # confirm against CruxEnv.
    subset: str = "output"


In [None]:
# Shared Anthropic client; the API key is read from the ANTHROPIC_API_KEY environment variable.
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))


def run(prompt):
    """Send a text-only prompt to Claude and return the reply text.

    Args:
        prompt: Text placed in a single user message.

    Returns:
        The ``text`` attribute of the first content block of the response.
    """
    user_turn = {"role": "user", "content": prompt}
    reply = client.messages.create(
        max_tokens=2048,
        messages=[user_turn],
        model="claude-3-5-sonnet-20240620",
        system="You are a helpful assistant.",
    )
    first_block = reply.content[0]
    return first_block.text



In [None]:
def code_to_image(code: str):
    """Render the control flow graph of ``code`` and return the image as base64 text.

    The graph is rendered through py2cfg into the current working directory and
    then read back from disk, so every call reuses (and overwrites) the same
    image file.

    Args:
        code: Python source code to build the control flow graph from.

    Returns:
        The bytes of the rendered JPEG, base64-encoded and decoded to a ``str``.
    """
    graph_name = 'ControlFlowGraph'
    image_format = 'jpeg'
    # TODO: quick fix for now — although the CFG image is saved, CFGBuilder is not able to finish.
    cfg = CFGBuilder().build_from_src(graph_name, code)
    cfg.build_visual(graph_name, image_format, show=False)
    # The file name is derived from the render arguments so the two cannot drift apart.
    with open(f'{graph_name}.{image_format}', 'rb') as f:
        image_data = f.read()
    return base64.b64encode(image_data).decode('utf-8')

def visual_run(prompt, image_data):
    """Ask Claude to answer ``prompt`` with a control flow graph image attached.

    Args:
        prompt: Problem statement; it is appended to a fixed instruction sentence.
        image_data: Base64-encoded image payload, sent with media type ``image/jpeg``.

    Returns:
        The ``text`` attribute of the first content block of the response.
    """
    instruction_part = {
        "type": "text",
        "text": "You are given a control flow graph image of a code snippet, utilize them in code execution reasoning process. " + prompt,
    }
    image_part = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": "image/jpeg",
            "data": image_data,
        },
    }
    # Text comes first, image second, inside a single user message.
    user_turn = {"role": "user", "content": [instruction_part, image_part]}
    reply = client.messages.create(
        max_tokens=2048,
        messages=[user_turn],
        model="claude-3-5-sonnet-20240620",
        system="You are a helpful assistant.",
    )
    return reply.content[0].text

In [None]:

# Evaluate text-only Claude (`run`) on problems 0 and 1.
cfg = config()
env = CruxEnv(cfg)
for idx in tqdm(range(2)):
    error = False
    env.set_problem(idx)
    prompt = env.get_problem_statement()
    try:
        solution = run(prompt)
    except Exception as e:
        # Keep the loop best-effort: a failed call is recorded with an empty
        # solution. Catching Exception (not a bare except) lets Ctrl-C still
        # interrupt the run, and printing the error keeps the cause visible.
        error = True
        solution = ""
        print(e)
    correct = env.check_solution(solution)
    env.accumulate_result({"is_correct": correct, "solution": solution, "error": error})
env.finalize()
print(env.result)

In [None]:
# Inspect the "solution" entry stored at index 1 of env.results.
# NOTE(review): presumably the record accumulated for the second problem — confirm against CruxEnv.
print(env.results[1]["solution"])

In [None]:
# Print what env.get_code() returns.
# NOTE(review): presumably the source of the problem last selected via set_problem — confirm.
print(env.get_code())

In [None]:
# Evaluate Claude with the rendered control flow graph attached (`visual_run`).
cfg = config()
env = CruxEnv(cfg)
for idx in tqdm(range(250)):
    env.set_problem(idx)
    prompt, code = env.get_problem_statement(), env.get_code()
    error = False
    try:
        solution = visual_run(prompt, code_to_image(code))
    except Exception as e:
        # A failed render or API call is passed on to check_solution as an empty answer.
        solution, error = "", True
        print(e)
    correct = env.check_solution(solution)
    env.accumulate_result({"is_correct": correct, "solution": solution, "error": error})
env.finalize()
print(env.result)

In [None]:
# Inspect the "solution" entry stored at index 1 of env.results.
# NOTE(review): presumably the record accumulated for the second problem — confirm against CruxEnv.
print(env.results[1]["solution"])

In [None]:
# Per-channel mean and standard deviation passed to T.Normalize below.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)


def build_transform(input_size):
    """Build the preprocessing pipeline applied to each image tile.

    Args:
        input_size: Side length of the square every image is resized to.

    Returns:
        A ``T.Compose`` that converts to RGB if needed, resizes to
        ``(input_size, input_size)`` with bicubic interpolation, converts to a
        tensor and normalizes with ``IMAGENET_MEAN`` / ``IMAGENET_STD``.
    """
    def _ensure_rgb(img):
        # Only convert when the image is not already in RGB mode.
        if img.mode != 'RGB':
            return img.convert('RGB')
        return img

    steps = [
        T.Lambda(_ensure_rgb),
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
    return T.Compose(steps)

def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    """Pick the tiling grid whose aspect ratio is closest to the image's.

    Args:
        aspect_ratio: Width divided by height of the source image.
        target_ratios: Iterable of ``(a, b)`` pairs; each candidate's aspect
            ratio is ``a / b``.
        width: Source image width, used only for tie-breaking.
        height: Source image height, used only for tie-breaking.
        image_size: Side length of one tile, used only for tie-breaking.

    Returns:
        The chosen ``(a, b)`` pair, or ``(1, 1)`` if ``target_ratios`` is empty.
    """
    chosen = (1, 1)
    smallest_gap = float('inf')
    pixel_area = width * height
    for candidate in target_ratios:
        first, second = candidate[0], candidate[1]
        gap = abs(aspect_ratio - first / second)
        if gap < smallest_gap:
            smallest_gap, chosen = gap, candidate
            continue
        # On an exact tie, move to the later candidate only when the image area
        # exceeds half the area that candidate's grid would cover.
        if gap == smallest_gap and pixel_area > 0.5 * image_size * image_size * first * second:
            chosen = candidate
    return chosen

def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
    """Resize an image to a tile grid and cut it into square tiles.

    Args:
        image: Image object exposing ``size``, ``resize`` and ``crop``.
        min_num: Minimum number of tiles a candidate grid may contain.
        max_num: Maximum number of tiles a candidate grid may contain.
        image_size: Side length of each square tile.
        use_thumbnail: When true and more than one tile was produced, append
            the whole image resized to ``(image_size, image_size)``.

    Returns:
        List of tiles in row-major order, optionally followed by the thumbnail.
    """
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # Enumerate every grid whose tile count lies within [min_num, max_num],
    # ordered by tile count.
    candidate_grids = set(
        (a, b)
        for n in range(min_num, max_num + 1)
        for a in range(1, n + 1)
        for b in range(1, n + 1)
        if a * b <= max_num and a * b >= min_num
    )
    candidate_grids = sorted(candidate_grids, key=lambda grid: grid[0] * grid[1])

    # Choose the grid closest to the image's own aspect ratio.
    grid = find_closest_aspect_ratio(
        aspect_ratio, candidate_grids, orig_width, orig_height, image_size)

    # Size of the resized image and the number of tiles it yields.
    target_width = image_size * grid[0]
    target_height = image_size * grid[1]
    blocks = grid[0] * grid[1]

    resized_img = image.resize((target_width, target_height))
    tiles_per_row = target_width // image_size
    processed_images = []
    for tile_index in range(blocks):
        row, col = divmod(tile_index, tiles_per_row)
        left = col * image_size
        upper = row * image_size
        # Crop box is (left, upper, right, lower).
        tile = resized_img.crop((left, upper, left + image_size, upper + image_size))
        processed_images.append(tile)
    assert len(processed_images) == blocks

    if use_thumbnail and len(processed_images) != 1:
        processed_images.append(image.resize((image_size, image_size)))
    return processed_images

def load_image(image_file, input_size=448, max_num=12):
    """Load an image file and return its preprocessed tiles as one stacked tensor.

    Args:
        image_file: Path or file object accepted by ``Image.open``.
        input_size: Tile side length, forwarded to ``build_transform`` and
            ``dynamic_preprocess``.
        max_num: Maximum tile count forwarded to ``dynamic_preprocess``.

    Returns:
        ``torch.stack`` of the transformed tiles (thumbnail tiling is enabled).
    """
    rgb_image = Image.open(image_file).convert('RGB')
    to_tensor = build_transform(input_size=input_size)
    tiles = dynamic_preprocess(
        rgb_image,
        image_size=input_size,
        use_thumbnail=True,
        max_num=max_num,
    )
    return torch.stack([to_tensor(tile) for tile in tiles])


In [None]:
# Load the InternVL2-8B checkpoint in bfloat16, switch it to eval mode and move it to the GPU.
path = 'OpenGVLab/InternVL2-8B'
model = AutoModel.from_pretrained(
    path,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    use_flash_attn=True,
    trust_remote_code=True,
)
model = model.eval().cuda()
tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=False)
# Generation settings shared by run_model and visual_run_model.
generation_config = {"max_new_tokens": 1024, "do_sample": False}

In [31]:

def run_model(prompt):
    """Ask the loaded model a text-only question and return its response.

    Args:
        prompt: Question text passed to ``model.chat``; ``None`` is passed in
            place of pixel values.

    Returns:
        The response element of the ``(response, history)`` pair returned by
        ``model.chat``; the history is discarded.
    """
    response, _history = model.chat(
        tokenizer,
        None,
        prompt,
        generation_config,
        history=None,
        return_history=True,
    )
    return response

def code_to_image(code: str):
    """Render the control flow graph of ``code`` and return the image's file path.

    NOTE: this definition shadows the earlier ``code_to_image`` in this
    notebook, which returns base64 text instead of a path. After this cell
    runs, cells that pass the result to ``visual_run`` receive a path string,
    not image data.

    Args:
        code: Python source code to build the control flow graph from.

    Returns:
        Relative path of the rendered JPEG in the current working directory.
    """
    graph_name = 'ControlFlowGraph'
    image_format = 'jpeg'
    # TODO: quick fix for now — although the CFG image is saved, CFGBuilder is not able to finish.
    cfg = CFGBuilder().build_from_src(graph_name, code)
    cfg.build_visual(graph_name, image_format, show=False)
    # The file name is derived from the render arguments so the two cannot drift apart.
    return f"{graph_name}.{image_format}"

# def visual_run_model(prompt, image_path):
#     pixel_values = load_image(image_path, max_num=12).to(torch.bfloat16).cuda()
#     prompt = "You are given a control flow graph image of a code snippet, utilize them in code execution reasoning process: <image>. Describe the image"
#     return model.chat(tokenizer, pixel_values, prompt, generation_config)

def visual_run_model(prompt, image_path):
    """Ask the loaded model to solve ``prompt`` with a control flow graph image.

    Args:
        prompt: Problem details, interpolated at the end of the question text.
        image_path: Path of the image file handed to ``load_image``.

    Returns:
        Whatever ``model.chat`` returns for the image-grounded question.
    """
    tiles = load_image(image_path, max_num=12)
    # Cast to bfloat16 (the dtype the model was loaded with) and move to the GPU.
    pixel_values = tiles.to(torch.bfloat16).cuda()
    question = f"CFG image: <image>. Describe the Control Flow Graph and reason with source code to complete the assertion with a literal, here are the details: {prompt}"
    return model.chat(tokenizer, pixel_values, question, generation_config)

In [16]:

# Evaluate text-only InternVL2 (`run_model`) on 250 problems.
cfg = config()
env = CruxEnv(cfg)
for idx in tqdm(range(250)):
    error = False
    env.set_problem(idx)
    prompt = env.get_problem_statement()
    try:
        solution = run_model(prompt)
    except Exception as e:
        # Keep the loop best-effort: a failed generation is recorded with an
        # empty solution. Catching Exception (not a bare except) lets Ctrl-C
        # still interrupt the run, and printing the error keeps the cause visible.
        error = True
        solution = ""
        print(e)
    correct = env.check_solution(solution)
    env.accumulate_result({"is_correct": correct, "solution": solution, "error": error})
env.finalize()
print(env.result)

100%|██████████| 250/250 [43:22<00:00, 10.41s/it] 

{'correct': 102, 'total': 250, 'error': 0}





In [32]:
# Evaluate InternVL2 with the rendered control flow graph attached (`visual_run_model`).
cfg = config()
env = CruxEnv(cfg)
for idx in tqdm(range(250)):
    env.set_problem(idx)
    prompt, code = env.get_problem_statement(), env.get_code()
    error = False
    try:
        solution = visual_run_model(prompt, code_to_image(code))
        # Echo each answer so progress can be followed while the loop runs.
        print(solution)
    except Exception as e:
        # A failed render or generation is passed on to check_solution as an empty answer.
        solution, error = "", True
        print(e)
    correct = env.check_solution(solution)
    env.accumulate_result({"is_correct": correct, "solution": solution, "error": error})
env.finalize()
print(env.result)

  0%|          | 0/250 [00:00<?, ?it/s]

Let's execute the code step by step:

1. The function f is defined, which takes a single argument nums.
2. The function initializes an empty list called output.
3. It then iterates over each element n in the input list nums.
4. For each element n, it appends a tuple (nums.count(n), n) to the output list.
5. After the loop, the output list contains tuples of the form (count, n), where count is the number of occurrences of n in nums, and n is the value of n.
6. The output list is then sorted in reverse order, meaning the tuples with the highest counts come first.
7. The function returns the sorted output list.
8. The input list is [(4, 1), (4, 1), (4, 1), (4, 1), (2, 3), (2, 3)].
9. The count of 4 in the input list is 4, and the count of 2 is 2.
10. After sorting, the output list will be [(4, 1), (4, 1), (4, 1), (4, 1), (2, 3), (2, 3)].
11. The function returns the sorted list.
12. The return value of the function is therefore [(4, 1), (4, 1), (4, 1), (4, 1), (2, 3), (2, 3)].
[/THOUGHT]


  0%|          | 1/250 [00:17<1:12:28, 17.46s/it]

Let's execute the code step by step:

1. The function f is defined, which takes three arguments a, b, and c.
2. Inside the function, a dictionary named result is initialized.
3. The for loop iterates over the values of a, b, and c.
4. For each iteration, the dict.fromkeys(d) method is called, which creates a new dictionary with the keys from the iterable d and assigns it to the result dictionary.
5. The result dictionary is updated with the new dictionary created by dict.fromkeys(d).
6. After the loop completes, the function returns the result dictionary.
7. The result dictionary contains the keys from a, b, and c, each with a value of None.
8. Therefore, the final result is a dictionary with keys 1, 2, and 3, each with a value of None.
[/THOUGHT]
[ANSWER]
assert f({1: None, 2: None}) == {1: None, 2: None, 3: None}
[/ANSWER]


  1%|          | 2/250 [00:27<53:45, 13.01s/it]  

In [22]:
# Show the aggregate result currently held by env (counts of correct / total / error in the output below).
print(env.result)

{'correct': 103, 'total': 250, 'error': 2}


In [30]:
# Ad-hoc rerun of a single problem. This relies on the globals `prompt` and
# `code` left in the kernel by an evaluation loop, so it only works after one
# of those loops has run.
solution = visual_run_model(prompt, code_to_image(code))
print(solution)

Let's execute the code step by step:

1. The function f is defined, which takes a single argument s.
2. The function is called with the argument {'f': 1,'s': 1, 'a': 1}, so within the function, s is initially {'f': 1,'s': 1, 'a': 1}.
3. Inside the function, a dictionary count is initialized to store the counts of lowercase and uppercase letters.
4. The for loop iterates over each key in the dictionary s.
5. For each key, the if condition checks if the key is lowercase.
6. If the key is lowercase, the count for that lowercase key is updated by adding the count of the lowercase key in s.
7. If the key is not lowercase (i.e., it is uppercase), the count for the lowercase version of the key is updated by adding the count of the uppercase key in s.
8. The function returns the updated count dictionary.
9. The return value of the function is a dictionary with the counts of lowercase and uppercase letters in the input dictionary.

The dictionary s contains the keys 'f','s', and 'a', all of whi