### Setup

In [1]:
import gzip
import json
from typing import List
from collections import Counter, defaultdict
from tqdm import tqdm
import os
import inspect_ai
from openai import OpenAI
from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import login
from dotenv import load_dotenv
import torch as t
import subprocess
import contextlib
import shutil
import ast
import textwrap

In [2]:
from inspect_ai import Task, task
from inspect_ai.dataset import Sample, hf_dataset
from inspect_ai.util import ExecResult, sandbox
from inspect_ai.scorer import CORRECT, INCORRECT, Score, Scorer, Target, accuracy, scorer, stderr
from inspect_ai.solver import TaskState, generate
from inspect_ai.model import get_model
from inspect_ai.log import read_eval_log
import re

In [3]:
# Load environment variables via python-dotenv, then authenticate with the
# Hugging Face Hub using the HF_TOKEN environment variable (KeyError if unset).
load_dotenv()
login(token = os.environ['HF_TOKEN'])

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [4]:
# Per-language lists of import lines (python, cpp) or package names (go).
# NOTE(review): no other cell visible in this notebook references
# IMPORT_HELPER -- confirm whether the scorer was meant to prepend these.
IMPORT_HELPER = {
    "python": [
        "import math",
        "import re",
        "import sys",
        "import copy",
        "import datetime",
        "import itertools",
        "import collections",
        "import heapq",
        "import statistics",
        "import functools",
        "import hashlib",
        "import numpy",
        "import numpy as np",
        "import string",
        "from typing import *",
        "from collections import *",
    ],
    "go": [
        "math",
        "strings",
        "fmt",
        "strconv",
        "time",
        "bytes",
        "regexp",
        "sort",
        "math/rand",
        "crypto/md5",
    ],
    "cpp": [
        "#include<stdlib.h>",
        "#include<algorithm>",
        "#include<math.h>",
        "#include<stdio.h>",
        "#include<vector>",
        "#include<string>",
        "#include<climits>",
        "#include<cstring>",
        "#include<iostream>",
    ],
}

# instruction prepended to code problem
# (starts with one newline and ends with a blank line, so the language prefix
# that follows it begins on its own line)
INSTRUCTION = (
    "\n"
    "Read the following function signature and docstring, and fully implement\n"
    "the function described. Your response should only contain the code for\n"
    "this function.\n"
    "\n"
)

# Comment line naming the target language, keyed by language identifier;
# both "js" and "javascript" map to the same prefix.
LANG_PREFIX = {
    "cpp": "// language: C++",
    "java": "// language: Java",
    "js": "// language: JavaScript",
    "javascript": "// language: JavaScript",
    "go": "// language: Go",
    "python": "# language: Python",
}

In [5]:
# Handle for the Llama 3.1 8B Instruct checkpoint, requested in bfloat16 with
# device='auto'.
# NOTE(review): `model` is not used by any cell visible in this notebook (the
# eval call passes a model name string instead) -- confirm it is still needed.
model = get_model(
    'hf/meta-llama/Llama-3.1-8B-Instruct',
    device='auto',
    torch_dtype=t.bfloat16,
)

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

### Data Processing

