This notebook tests the DeepSeek Coder 6.7B Instruct quantized model on the MBPP dataset using a system prompt and a simulated dialogue between the model and a user (few-shot). Examples (automatic tests) are provided with each task from the dataset.

# Installations and imports

In [None]:
!pip install accelerate
!pip install bitsandbytes



In [None]:
from tqdm import tqdm

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

In [None]:
!pip install datasets
from datasets import load_dataset



In [None]:
import re
import pandas as pd

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import multiprocessing
import time

# Choosing dataset

In [None]:
# Load MBPP; the returned object is indexed by split name in later cells
# (dataset['prompt'], dataset['test']).
dataset = load_dataset("mbpp")
dataset

Downloading data:   0%|          | 0.00/87.2k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/116k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/25.1k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/7.88k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/374 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/500 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/90 [00:00<?, ? examples/s]

Generating prompt split:   0%|          | 0/10 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['task_id', 'text', 'code', 'test_list', 'test_setup_code', 'challenge_test_list'],
        num_rows: 374
    })
    test: Dataset({
        features: ['task_id', 'text', 'code', 'test_list', 'test_setup_code', 'challenge_test_list'],
        num_rows: 500
    })
    validation: Dataset({
        features: ['task_id', 'text', 'code', 'test_list', 'test_setup_code', 'challenge_test_list'],
        num_rows: 90
    })
    prompt: Dataset({
        features: ['task_id', 'text', 'code', 'test_list', 'test_setup_code', 'challenge_test_list'],
        num_rows: 10
    })
})

In [None]:
# Keep a direct handle on the 'prompt' split.
dataset_prompt = dataset['prompt']

In [None]:
# Bind the test split to its own name instead of rebinding `dataset`:
# later cells still index `dataset` by split name ('prompt' / 'test'),
# which fails on a fresh kernel if `dataset` has become the test split.
dataset_test = dataset['test']

# Choosing model

## DeepSeek 6.7B Instruct

In [None]:
# `device` is where tokenized prompts are moved before generation.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name = "deepseek-ai/deepseek-coder-6.7b-instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# Weights are requested in bfloat16 with automatic device placement;
# no quantization arguments are passed in this call.
model = AutoModelForCausalLM.from_pretrained(model_name, trust_remote_code=True, torch_dtype=torch.bfloat16, device_map='auto')

