In [2]:
import itertools
from multiprocessing import Array, Value
from typing import Any, Dict, List, Tuple, Union

import json

import numpy as np

from evalplus.eval._special_oracle import (
    MBPP_OUTPUT_NOT_NONE_TASKS,
    MBPP_OUTPUT_SET_EQ_TASKS,
    _poly,
)
from evalplus.eval.utils import (
    create_tempdir,
    reliability_guard,
    swallow_io,
    time_limit,
)

import argparse
import json
import multiprocessing
import os
import pickle
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Tuple
from warnings import warn

import numpy as np
from termcolor import cprint
from tqdm import tqdm

from evalplus.data import (
    get_human_eval_plus,
    get_human_eval_plus_hash,
    get_mbpp_plus,
    get_mbpp_plus_hash,
    load_solutions,
)
from evalplus.data.mbpp import mbpp_serialize_inputs
from evalplus.data.utils import CACHE_DIR
from evalplus.eval import (
    FAIL,
    PASS,
    compatible_eval_result,
    estimate_pass_at_k,
)

from evalplus.gen.util import trusted_exec
from evalplus.eval.utils import TimeoutException

# String status labels. PASS and FAIL rebind the names imported from
# evalplus.eval earlier in this cell.
# NOTE(review): confirm the imported values are identical so that this
# shadowing is harmless.
PASS = "pass"
FAIL = "fail"
TIMEOUT = "timeout"

# Integer status codes. They are not referenced by any code visible in this
# notebook; presumably kept for parity with evalplus' evaluator -- TODO confirm.
_SUCCESS = 0
_FAILED = 1
_TIMEOUT = 2
_UNKNOWN = 3

class MyCustomException(BaseException):
    # Derives from BaseException (not Exception), so a plain
    # `except Exception` handler will not catch it.
    # Not raised or caught by any code visible in this notebook.
    def __init__(self, message) -> None:
        # Keep the caller-supplied message available as an attribute.
        self.message = message


def unsafe_execute_with_outputs(
    dataset: str,
    entry_point: str,
    code: str,
    task_id: str,
    solution_id: str,
    inputs,
    expected: List,
    time_limits,
    stat: str,
    details: List[int],
    outputs: Dict[str, Any],
):
    """Run candidate ``code`` on each input and record the raw result per index.

    Intended to be the target of a child process: it executes arbitrary code
    and writes its results into ``outputs`` instead of returning them.

    Args:
        dataset: Dataset name; ``"mbpp"`` enables the MBPP output normalisation.
        entry_point: Name of the function to look up after executing ``code``.
        code: Source of the candidate solution.
        task_id: Not used by this function.
        solution_id: Not used by this function.
        inputs: Sequence of argument lists; each is splatted into the candidate.
        expected: Reference outputs, indexed like ``inputs``.
        time_limits: Per-input time limit passed to ``time_limit``.
        stat: Not used by this function.
        details: Per-input flags. An index flagged ``1`` is not executed and
            gets ``expected[i]`` copied in; an index past the end of
            ``details`` is skipped without recording anything.
        outputs: Mapping filled in place with ``index -> result`` or
            ``index -> "failed: ..."``.
    """
    with create_tempdir():
        # These system calls are needed when cleaning up tempdir.
        import os
        import shutil

        saved_rmtree, saved_rmdir, saved_chdir = shutil.rmtree, os.rmdir, os.chdir

        # Disable functionalities that can make destructive changes to the test.
        # Allow only 4GB memory usage.
        reliability_guard(maximum_memory_bytes=4 * 1024 * 1024 * 1024)

        namespace = {}
        try:
            with swallow_io():
                exec(code, namespace)
                candidate = namespace[entry_point]

            for idx, args in enumerate(inputs):
                if idx >= len(details):
                    # No flag for this input: leave it unrecorded.
                    continue
                if details[idx] == 1:
                    # Flagged input: reuse the reference output, do not run.
                    outputs[idx] = expected[idx]
                    continue

                try:
                    with time_limit(time_limits[idx]):
                        with swallow_io():
                            result = candidate(*args)

                    if dataset == "mbpp":
                        if entry_point in MBPP_OUTPUT_SET_EQ_TASKS:
                            result = set(result)
                        elif (
                            entry_point in MBPP_OUTPUT_NOT_NONE_TASKS
                            and not isinstance(result, bool)
                        ):
                            result = result is not None
                    outputs[idx] = result
                except TimeoutException:
                    outputs[idx] = "failed: Execution timed out."
                except BaseException as err:
                    outputs[idx] = f"failed: {err}"
        except BaseException as err:
            # Failure outside the per-input handling (e.g. while defining the
            # candidate): mark every input that has no result yet.
            for idx in range(len(inputs)):
                if idx in outputs:
                    continue
                message = f"failed: {err}"
                if message.strip() == "failed:":
                    message = "failed: An error occurred."
                outputs[idx] = message

        # Put the saved functions back before the tempdir context exits.
        shutil.rmtree = saved_rmtree
        os.rmdir = saved_rmdir
        os.chdir = saved_chdir

