In [3]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import logging
import sys
import time, datetime
from typing import Dict, Iterable
import os
import gzip
import json

from google.colab import drive

# Mount Google Drive to the current Colab session
drive.mount('/content/drive')

# Verify that your drive is accessible
!ls /content/drive/MyDrive

Mounted at /content/drive
'16 б.gdoc'			  deleteme.zip		   share
'6 фото.     5 лет блондин.JPG'   futuristic-office.jpeg  'Tanya Passport.JPG'
'Colab Notebooks'		  rental_houses.gsheet	  'лечение травами 3.gdoc'


In [4]:
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [5]:
def get_import_statements(code_str: str) -> str:
    """
    Keeps lines that begin with 'import' or 'from' from a multiline string.
    """
    lines = code_str.splitlines()
    filtered_lines = []
    for line in lines:
        stripped_line = line.strip()
        if stripped_line.startswith('import ') or stripped_line.startswith('from '):
            filtered_lines.append(line)
    res = '\n'.join(filtered_lines) + '\n'
    s = 'from typing import List'
    if s not in res:
        res = s + '\n' + res
    return res

# Example usage
code_with_imports = '''import collections
import random
import string

def task_func(length=100):
    return 1
'''
print( get_import_statements(code_with_imports) )

from typing import List
import collections
import random
import string



In [6]:
def write_jsonl(filename: str, data: Iterable[Dict], append: bool = False):
    """
    Writes an iterable of dictionaries to jsonl
    """
    if append:
        mode = 'ab'
    else:
        mode = 'wb'
    filename = os.path.expanduser(filename)
    if filename.endswith(".gz"):
        with open(filename, mode) as fp:
            with gzip.GzipFile(fileobj=fp, mode='wb') as gzfp:
                for x in data:
                    gzfp.write((json.dumps(x) + "\n").encode('utf-8'))
    else:
        with open(filename, mode) as fp:
            for x in data:
                fp.write((json.dumps(x) + "\n").encode('utf-8'))


def stream_jsonl(filename: str) -> Iterable[Dict]:
    """
    Parses each jsonl line and yields it as a dictionary
    """
    if filename.endswith(".gz"):
        with open(filename, "rb") as gzfp:
            with gzip.open(gzfp, 'rt') as fp:
                for line in fp:
                    if any(not x.isspace() for x in line):
                        yield json.loads(line)
    else:
        with open(filename, "r") as fp:
            for line in fp:
                if any(not x.isspace() for x in line):
                    yield json.loads(line)


def read_problems(evalset_file: str) -> Dict[str, Dict]:
    return {task["task_id"]: task for task in stream_jsonl(evalset_file)}

In [7]:
def is_full_function(input_str: str) -> str:
    '''
    Determine if input string is a Python function
    using simple heuristics (all that's needed)
    '''
    lines = input_str.splitlines()
    for line in lines:
        if line.startswith('def '):
            return True
    return False


def remove_triple_backticks_at_edges(text: str) -> str:
    """
    Removes triple backticks (` ``` `) if they appear at the beginning or the end of the text.

    :param text: The input text as a string.
    :return: A string with triple backticks removed from the edges.
    """
    s1 = "```python\n"
    s2 = "```python"
    s3 = "```"
    for s in [s1, s2, s3]:
        if text.startswith(s):
            text = text[len(s):].lstrip()      # Remove a version of backticks + extra whitespace at the start
    if text.endswith(s3):
        text = text[: -len(s3)].rstrip()       # Remove the backticks + extra whitespace at the end
    return text


def add_import(text: str) -> str:
    ''' Adds import statements stripped by LLM '''
    text = text.strip()
    s = 'from typing import List'
    if 'def ' in text and s not in text:
        text = s + '\n\n' + text
    return text

string = 'def my_func(input):\n    return output'
print(add_import(string))

from typing import List

def my_func(input):
    return output


In [8]:
# read dataset
file = '/content/drive/My Drive/Colab Notebooks/data/Big_Code_Bench_Test.jsonl.gz'
tasks = read_problems(file)
print(f'Type of tasks: {type(tasks)}\n')