In [6]:
def stream_jsonl_all(filename: str):
    """Read every JSON record from a gzip-compressed JSON Lines file.

    Args:
        filename: Path to a gzip file holding one JSON document per line.

    Returns:
        A list with the decoded value of each non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened or is not valid gzip data.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Let gzip open the path itself: it then owns the underlying file handle
    # and the `with` block closes it even when a line fails to parse. Handing
    # gzip an already-open file object would leave that object unclosed.
    # JSON text is UTF-8, so decode explicitly rather than via the locale.
    with gzip.open(filename, "rt", encoding="utf-8") as fp:
        # Whitespace-only lines carry no record and are skipped.
        return [json.loads(line) for line in fp if line.strip()]

In [7]:
# Load the task records for each language. `content` keeps them in the order
# python, cpp, go, java, js; the individual names stay available as well.
python_content, cpp_content, go_content, java_content, js_content = content = [
    stream_jsonl_all(data_path)
    for data_path in (
        'data/python_data.gz',
        'data/cpp_data.gz',
        'data/go_data.gz',
        'data/java_data.gz',
        'data/js_data.gz',
    )
]

In [8]:
# Load the stored Python generations and display the 'generation' field of the
# first record as the cell output.
generations = stream_jsonl_all('data/python_generations.gz')
generations[0]['generation']

'    for idx, elem in enumerate(numbers):\n        for idx2, elem2 in enumerate(numbers):\n            if idx != idx2:\n                distance = abs(elem - elem2)\n                if distance < threshold:\n                    return True\n\n    return False\n'

In [9]:
# Show which fields the first record of each language's dataset carries.
# The loop variable is deliberately not called `lang`: a module-level `lang`
# string is read by humaneval_record_to_sample at call time, and rebinding it
# to a list here would break that function if this cell is re-run later.
for lang_records in content:
    print(lang_records[0].keys())
    print()

dict_keys(['task_id', 'prompt', 'canonical_solution', 'test', 'text', 'declaration', 'example_test'])

dict_keys(['task_id', 'prompt', 'canonical_solution', 'test', 'declaration', 'example_test'])

dict_keys(['task_id', 'prompt', 'import', 'docstring', 'declaration', 'canonical_solution', 'test', 'test_setup', 'example_test'])

dict_keys(['task_id', 'prompt', 'canonical_solution', 'test', 'text', 'declaration', 'example_test'])

dict_keys(['task_id', 'prompt', 'canonical_solution', 'test', 'declaration', 'example_test'])



### LLM output => code  
*Inspect the raw LLM output and transform it into runnable code.*

In [62]:
# FIND CODE

def find_code_new(completion: str) -> str:
    """Pull the code to execute out of a model completion.

    Looks for Markdown code fences in ``completion``. Python-tagged fences are
    preferred over untagged ones; within each group the first fence wins. The
    chosen block is passed to ``extract_function_body_new``.

    Args:
        completion: Raw text returned by the model.

    Returns:
        The helper's result for the chosen fenced block; the block itself if
        the helper returns ``None``; or ``completion`` unchanged when it
        contains no matching fence.
    """
    # Accept common variations of the opening fence line: any letter case,
    # `py`/`python3`, trailing spaces or tabs, and CRLF line endings.
    tagged_blocks = re.findall(
        r"```(?:python|py)3?[ \t]*\r?\n(.*?)```",
        completion,
        re.DOTALL | re.IGNORECASE,
    )
    untagged_blocks = re.findall(r"```[ \t]*\r?\n(.*?)```", completion, re.DOTALL)
    matches = tagged_blocks + untagged_blocks
    if not matches:
        return str(completion)

    block = matches[0]
    body = extract_function_body_new(block)
    if body is None:
        # Without this guard str(None) would splice the literal text "None"
        # into the program that gets scored.
        body = block
    return str(body)


def extract_function_body_new(code: str) -> str:
    """Return the source lines that make up the first top-level function body.

    ``code`` is parsed with ``ast``. For the first top-level ``def``, the
    lines from its first body statement through its last body statement are
    returned, joined with newlines and with their indentation intact. Whole
    lines are sliced, so a body that starts on the ``def`` line itself brings
    that line along.

    Args:
        code: Python source, typically the contents of one fenced code block.

    Returns:
        The body lines of the first top-level function; ``code`` unchanged if
        it parses but contains no top-level ``def``; or the sentinel string
        ``"errormsg"`` if ``code`` cannot be parsed.
    """
    try:
        tree = ast.parse(code)
    except Exception as e:
        # Deliberately best-effort: model output is often not valid Python,
        # and the caller detects this case via the sentinel string.
        print(f"Error extracting function body: {e}")
        return "errormsg"

    source_lines = code.splitlines()
    for node in tree.body:
        if isinstance(node, ast.FunctionDef):
            # lineno is 1-based; end_lineno is inclusive, so it works directly
            # as the exclusive end of a 0-based slice.
            first_line = node.body[0].lineno - 1
            last_line = node.body[-1].end_lineno
            return "\n".join(source_lines[first_line:last_line])

    # No top-level function: hand the code back instead of falling off the
    # end and returning None.
    return code

In [65]:

# Scratch check: round the floating-point cube root of `a` to the nearest
# integer, cube it again, and compare with `a`.
a = 8
int(round(a ** (1. / 3))) ** 3 == a

True

In [None]:
@scorer(metrics=[accuracy(), stderr()])
def python_scorer() -> Scorer:
    """Build a scorer that runs the model's code against the sample's tests.

    The returned coroutine concatenates the sample's prompt, the code pulled
    from the completion by ``find_code_new``, a newline, and the sample's test
    code, then runs the result with ``python -c`` in the sandbox (30 second
    timeout). The score is CORRECT when the execution reports success and
    INCORRECT otherwise, including on timeout.
    """
    async def score(state: TaskState, target: Target) -> Score:
        sample_id = state.sample_id
        raw_completion = state.output.completion
        extracted_code = find_code_new(raw_completion)
        program = (
            state.metadata["prompt"]
            + extracted_code
            + "\n"
            + state.metadata["test"]
        )

        # find_code_new's helper reports unparsable code via this sentinel.
        if 'errormsg' in extracted_code:
            print(f'error in sample: {sample_id}')

        try:
            outcome = await sandbox().exec(
                cmd=["python", "-c", program],
                timeout=30,
            )
        except TimeoutError:
            # Substitute a failing result so a timeout is scored as INCORRECT.
            outcome = ExecResult(False, 1, "", "Verification timed out.")

        if outcome.success:
            verdict = CORRECT
            explanation = ""
        else:
            verdict = INCORRECT
            explanation = (
                "The following verification code was executed:\n\n"
                + program
                + f"\n\nThe submission was incorrect\n\n{outcome.stderr}"
            )

        return Score(
            value=verdict,
            explanation=explanation,
            metadata={
                'completion': raw_completion,
                'processed': extracted_code,
                'final_code': program,
            },
        )

    return score


In [12]:
# Language under evaluation: selects the LANG_PREFIX entry used when building
# prompts and is passed as the dataset `name` when loading from the Hub.
lang = 'python'

def humaneval_record_to_sample(record):
    """Convert one dataset record into an inspect_ai ``Sample``.

    The model input is INSTRUCTION, the language prefix for the module-level
    ``lang`` (looked up when this function is called), a newline, and the
    record's prompt. The record's prompt and test code are kept in the sample
    metadata; the canonical solution becomes the target.
    """
    header = INSTRUCTION + LANG_PREFIX[lang] + '\n'
    task_prompt = record['prompt']

    sample_metadata = {
        "prompt": record["prompt"],
        "test": record["test"],
    }
    return Sample(
        id=record["task_id"],
        input=header + task_prompt,
        target=record["canonical_solution"],
        metadata=sample_metadata,
    )

# Load the test split of THUDM/humaneval-x for the current `lang`, converting
# each record with humaneval_record_to_sample.
# NOTE(review): trust=True presumably allows the dataset's own loading code to
# run -- confirm against the inspect_ai hf_dataset documentation.
humaneval_dataset = hf_dataset(
    path='THUDM/humaneval-x',
    name=lang,
    split='test',
    sample_fields=humaneval_record_to_sample,
    trust=True,
)

Loading dataset THUDM/humaneval-x from Hugging Face...


0000.parquet:   0%|          | 0.00/105k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/164 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/164 [00:00<?, ? examples/s]

In [14]:
# Number of leading dataset records to evaluate (the task slices the dataset
# with [:samples]).
samples = 164

@task
def humaneval():
    """Task: generate a completion for each of the first `samples` records and
    score it with python_scorer in the 'local' sandbox."""
    dataset_subset = humaneval_dataset[:samples]
    return Task(
        dataset=dataset_subset,
        solver=generate(),
        scorer=python_scorer(),
        sandbox='local',
    )

In [15]:
# Run the task against gpt-4o-mini; `epochs` is passed straight through to
# inspect_ai.eval.
epochs = 1
inspect_ai.eval(
    humaneval(),
    model='openai/gpt-4o-mini',
    epochs=epochs,
)

Output()

In [None]:
# Index of the logged sample to inspect.
idx = 0

# NOTE(review): absolute, machine-specific path, whereas other cells read logs
# via the relative 'logs/...' directory -- confirm and make it relative if the
# notebook is meant to run elsewhere.
python_gpt_log = read_eval_log('/root/srf-project/test_humaneval-x/logs/python_gpt4o-mini.eval')
# Content of the last message recorded for that sample.
model_out = python_gpt_log.samples[idx].messages[-1].content

print(python_gpt_log.samples[idx].id)
print()
print(model_out)

In [60]:
target_id = 89
idx = 0
model_out = ''

# Find the logged sample whose id ends in `/<target_id>`, remember its position
# in `idx` and its last message in `model_out`, and print both. If no sample
# matches, `idx` and `model_out` keep the defaults set above.
for i, sample in enumerate(python_gpt_log.samples):
    # Named task_number rather than `id` so the builtin id() is not shadowed
    # for the rest of the notebook session.
    task_number = int(sample.id.split('/')[-1])
    if task_number != target_id:
        continue
    idx = i
    model_out = sample.messages[-1].content
    print(i)
    print()
    print(model_out)
    break


152

```python
def encrypt(s):
    """Create a function encrypt that takes a string as an argument and
    returns a string encrypted with the alphabet being rotated. 
    The alphabet should be rotated in a manner such that the letters 
    shift down by two multiplied to two places.
    For example:
    encrypt('hi') returns 'lm'
    encrypt('asdfghjkl') returns 'ewhjklnop'
    encrypt('gf') returns 'kj'
    encrypt('et') returns 'ix'
    """
    result = []
    for char in s:
        if char.isalpha():  # Process only alphabetic characters
            # Shift the character by 4 places in the alphabet
            new_char = chr(((ord(char) - ord('a') + 4) % 26) + ord('a')) if char.islower() else \
                           ((ord(char) - ord('A') + 4) % 26) + ord('A'))
            result.append(new_char)
    return ''.join(result)
```


In [61]:
# Run the extraction on the completion selected in the previous cell.
print(find_code_new(model_out))

Error extracting function body: unmatched ')' (<unknown>, line 17)
error msg


#### Code Extraction Tests

In [31]:
# Index of the logged sample to inspect.
idx = 0

# NOTE(review): the variable is named `js_eval` but the file read here is the
# Java log -- confirm which language this cell is meant to inspect.
js_eval = read_eval_log('logs/java_1.eval')
# Content of the last message recorded for that sample.
model_out = js_eval.samples[idx].messages[-1].content
print(model_out)

# print(data['completion'])
# print(data['processed'])
# print(data['final_code'])

# find_code_new(data['completion'])

```java
import java.util.*;

public class Solution {
    /**
     * Check if in given list of numbers, are any two numbers closer to each other than given threshold.
     *
     * @param numbers A list of numbers to check
     * @param threshold The minimum distance between two numbers to be considered as close
     * @return True if any two numbers are closer than the threshold, False otherwise
     */
    public boolean hasCloseElements(List<Double> numbers, double threshold) {
        if (numbers.size() < 2) {
            return false;
        }

        for (int i = 0; i < numbers.size() - 1; i++) {
            for (int j = i + 1; j < numbers.size(); j++) {
                if (Math.abs(numbers.get(i) - numbers.get(j)) < threshold) {
                    return true;
                }
            }
        }

        return false;
    }
}
```


In [158]:
# Show the test code stored with the first JavaScript record.
print(js_content[0]['test'])

const testHasCloseElements = () => {
  console.assert(hasCloseElements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) === true)
  console.assert(
    hasCloseElements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) === false
  )
  console.assert(hasCloseElements([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) === true)
  console.assert(hasCloseElements([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) === false)
  console.assert(hasCloseElements([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) === true)
  console.assert(hasCloseElements([1.1, 2.2, 3.1, 4.1, 5.1], 1.0) === true)
  console.assert(hasCloseElements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) === false)
}

testHasCloseElements()



In [106]:
# Show the exact input recorded for the first sample of the loaded log.
print(js_eval.samples[0].input)


Read the following function signature and docstring, and fully implement
the function described. Your response should only contain the code for
this function.

// language: JavaScript
/* Check if in given list of numbers, are any two numbers closer to each other than
  given threshold.
  >>> hasCloseElements([1.0, 2.0, 3.0], 0.5)
  false
  >>> hasCloseElements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
  true
  */
const hasCloseElements = (numbers, threshold) => {



In [None]:
# preprocess – get 5 evals files!
num_eval_logs = 5
eval_logs = []
for i in range(num_eval_logs):
    file_name = f'logs/test_{i}.eval'
    eval_logs.append(read_eval_log(file_name))

# Check that the extracted ("processed") code for each sample is identical
# across all loaded logs.
# NOTE(review): the scores are looked up under the key 'verify', while the
# scorer defined in this notebook is named python_scorer -- confirm these logs
# were produced by a scorer registered as 'verify'.
samples_read = 164
for i in range(samples_read):
    processed_results = []
    for j in range(num_eval_logs):
        # Index with j, not 0: reading eval_logs[0] on every pass would compare
        # one log with itself and the assertion below could never fail.
        data = eval_logs[j].samples[i].scores['verify'].metadata
        processed_results.append(data['processed'])

    assert len(set(processed_results)) == 1, (
        f"sample index {i}: processed code differs across the {num_eval_logs} logs"
    )

# if this works, easy to scale it up to 164 samples.