def get_groundtruth(problems, hashcode, tasks_only_output_not_none):
    """Return reference outputs and timings for every problem, using a pickle cache.

    The cache file is ``<CACHE_DIR>/<hashcode>.pkl``. When it exists it is
    loaded and returned as-is; otherwise each problem's canonical solution is
    run through ``trusted_exec`` on its base and plus inputs, and the result
    is written to the cache before being returned.

    Args:
        problems: Mapping ``task_id -> problem``; each problem is indexed with
            the keys ``prompt``, ``canonical_solution``, ``entry_point``,
            ``base_input`` and ``plus_input``.
        hashcode: Identifier used to name the cache file.
        tasks_only_output_not_none: Container of entry points for which
            ``output_not_none=True`` is passed to ``trusted_exec``.

    Returns:
        Mapping ``task_id -> {"base", "base_time", "plus", "plus_time"}``.
    """
    cache_path = os.path.join(CACHE_DIR, f"{hashcode}.pkl")
    if os.path.exists(cache_path):
        with open(cache_path, "rb") as cached:
            return pickle.load(cached)

    os.makedirs(CACHE_DIR, exist_ok=True)

    groundtruth = {}
    for task_id, problem in problems.items():
        reference_code = problem["prompt"] + problem["canonical_solution"]
        entry_point = problem["entry_point"]
        only_not_none = entry_point in tasks_only_output_not_none

        oracle = {}
        # Same call for both input sets; "base" first so key order is kept.
        for split in ("base", "plus"):
            oracle[split], oracle[f"{split}_time"] = trusted_exec(
                reference_code,
                problem[f"{split}_input"],
                entry_point,
                record_time=True,
                output_not_none=only_not_none,
            )
        groundtruth[task_id] = oracle

    with open(cache_path, "wb") as sink:
        pickle.dump(groundtruth, sink)

    return groundtruth

def untrusted_check(
    dataset: str,
    entry_point: str,
    code: str,
    task_id: str,
    solution_id: str,
    inputs: List[Any],
    expected: List[Any],
    ref_time,
    stat,
    details,
    min_time_limit: float = 1.0,
    gt_time_limit_factor: float = 4.0,
):
    """Run an untrusted candidate in a child process and collect its raw outputs.

    The candidate is executed by ``unsafe_execute_with_outputs`` in a separate
    ``multiprocessing.Process``; results come back through a manager dict. If
    the child outlives the overall timeout it is terminated, then killed.

    Args:
        dataset: Dataset name forwarded to the executor (e.g. ``"mbpp"``).
        entry_point: Name of the function defined by ``code`` to call.
        code: Source of the candidate solution.
        task_id: Forwarded to the executor.
        solution_id: Forwarded to the executor.
        inputs: Sequence of argument lists, one per test input.
        expected: Reference outputs, indexed like ``inputs``.
        ref_time: Reference runtime per input, used to derive time limits.
        stat: Forwarded to the executor.
        details: Per-input flags forwarded to the executor.
        min_time_limit: Lower bound for each per-input time limit.
        gt_time_limit_factor: Multiplier applied to each reference runtime.

    Returns:
        A plain ``dict`` snapshot mapping input index to the recorded output
        (or a ``"failed: ..."`` string). Inputs the child never reached are
        absent.

    Raises:
        ValueError: If ``EVALPLUS_TIMEOUT_PER_TASK`` is set to a non-numeric
            value.
    """
    time_limits = [max(min_time_limit, gt_time_limit_factor * t) for t in ref_time]

    # os.getenv returns a *string* when the variable is set; comparing that
    # with a float inside min() raises TypeError, so convert explicitly.
    per_task_cap = float(os.getenv("EVALPLUS_TIMEOUT_PER_TASK", 60))
    timeout = min(per_task_cap, sum(time_limits)) + 1
    timeout += 1  # extra time for data collection

    # The context manager shuts the manager's server process down on exit,
    # so repeated calls do not accumulate idle manager processes.
    with multiprocessing.Manager() as manager:
        outputs = manager.dict()

        p = multiprocessing.Process(
            target=unsafe_execute_with_outputs,
            args=(
                dataset,
                entry_point,
                code,
                task_id,
                solution_id,
                inputs,
                expected,
                time_limits,
                stat,
                details,
                outputs,
            ),
        )
        p.start()
        p.join(timeout=timeout + 1)
        if p.is_alive():
            p.terminate()
            time.sleep(0.1)
        if p.is_alive():
            p.kill()
            time.sleep(0.1)

        # Copy while the manager is still alive; the proxy is unusable after
        # shutdown, whereas the copy is an ordinary dict.
        return outputs.copy()