tokenizer_config.json:   0%|          | 0.00/1.87k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.37M [00:00<?, ?B/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


config.json:   0%|          | 0.00/760 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/25.1k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.98G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/3.50G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/119 [00:00<?, ?B/s]

## Codellama 7B Instruct

In [None]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = "codellama/CodeLlama-7b-Instruct-hf"
# #model = "deepseek-ai/deepseek-coder-1.3b-instruct"
# tokenizer = AutoTokenizer.from_pretrained(model)
# model = AutoModelForCausalLM.from_pretrained(model, torch_dtype=torch.float16, device_map='auto', load_in_8bit=True)

# Generation and data preparation functions

In [None]:
def read_test_examples(dataset):
    """Yield one few-shot prompt per task of the ``'test'`` split.

    Records 1, 2 and 3 of the ``'prompt'`` split are rendered (problem text,
    test cases and reference code) as worked examples and prepended to every
    task of the ``'test'`` split, which is rendered without code.

    Args:
        dataset: Mapping from split name to a sized, integer-indexable
            collection of task records. ``dataset['prompt']`` must contain at
            least four records with ``'text'``, ``'test_list'`` and ``'code'``;
            records of ``dataset['test']`` must contain ``'task_id'``,
            ``'text'`` and ``'test_list'``.

    Yields:
        dict: ``{'task_id': <task id>, 'prompt': <full few-shot prompt>}``,
        one per record of the ``'test'`` split, in order.
    """
    def format_test_example(q, tests, code: str=None):
        """Render one problem (and optionally its solution) as prompt text."""
        prompt = ">>> Problem:\n{}\n>>> Test Cases:\n{}\n".format(q.strip(), "\n".join(tests))
        if code:
            # Normalize line endings and tabs so the example code is uniform.
            code = code.replace("\r", "").replace("\t", "    ")
            prompt += "\n>>> Code:\n```python\n{}\n```".format(code)
        return prompt

    template = '''
Please refer the given examples and generate a python function for my problem.
Examples are listed as follows:
{}

Here is my problem:
{}
'''.strip()

    # Worked examples: records 1..3 of the 'prompt' split (record 0 is skipped).
    shots = dataset['prompt']
    examples_str = []
    for i in range(1, 4):
        ex = shots[i]
        ex_prompt = format_test_example(ex['text'], ex['test_list'], ex['code'])
        examples_str.append('- Example {}:\n{}'.format(i, ex_prompt))
    # The examples are identical for every task, so join them only once.
    shots_text = '\n\n'.join(examples_str)

    # Iterate over however many tasks the split actually has rather than a
    # fixed count, so shorter or longer splits are handled correctly.
    tasks = dataset['test']
    for i in range(len(tasks)):
        ex = tasks[i]
        prompt = format_test_example(ex['text'], ex['test_list'], code=None)
        yield {
            'task_id': ex['task_id'],
            'prompt': template.format(shots_text, prompt)
        }

In [None]:
def generate(model, tasks, shots=0, num_tests=None, do_sample=False, top_p=-1.0, top_k=0, temperature=1.0, dialog=None, max_new_tokens=200):
    """Generate one model response per task prompt.

    Relies on the module-level ``tokenizer`` and ``device`` objects.

    Args:
        model: Causal LM exposing ``eval()`` and ``generate()``.
        tasks: Sliceable sequence of dicts, each with a ``'prompt'`` string.
        shots: Accepted for interface compatibility; not used here.
        num_tests: Number of leading tasks to process; ``None`` (default)
            processes all of them.
        do_sample: Whether to sample instead of decoding greedily.
        top_p, top_k, temperature: Sampling parameters; forwarded to
            ``model.generate`` only when ``do_sample`` is true.
        dialog: Accepted for interface compatibility; not used here.
        max_new_tokens: Upper bound on generated tokens per response.

    Returns:
        tuple[list[str], list[str]]: The prompts that were sent and the
        decoded responses (prompt tokens stripped), in task order.
    """
    model.eval()
    prompt_list = []
    responses = []

    generation_kwargs = {
        'max_new_tokens': max_new_tokens,
        'num_return_sequences': 1,
        'do_sample': do_sample,
        'eos_token_id': tokenizer.eos_token_id,
    }
    if do_sample:
        # Sampling knobs are meaningless for greedy decoding; passing the
        # placeholder defaults (e.g. top_p=-1.0) there only triggers
        # generation-config warnings, so forward them only when sampling.
        generation_kwargs.update(top_p=top_p, top_k=top_k, temperature=temperature)

    for task in tasks[:num_tests]:
        messages = [
            {
                'role': 'user',
                'content': task['prompt']
            }
        ]
        prompt_list.append(task['prompt'])
        inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt").to(device)

        with torch.no_grad():
            outputs = model.generate(input_ids=inputs, **generation_kwargs)

        # Drop the echoed prompt tokens and keep only the newly generated tail.
        response = tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)
        responses.append(response)

    return prompt_list, responses

# Testing

In [None]:
def extract_code(text):
    """Extract the first Python snippet from a model response.

    Collection starts after a line beginning with ```` ```python ```` or at
    the first unfenced line starting with ``import``, ``from`` or ``def``
    (that line is kept). It stops at a closing ```` ``` ```` fence or at a
    line starting with ``assert`` or ``# Test``.

    Args:
        text: Raw response text.

    Returns:
        str: The collected lines joined with newlines; ``''`` if no code
        start was found.
    """
    code = []
    inside_function = False

    for line in text.split('\n'):
        if line.startswith('```python'):
            inside_function = True
        elif not inside_function and line.startswith(('import', 'from', 'def')):
            inside_function = True
            code.append(line)
        elif inside_function:
            # Compare the fence with trailing whitespace removed so that
            # "```  " or "```\r" (CRLF responses) still closes the block
            # instead of letting the following prose leak into the code.
            if line.rstrip() == '```' or line.startswith(('assert', '# Test')):
                break
            code.append(line)
    return '\n'.join(code)

In [None]:
def exec_code(code, result_queue):
    """Execute ``code`` and report success or failure on ``result_queue``.

    NOTE: this runs arbitrary (model-generated) code via ``exec``; only call
    it inside a disposable child process.

    Args:
        code: Python source to execute.
        result_queue: Object with a ``put`` method; receives ``True`` if the
            code ran to completion and ``False`` if it raised.
    """
    # Use one fresh dict as both globals and locals. A bare exec(code) inside
    # a function uses separate globals/locals, so functions defined by `code`
    # cannot see its top-level imports, helper functions, or themselves
    # (recursion), which produces spurious NameErrors.
    namespace = {'__name__': '__main__'}
    try:
        exec(code, namespace)
    except (Exception, SystemExit):
        # SystemExit (exit()/sys.exit()) is not an Exception subclass; without
        # catching it the process would end without ever reporting a result.
        result_queue.put(False)
    else:
        result_queue.put(True)