counter = 0
for k,v in tasks.items():
    if counter == 5:
        break
    print(f'task_id: {k}', type(k))
    for k2, v2 in v.items():
        print(f'\n{k2}:\n{v2}')
    print('\n' + '='*100 + '\n')
    counter += 1

Type of tasks: <class 'dict'>

task_id: BigCodeBench/0 <class 'str'>

task_id:
BigCodeBench/0

test:


import unittest
from unittest.mock import patch
from random import seed, shuffle
import itertools
class TestCases(unittest.TestCase):
    def test_default_numbers(self):
        # Test with default number range (1 to 10) to check that the result is a positive float.
        result = task_func()
        self.assertIsInstance(result, float)
        self.assertGreater(result, 0)
    def test_custom_list(self):
        # Test with a custom list of small positive integers to ensure proper handling and positive result.
        result = task_func([1, 2, 3])
        self.assertIsInstance(result, float)
        self.assertGreater(result, 0)
    def test_negative_numbers(self):
        # Test with negative numbers to verify the function handles and returns a positive result.
        result = task_func([-3, -2, -1])
        self.assertIsInstance(result, float)
        self.assertGreater(result, 

In [11]:
access_token = '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'   # hard-coded HuggingFace access token
model_id = 'meta-llama/Meta-Llama-3.1-8B-Instruct'
tokenizer = AutoTokenizer.from_pretrained(model_id, token=access_token)
model = AutoModelForCausalLM.from_pretrained(model_id, device_map = 'cuda', token=access_token)

if tokenizer.pad_token is None:
  tokenizer.pad_token = tokenizer.eos_token

tokenizer_config.json:   0%|          | 0.00/55.4k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/296 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/855 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/23.9k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/4 [00:00<?, ?it/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00002-of-00004.safetensors:   0%|          | 0.00/5.00G [00:00<?, ?B/s]

model-00003-of-00004.safetensors:   0%|          | 0.00/4.92G [00:00<?, ?B/s]

model-00004-of-00004.safetensors:   0%|          | 0.00/1.17G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/184 [00:00<?, ?B/s]

In [13]:
print(f'Model device: {model.device}')
print(f'Model temperature: {model.config.temperature}. Model top_p: {model.config.top_p}')

Model device: cuda:0
Model temperature: 1.0. Model top_p: 1.0


In [14]:
# This code is also used by Artigenz, Deepseek Coder, Nxcode-CQ-7B-orpo (works for Llama 3.1 based on trial and error experiments with other similar funcs)
def generate_response(prompt):
  messages = [
      {"role": "user", "content": prompt}
  ]
  inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt").to(model.device)
  outputs = model.generate( inputs, max_new_tokens=512, do_sample=True,
                            temperature=0.7, top_k=50, top_p=1.0,
                            num_return_sequences=1,
                            pad_token_id=tokenizer.pad_token_id,
                            eos_token_id=tokenizer.eos_token_id, )
  res = tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)
  return res

In [15]:
input_prompt = "What is the capital of California?"
generate_response(input_prompt)

The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


'The capital of California is Sacramento.'

In [20]:
task = tasks['BigCodeBench/15']

completion_prompt = '''1. Complete the following starter code consisting of import statements, function signature and docstring.
2. Your output must include only the function body with proper Python code and indentation.
3. Do not include phrases like “Completion:” or any other headings, do not show example usage, do not show test cases, do not provide explanatory text.
4. Your output must be with no additional text, no extra symbols, no code fences (```), and no other content before or after the function body.
5. Stop generating immediately after the return statement or final line of the function body.

Starter Code:\n\n'''

reflection_prompt = '''1. Act as a very experienced Python developer.
2. Read the requirements listed below, the starter code, and the proposed solution.
3. Correct all errors in the logic, syntax, or any other aspects of the proposed solution. Remove redundant code. Make sure the proposed solution solves the problem in the best possible way.
4. Output the corrected, improved solution. If no changes are required, output the old proposed solution.
5. Do not output anything else before or after the improved solution. Your entire output should be runnable Python code.

Requirements:
{}
{}

Proposed Solution:
{}'''


proposed_solution = 'Python code for proposed solution'

print(reflection_prompt.format(completion_prompt, task['prompt'], proposed_solution))

1. Act as a very experienced Python developer.
2. Read the requirements listed below, the starter code, and the proposed solution.
3. Correct all errors in the logic, syntax, or any other aspects of the proposed solution. Remove redundant code. Make sure the proposed solution solves the problem in the best possible way.
4. Output the corrected, improved solution. If no changes are required, output the old proposed solution.
5. Do not output anything else before or after the improved solution. Your entire output should be runnable Python code.

Requirements:
1. Complete the following starter code consisting of import statements, function signature and docstring.
2. Your output must include only the function body with proper Python code and indentation.
3. Do not include phrases like “Completion:” or any other headings, do not show example usage, do not show test cases, do not provide explanatory text.
4. Your output must be with no additional text, no extra symbols, no code fences (```)

In [17]:
# to save results
model_nickname = 'llama31_8'
time_stamp     = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-2]
file_name      = f'/content/drive/My Drive/Colab Notebooks/logs/{model_nickname}_samples_{time_stamp}_MBPP.jsonl'
file_name

'/content/drive/My Drive/Colab Notebooks/logs/llama31_8_samples_20250125_112538_7847_MBPP.jsonl'

In [21]:
num_samples_per_task = 1
completions = []
is_full_func = []

for task_id, task_body in tasks.items():
    print(task_id)
    #if task_id == 'BigCodeBench/7':
    #        break
    for i in range(num_samples_per_task):

        # generate completion
        start_time  = time.time()
        full_prompt = completion_prompt + task_body['prompt']
        try:
            proposed_solution  = generate_response( full_prompt )
        except Exception as e:
            proposed_solution = f"Error generating a completion:\n{e}"
        proposed_solution = remove_triple_backticks_at_edges(proposed_solution.strip())
        is_full = is_full_function(proposed_solution)
        if is_full:
            proposed_solution = get_import_statements(task_body['prompt']) + '\n' + proposed_solution

        # improve solution
        new_full_prompt = reflection_prompt.format(completion_prompt, task_body['prompt'], proposed_solution)
        try:
            completion = generate_response( new_full_prompt )
        except Exception as e:
            completion = f"Error generating a completion:\n{e}"
        #completion = remove_triple_backticks_at_edges(completion.strip())
        #is_full = is_full_function(completion)
        #is_full_func.append(is_full)
        #if is_full:
        #    completion = get_import_statements(task_body['prompt']) + '\n' + completion

        completions.append( {'task_id': task_id, 'completion': completion} )
        write_jsonl(file_name, completions)
        print('New Full Prompt:\n', new_full_prompt, '\n', '-'*75, '\n', sep='')
        print('Improved Completion:\n', completion, sep='')
        print(f"\nTime elapsed: {(time.time() - start_time):.4f} seconds\n", '\n', '='*75, '\n', sep='')

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
import pandas as pd

# Constants
PLANETS = [
    "Mercury",
    "Venus",
    "Earth",
    "Mars",
    "Jupiter",
    "Saturn",
    "Uranus",
    "Neptune",
]
ELEMENTS = [
    "Hydrogen",
    "Helium",
    "Oxygen",
    "Carbon",
    "Nitrogen",
    "Magnesium",
    "Silicon",
    "Iron",
    "Nickel",
]


def task_func():
    """
    Generate a DataFrame where each row contains random planet-element pairs.
    Each pair is formatted as 'Planet:Element'. The number of rows is determined by
    the number of planets, and each row will contain as many planet-element pairs as there are elements.

    Parameters:
    - None

    Returns:
    pandas.DataFrame: A DataFrame where each cell contains a string in the format 'Planet:Element'.
                      The DataFrame has a number of rows equal to the number of planets and
                      a number of columns equal to the number of elements.

    Requirements:
    - nu

In [22]:
# count how many full funcs were generated vs. only func bodies without func signatures
from collections import Counter
Counter(is_full_func)

Counter()

In [None]:
from google.colab import runtime
runtime.unassign()