In [4]:
# Load the MBPP+ problems and their dataset hash, then obtain the reference
# outputs/timings (read from the pickle cache when one exists for this hash).
# `problems` and `expected_output` are reused by the cells below.
problems = get_mbpp_plus()
dataset_hash = get_mbpp_plus_hash()
expected_output = get_groundtruth(
    problems,
    dataset_hash,
    MBPP_OUTPUT_NOT_NONE_TASKS,
)

In [8]:
'''
{"text": "Write a function to find the number of ways to fill it with 2 x 1 dominoes for the given 3 x n board.", 
 "code": "def count_ways(n): \r\n\tA = [0] * (n + 1) \r\n\tB = [0] * (n + 1) \r\n\tA[0] = 1\r\n\tA[1] = 0\r\n\tB[0] = 0\r\n\tB[1] = 1\r\n\tfor i in range(2, n+1): \r\n\t\tA[i] = A[i - 2] + 2 * B[i - 1] \r\n\t\tB[i] = A[i - 1] + B[i - 2] \r\n\treturn A[n] ", 
 "task_id": 5, 
 "test_setup_code": "", 
 "test_list": ["assert count_ways(2) == 3", "assert count_ways(8) == 153", "assert count_ways(12) == 2131"], 
 "challenge_test_list": []}
'''
# Candidate solution under test for MBPP task 5. It uses a two-term
# count_ways(n-1) + count_ways(n-2) recurrence rather than the A/B recurrence
# of the reference solution quoted above.
code = """def count_ways(n):
    if n == 0:
        return 1
    if n == 1:
        return 1
    if n == 2:
        return 3
    return count_ways(n-1) + count_ways(n-2)"""
# Inputs and expected values are hand-copied from the task's test_list.
inputs = [[2], [8], [12]]
expected = [3, 153, 2131]
# ref_time of 0.0 means every per-input limit falls back to the 1.0 s minimum;
# details of all zeros means every input is actually executed.
untrusted_check(
    dataset="mbpp",
    entry_point="count_ways",
    code=code,
    task_id="Mbpp/5",
    solution_id="0",
    inputs=inputs,
    expected=expected,
    ref_time=[0.0, 0.0, 0.0],
    stat="base",
    details=[0, 0, 0],)


{0: 3, 1: 47, 2: 322}

In [21]:
# Candidate solution under test for Mbpp/6: returns True only when
# lhs - rhs is 0 or 1.
code = """def differ_At_One_Bit_Pos(lhs,rhs):
    if (lhs - rhs) == 0 or (lhs - rhs) == 1:
        return True
    return False"""
task_id = "Mbpp/6"
# Pull the entry point, base inputs and reference outputs/timings for this
# task from the dataset and ground truth loaded earlier in the notebook.
entry_point = problems[task_id]["entry_point"]
inputs = problems[task_id]["base_input"]
expected = expected_output[task_id]["base"]
expected_time = expected_output[task_id]["base_time"]
# Show the inputs as tuples for readability.
print([tuple(inp) for inp in inputs])
# details of all zeros means every input is actually executed.
untrusted_check(
    dataset="mbpp",
    entry_point=entry_point,
    code=code,
    task_id=task_id,
    solution_id="0",
    inputs=inputs,
    expected=expected,
    ref_time=expected_time,
    stat="base",
    details=[0]*len(inputs),)


[(13, 9), (15, 8), (2, 4), (2, 3), (5, 1), (1, 5)]


{0: False, 1: False, 2: False, 3: False, 4: False, 5: False}

In [29]:
# Candidate solution under test for Mbpp/7.
# The source is kept in a *raw* string so the regex escapes `\b` and `\w`
# reach exec() unchanged; in a normal string literal `\b` would be turned into
# a backspace character and `\w` is an invalid escape sequence.
# The candidate never imports `re`, so each input is expected to be recorded
# as a "failed: name 're' is not defined" entry.
code = r"""def find_char_long(text):
  return (re.findall(r"\b\w{4,}\b", text))"""
task_id = "Mbpp/7"
# Pull the entry point, base inputs and reference outputs/timings for this
# task from the dataset and ground truth loaded earlier in the notebook.
entry_point = problems[task_id]["entry_point"]
inputs = problems[task_id]["base_input"]
expected = expected_output[task_id]["base"]
expected_time = expected_output[task_id]["base_time"]
# Show the inputs as tuples for readability.
print([tuple(inp) for inp in inputs])
# details of all zeros means every input is actually executed.
untrusted_check(
    dataset="mbpp",
    entry_point=entry_point,
    code=code,
    task_id=task_id,
    solution_id="0",
    inputs=inputs,
    expected=expected,
    ref_time=expected_time,
    stat="base",
    details=[0]*len(inputs),)