def run(code, timeout=5):
    """Run ``code`` in a separate process and report whether it succeeded.

    Args:
        code: Python source handed to ``exec_code`` in a child process.
        timeout: Seconds to wait for the child before killing it.

    Returns:
        bool: The value reported by ``exec_code``; ``False`` if the child
        timed out or exited without reporting a result.
    """
    import queue  # local import: only needed for the queue.Empty exception

    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=exec_code, args=(code, result_queue))
    p.start()
    p.join(timeout)

    if p.is_alive():
        print("Execution time has been exceeded. Process killed.")
        p.terminate()
        p.join()
        return False

    # The child has exited. If it died without putting a result (hard exit,
    # crash, uncaught non-Exception), an unbounded get() would block forever,
    # so wait only briefly and treat a missing result as a failure.
    try:
        return result_queue.get(timeout=1)
    except queue.Empty:
        return False

def test(num_tests, dataset, responses, df):
    """Extract code from each response, run it against the task's tests, and record the outcome.

    Args:
        num_tests: Number of leading tasks/responses to evaluate.
        dataset: Object whose ``['test_list']`` yields, per task, a list of
            assertion strings.
        responses: Model responses, aligned with the dataset rows.
        df: DataFrame updated in place; columns ``'code'``, ``'tests'`` and
            ``'result'`` are written at row labels ``0..num_tests-1``.

    Returns:
        tuple[list[str], list[str], list[str]]: Extracted codes, joined test
        strings, and ``'Ok'``/``'Error'`` results.
    """
    codes = []
    tests = []
    results = []
    # Fetch the column once instead of re-reading it several times per row.
    test_lists = dataset['test_list']
    # NOTE(review): the dataset also has a 'test_setup_code' field that is not
    # prepended here -- confirm whether any evaluated task depends on it.
    for i in range(num_tests):
        code = extract_code(responses[i])
        df.loc[i, 'code'] = code
        codes.append(code)
        # Join every provided assertion rather than assuming exactly three.
        test_cases = '\n'.join(test_lists[i])
        tests.append(test_cases)
        df.loc[i, 'tests'] = test_cases

        if code.strip():
            passed = run((code + '\n' + test_cases).strip())
        else:
            # Nothing was extracted: executing an empty program trivially
            # "succeeds", which would wrongly count the task as solved.
            passed = False
        result = 'Ok' if passed else 'Error'
        results.append(result)
        df.loc[i, 'result'] = result

    return codes, tests, results

In [None]:
def predict(df, model, dataset, tasks, shots=0, num_tests=500, do_sample=False, top_p=-1.0, top_k=0, temperature=1.0, dialog=[]):
    """Generate responses for ``tasks``, store them in ``df`` and evaluate them.

    ``df`` is modified in place: ``'prompt'`` and ``'response'`` columns are
    assigned from the generation step, then ``test`` fills in the per-row
    evaluation columns. Nothing is returned.
    """
    generation_options = dict(
        shots=shots,
        num_tests=num_tests,
        do_sample=do_sample,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        dialog=dialog,
    )
    prompts, answers = generate(model=model, tasks=tasks, **generation_options)

    for column, values in (('prompt', prompts), ('response', answers)):
        df[column] = pd.Series(values)

    _codes, _tests, _results = test(
        num_tests=num_tests,
        dataset=dataset,
        responses=answers,
        df=df,
    )

# Getting results

In [None]:
# Evaluate every row of the test split.
NUM_TESTS = len(dataset['test'])
NUM_TESTS

500

In [None]:
# Materialize the prompt generator into a list: generate() slices it.
examples = list(read_test_examples(dataset))

In [None]:
# Empty result table; predict() fills it in place (it returns nothing).
df2 = pd.DataFrame(columns = ['prompt', 'response', 'code', 'tests', 'result'])
# Sampling is disabled (do_sample=False) for this run.
predict(df2, model=model, dataset=dataset['test'], tasks=examples, num_tests=NUM_TESTS, do_sample=False)

In [None]:
# Display the filled result table.
df2

# Saving results

In [None]:
# Write the results to CSV without the DataFrame index column.
df2.to_csv('deepseek_mbpp_few_shot_greedy.csv', index=False)