[('Please move back to stream',), ('Jing Eco and Tech',), ('Jhingai wulu road Zone 3',)]


{0: "failed: name 're' is not defined",
 1: "failed: name 're' is not defined",
 2: "failed: name 're' is not defined"}

In [31]:
# Candidate solution under test for Mbpp/8: squares every element via map().
code = """def square_nums(nums):
  square_nums = list(map(lambda x: x ** 2, nums))
  return square_nums"""
task_id = "Mbpp/8"
# Pull the entry point, base inputs and reference outputs/timings for this
# task from the dataset and ground truth loaded earlier in the notebook.
entry_point = problems[task_id]["entry_point"]
inputs = problems[task_id]["base_input"]
expected = expected_output[task_id]["base"]
expected_time = expected_output[task_id]["base_time"]
# Show the inputs as tuples for readability.
print([tuple(inp) for inp in inputs])
# details of all zeros means every input is actually executed.
untrusted_check(
    dataset="mbpp",
    entry_point=entry_point,
    code=code,
    task_id=task_id,
    solution_id="0",
    inputs=inputs,
    expected=expected,
    ref_time=expected_time,
    stat="base",
    details=[0]*len(inputs),)

[([1, 2, 3, 4, 5, 6, 7, 8, 9, 10],), ([10, 20, 30],), ([12, 15],)]


{0: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100], 1: [100, 400, 900], 2: [144, 225]}

In [34]:
# Candidate solution under test for Mbpp/9. For strings longer than one
# character it returns the smallest index whose character equals s[0]; index 0
# always matches, so the result is 0 for such inputs.
code = """def find_Rotations(s):
    if len(s) == 1:
        return 1
    elif len(s) == 0:
        return 0
    else:
        l = len(s)
        min = len(s)
        for i, c in enumerate(s):
            if c == s[0]:
                temp = i
                if temp < min:
                    min = temp
        return min"""
task_id = "Mbpp/9"
# Pull the entry point, base inputs and reference outputs/timings for this
# task from the dataset and ground truth loaded earlier in the notebook.
entry_point = problems[task_id]["entry_point"]
inputs = problems[task_id]["base_input"]
expected = expected_output[task_id]["base"]
expected_time = expected_output[task_id]["base_time"]
# Show the inputs and the reference outputs so they can be compared by eye
# with the candidate's outputs returned below.
print([tuple(inp) for inp in inputs])
print(expected)
# details of all zeros means every input is actually executed.
untrusted_check(
    dataset="mbpp",
    entry_point=entry_point,
    code=code,
    task_id=task_id,
    solution_id="0",
    inputs=inputs,
    expected=expected,
    ref_time=expected_time,
    stat="base",
    details=[0]*len(inputs),)

[('aaaa',), ('ab',), ('abc',)]
[1, 2, 3]


{0: 0, 1: 0, 2: 0}

In [36]:
# Original MBPP record for task 10, kept for reference. As a bare dict
# expression it is evaluated and discarded; nothing below reads it.
{"text": "Write a function to get the n smallest items from a dataset.", 
 "code": "import heapq\r\ndef small_nnum(list1,n):\r\n  smallest=heapq.nsmallest(n,list1)\r\n  return smallest", 
 "task_id": 10, 
 "test_setup_code": "", 
 "test_list": ["assert small_nnum([10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100],2)==[10,20]", 
               "assert small_nnum([10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100],5)==[10,20,20,40,50]", 
               "assert small_nnum([10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100],3)==[10,20,20]"], 
 "challenge_test_list": []}

# Candidate solution under test: sort, then take the first n items.
code = """def small_nnum(lst,n):
  lst = sorted(lst)
  lst = lst[:n]
  return lst"""
# Inputs and expected values are hand-copied from the record's test_list.
inputs = [[[10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100],2], 
          [[10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100],5], 
          [[10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100],3]]
expected = [[10,20], [10,20,20,40,50], [10,20,20]]
# Show the inputs and the expected outputs so they can be compared by eye
# with the candidate's outputs returned below.
print([tuple(inp) for inp in inputs])
print(expected)
# ref_time of 0.0 means every per-input limit falls back to the 1.0 s minimum;
# details of all zeros means every input is actually executed.
untrusted_check(
    dataset="mbpp",
    entry_point="small_nnum",
    code=code,
    task_id="Mbpp/10",
    solution_id="0",
    inputs=inputs,
    expected=expected,
    ref_time=[0.0, 0.0, 0.0],
    stat="base",
    details=[0, 0, 0],)

[([10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100], 2), ([10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100], 5), ([10, 20, 50, 70, 90, 20, 50, 40, 60, 80, 100], 3)]
[[10, 20], [10, 20, 20, 40, 50], [10, 20, 20]]


{0: [10, 20], 1: [10, 20, 20, 40, 50], 2: [10, 20, 20]}