<a href="https://colab.research.google.com/github/claudiarichardxx/Finetuning-CodeLlama/blob/main/Finetuning_Codellama.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

**Code Llama** is a family of large language models for code based on Llama 2 providing state-of-the-art performance among open models, infilling capabilities, support for large input contexts, and zero-shot instruction following ability for programming tasks. We provide multiple flavors to cover a wide range of applications: foundation models (Code Llama), Python specializations (Code Llama - Python), and instruction-following models (Code Llama - Instruct) with 7B, 13B and 34B parameters each. All models are trained on sequences of 16k tokens and show improvements on inputs with up to 100k tokens. 7B and 13B Code Llama and Code Llama - Instruct variants support infilling based on surrounding content. (Taken from the abstract of the paper : [Code Llama: Open Foundation Models for Code](https://ai.meta.com/research/publications/code-llama-open-foundation-models-for-code/))

This colab demos how to

* Load Code Llama
* Evaluate Code Llama
* FineTune Code Llama

Cloning the GitHub repository 'claudiarichardxx/Finetuning-CodeLlama' to download the data

In [1]:
!git clone https://github.com/claudiarichardxx/Finetuning-CodeLlama.git

Cloning into 'Finetuning-CodeLlama'...
remote: Enumerating objects: 213, done.[K
remote: Counting objects: 100% (213/213), done.[K
remote: Compressing objects: 100% (157/157), done.[K
remote: Total 213 (delta 116), reused 131 (delta 50), pack-reused 0 (from 0)[K
Receiving objects: 100% (213/213), 5.14 MiB | 14.11 MiB/s, done.
Resolving deltas: 100% (116/116), done.


In [2]:
cd Finetuning-CodeLlama/

/workspace/Finetuning-CodeLlama


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


# Finetuning Setup

## Installations

We first install the libraries:

* `accelerate`: The accelerate library is designed to make it easier to run models on GPUs and distributed environments, helping with performance optimization and scaling.

* `peft`: The peft library stands for "parameter-efficient fine-tuning," which is useful for adapting pre-trained models with fewer parameters, making the fine-tuning process more efficient.

* `bitsandbytes`: The bitsandbytes library provides 4 and 8-bit optimizers and matrix multiplication routines, allowing for more memory-efficient training of LLMs.

* `transformers`: The transformers library by Hugging Face provides state-of-the-art implementations of transformer models (like BERT, GPT, T5, etc.), allowing easy model loading, training, and deployment.

* `trl`: The trl library (Transformers Reinforcement Learning) integrates reinforcement learning algorithms with transformer models, allowing for fine-tuning of language models using RL techniques.

* `tqdm` and `fire` are required to run the evaluation on HumanEval as mentioned in the GitHub repository.


In [3]:
! pip install -q accelerate==0.27.2 peft==0.5.0 bitsandbytes==0.41.3 transformers==4.38.2 trl==0.4.7
! pip install -q protobuf sentencepiece scipy
! pip install tqdm fire

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Collecting fire
  Downloading fire-0.7.0.tar.gz (87 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.2/87.2 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting termcolor (from fire)
  Downloading termcolor-2.5.0-py3-none-any.whl.metadata (6.1 kB)
Downloading termcolor-2.5.0-py3-none-any.whl (7.8 kB)
Building wheels for collected packages: fire
  Building wheel for fire (setup.p

## Some helper functions

`json`: Used to parse JSON data into Python objects and write Python objects back to JSON format.

`os`: Provides functions to interact with the operating system, like handling file paths and checking if files exist.

`io`: Provides tools to work with file-like objects (both in-memory and actual files). It helps handle file streams more flexibly.

In [4]:
import json
import os
import io

This function loads a .json file and converts it into a Python dictionary. It takes the file path f and a file access mode (default is "r" for read mode)

In [5]:
def jload(f, mode="r"):

    """Load a .json file into a dictionary."""

    f = _make_r_io_base(f, mode)
    jdict = json.load(f)
    f.close()

    return jdict

This helper function ensures that the input f is a file object. If f is not already a file object (i.e., if it’s a string representing a file path), the function opens the file with the given mode.

In [6]:
def _make_r_io_base(f, mode: str):

    if not isinstance(f, io.IOBase):
        f = open(f, mode=mode)

    return f

# Instruction Tuning

Reference: [CodeAlpaca](https://github.com/sahil280114/codealpaca.git)

## Step 1: Load the model and tokenizer

`CodeLlamaTokenizer, LlamaForCausalLM, BitsAndBytesConfig`: Necessary classes from Hugging Face’s transformers library. This helps in loading the tokenizer and model (Llama) and configuring model quantization with BitsAndBytes for efficient loading.

`torch`: PyTorch, the core library used for tensor operations and model handling.

In [7]:
from transformers import CodeLlamaTokenizer, LlamaForCausalLM, BitsAndBytesConfig
import torch

This line sets the name of the model to be loaded. It will be used later to initialize both the model and tokenizer from Hugging Face's model hub.

In [8]:
model_name = 'codellama/CodeLlama-7b-Python-hf'

In this step, we set up the configuration for loading our model efficiently:

`use_4bit = True`: We're opting to use 4-bit quantization to reduce memory usage and speed up inference. It's a trade-off between precision and resource efficiency.

`bnb_4bit_compute_dtype = "float16"`: We specify that calculations will be done in 16-bit floating point format, which helps in saving GPU memory while still providing decent accuracy.

`bnb_4bit_quant_type = "nf4"`: This sets the quantization type to "nf4", which is a specific way to represent our model weights that helps further optimize performance.

`use_nested_quant = False`: We're not using nested quantization here.

`compute_dtype = getattr(torch, bnb_4bit_compute_dtype)`: This line retrieves the float16 type from the PyTorch library so we can use it in our configuration.

`bnb_config = BitsAndBytesConfig(...)`: Finally, we create a configuration object for the BitsAndBytes setup, passing in all the parameters we've defined. This will guide how our model is loaded and optimized.

In [9]:
use_4bit = True
bnb_4bit_compute_dtype = "float16"
bnb_4bit_quant_type = "nf4"
use_nested_quant = False
compute_dtype = getattr(torch, bnb_4bit_compute_dtype)

bnb_config = BitsAndBytesConfig(
                load_in_4bit = use_4bit,
                bnb_4bit_quant_type = bnb_4bit_quant_type,
                bnb_4bit_compute_dtype = compute_dtype,
                bnb_4bit_use_double_quant=use_nested_quant,
            )

In this part, we're loading the model and tokenizer based on our earlier setup:

**Loading the Model:**

- If `use_4bit` is `True`, we load the model with 4-bit quantization by passing the `bnb_config`.
- If `use_4bit` is `False`, we simply load the model without quantization, still ensuring it runs efficiently on the available hardware by setting `device_map='auto'`.

**Loading the Tokenizer:**

- We then load the tokenizer using `CodeLlamaTokenizer.from_pretrained(model_name)`. The `padding_side="right"` option specifies how to handle padding when processing sequences, while `use_fast=False` means we're opting for a standard implementation instead of a faster version that requires additional dependencies.

In [10]:
if (use_4bit):
            model = LlamaForCausalLM.from_pretrained(
                    model_name,
                    quantization_config=bnb_config,
                    device_map='auto')

else:
            model = LlamaForCausalLM.from_pretrained(
                    model_name,
                    device_map='auto')


tokenizer = CodeLlamaTokenizer.from_pretrained(
                model_name,
                padding_side = "right",
                use_fast=False,)



config.json:   0%|          | 0.00/644 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/25.1k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.98G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/3.50G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]



generation_config.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/749 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/411 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.84M [00:00<?, ?B/s]

In this section, we define some default tokens used in our model:

- `DEFAULT_PAD_TOKEN`: This is set to `"[PAD]"`, which is used to pad sequences to ensure they have the same length during training or inference.
  
- `DEFAULT_EOS_TOKEN`: The end-of-sequence token is set to `"</s>"`. This token indicates the end of a sequence, helping the model understand when to stop generating text.

- `DEFAULT_BOS_TOKEN`: The beginning-of-sequence token is also set to `"</s>"`. This token is used to signal the start of a sequence, allowing the model to recognize when it should begin processing input.

- `DEFAULT_UNK_TOKEN`: The unknown token is set to `"</s>"`. This token represents any word that is not in the model's vocabulary, helping the model handle unseen words during text processing.

These tokens are essential for managing input and output sequences effectively in natural language processing tasks.

In [11]:
DEFAULT_PAD_TOKEN = "[PAD]"
DEFAULT_EOS_TOKEN = "</s>"
DEFAULT_BOS_TOKEN = "</s>"
DEFAULT_UNK_TOKEN = "</s>"

This function helps integrate new special tokens into a tokenizer and adjusts the model's embeddings accordingly.

The function starts by adding the new special tokens to the tokenizer. It keeps track of how many new tokens are added.
Then, it resizes the model's token embeddings to match the total number of tokens in the updated tokenizer.
If any new tokens were added, it **averages the existing input and output embeddings** (excluding the new tokens) to initialize the embeddings for the new tokens. This helps maintain consistency in how the model handles the input and output data.

In [12]:
def smart_tokenizer_and_embedding_resize(
            special_tokens_dict,
            tokenizer,
            model,
        ):
            """
            Resize the tokenizer and adjust model embeddings to include new special tokens.

            Input arguments:
                special_tokens_dict (dict):
                    A dictionary containing the special tokens (e.g., `pad_token`, `bos_token`, `eos_token`) to add to the tokenizer.

                tokenizer (PreTrainedTokenizer):
                    The tokenizer whose vocabulary will be resized to include new special tokens.

                model (PreTrainedModel):
                    The model whose input and output embeddings will be resized to match the new tokenizer size.

            Returns:
                None:
                    This function modifies the `tokenizer` and `model` in-place and does not return any value.

            Note: This is the unoptimized version that may make your embedding size not be divisible by 64.
            """

            num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict)
            model.resize_token_embeddings(len(tokenizer))

            if num_new_tokens > 0:

                input_embeddings = model.get_input_embeddings().weight.data
                output_embeddings = model.get_output_embeddings().weight.data

                input_embeddings_avg = input_embeddings[:-num_new_tokens].mean(dim=0, keepdim=True)
                output_embeddings_avg = output_embeddings[:-num_new_tokens].mean(dim=0, keepdim=True)

                input_embeddings[-num_new_tokens:] = input_embeddings_avg
                output_embeddings[-num_new_tokens:] = output_embeddings_avg

This code checks if the tokenizer's padding token is set; if not, it calls the `smart_tokenizer_and_embedding_resize` function to add a default padding token (`DEFAULT_PAD_TOKEN`) and resize the model's embeddings accordingly. After ensuring the padding token is in place, it adds additional special tokens, such as the end-of-sequence, beginning-of-sequence, and unknown tokens, using `tokenizer.add_special_tokens(...)`. This process ensures the tokenizer is fully equipped with the necessary special tokens, allowing the model to handle various tasks effectively.

In [13]:
if tokenizer.pad_token is None:
                    smart_tokenizer_and_embedding_resize(
                    special_tokens_dict = dict(pad_token=DEFAULT_PAD_TOKEN),
                    tokenizer = tokenizer,
                    model = model
                )

tokenizer.add_special_tokens(
            {
                "eos_token": DEFAULT_EOS_TOKEN,
                "bos_token": DEFAULT_BOS_TOKEN,
                "unk_token": DEFAULT_UNK_TOKEN,
            })

0

## Step 2: Define the prompt and process the training dataset

Here is a short and simple explanation of each import:

- `import torch`: Imports the PyTorch library, which is used for tensor operations and building machine learning models.
  
- `from torch.utils.data import Dataset`: Imports `Dataset`, a class in PyTorch for creating custom datasets to be used in data loading and training.

- `from dataclasses import dataclass, field`: Imports `dataclass` and `field` from the `dataclasses` module, which are used to create simple classes for storing data with minimal boilerplate code.

- `import copy`: Imports the `copy` module, which provides functions to create shallow or deep copies of objects, useful when you need to duplicate an object without affecting the original.

In [14]:
import torch
from torch.utils.data import Dataset
from dataclasses import dataclass, field
import copy

In this setup, we determine which prompt template to use based on the presence of an input field in the training set. If the training data includes an input field, the `prompt_input` template is utilized, which combines the instruction with the input context to generate a more comprehensive response. If the training set lacks an input field, the simpler `prompt_no_input` template is applied, focusing solely on the instruction. This approach ensures that the model receives appropriately structured prompts tailored to the available data, facilitating more effective training and response generation.

The IGNORE_INDEX is set to -100, which is used to specify tokens that should be ignored during loss calculations in model training.

In [15]:
IGNORE_INDEX = -100

PROMPT_DICT = {
    "prompt_input": (
        "Below is an instruction that describes a task, paired with an input that provides further context. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Response:"
    ),

    "prompt_no_input": (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:"
    ),
}

The `DataCollatorForSupervisedDataset` class is a data collator for preparing batches of data during supervised fine-tuning. It's defined using `@dataclass` and takes a tokenizer as an input. The `__call__` method processes a batch of instances, where each instance is a dictionary containing `input_ids` and `labels`. It extracts these values from the instances and pads them to the same length, using the tokenizer's padding token for `input_ids` and `IGNORE_INDEX` (set to `-100`) for the labels. Additionally, an attention mask is created to distinguish between real tokens (marked with 1) and padding tokens (marked with 0). The method returns a dictionary containing the padded `input_ids`, `labels`, and the `attention_mask`.

In [16]:
@dataclass
class DataCollatorForSupervisedDataset(object):

    """Collate examples for supervised fine-tuning."""
    tokenizer: object

    def __call__(self, instances):

            """
            Processes a batch of instances for supervised fine-tuning.

            Input arguments:
                instances (list of dict):
                    A list of dictionaries where each dictionary contains the following keys:
                    - 'input_ids': Tensor of input IDs.
                    - 'labels': Tensor of corresponding labels.

            What it does:
                    - Extracts input IDs and labels from the provided instances.
                    - Pads the input IDs and labels to the same length using the tokenizer's padding token for input IDs and a predefined constant for labels.
                    - Constructs  an attention mask to identify real tokens (not padding) in the input IDs.

            Returns:
                dict:
                    A dictionary containing:
                    - 'input_ids': A tensor of padded input IDs.
                    - 'labels': A tensor of padded labels.
                    - 'attention_mask': A tensor indicating the presence of real tokens (1) and padding (0).
            """

            input_ids, labels = tuple([instance[key] for instance in instances] for key in ("input_ids", "labels"))
            input_ids = torch.nn.utils.rnn.pad_sequence(
                input_ids, batch_first=True, padding_value = tokenizer.pad_token_id
            )
            labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first = True, padding_value = IGNORE_INDEX)
            return dict(
                input_ids = input_ids,
                labels = labels,
                attention_mask = input_ids.ne(tokenizer.pad_token_id),
            )

## Step 3: Change the source and target according to the mode

The `SupervisedDataset` class is a custom dataset class for supervised fine-tuning. It loads data from a JSON file, formats input-output pairs using the predefined `PROMPT_DICT`, and tokenizes them with a provided tokenizer.

The `preprocess` method tokenizes both the sources and targets, adjusting the labels based on the mode ('IT' for Instruction Tuning or 'IM' for Instruction Modelling). In 'IT' mode, parts of the labels corresponding to the input text are replaced with `IGNORE_INDEX`, which is set to `-100`, so they are ignored during loss calculation. Tokenized data is stored as `input_ids` and `labels`.

The `__len__` method returns the number of examples, and `__getitem__` retrieves the input IDs and labels for a given index.

In [17]:
class SupervisedDataset(Dataset):

        """Dataset for supervised fine-tuning."""

        def preprocess(
            self, sources, targets, tokenizer, mode = 'IT'
        ):

            """
            Preprocess the data by tokenizing.

            Input arguments:
                sources (list of str):
                    The source texts to be tokenized.
                targets (list of str):
                    The target texts corresponding to the sources.
                tokenizer (Tokenizer):
                    The tokenizer used for converting text to input IDs.
                mode (str, optional):
                    The mode of operation for the preprocessing (default is 'IT' for Instruction Tuning). Options are IT or IM (Instruction Modelling).

            What it does:
                - Combines the source and target texts into a single list of examples.
                - Tokenizes the combined examples and the source texts using the provided tokenizer.
                - Creates input IDs from the tokenized examples and initializes labels based on the input IDs.
                - In 'IT' mode, replaces the parts of labels that correspond to source lengths with the `IGNORE_INDEX` token.
                - Returns a dictionary containing the input IDs and labels.

            Returns:
                dict: A dictionary containing:
                    - input_ids: The tokenized input IDs.
                    - labels: The tokenized labels (targets), modified for 'IT' or 'IM' mode.
            """

            examples = [s + t for s, t in zip(sources, targets)]

            examples_tokenized, sources_tokenized = [_tokenize_fn(strings, tokenizer) for strings in (examples, sources)]
            input_ids = examples_tokenized["input_ids"]
            labels = copy.deepcopy(input_ids)

            if(mode == 'IT'):

                for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
                    label[:source_len] = IGNORE_INDEX

                return dict(input_ids=input_ids, labels=labels)

            else:

                return dict(input_ids=input_ids, labels=labels)


        def __init__(self, data_path, tokenizer, mode = 'IT'):

            """
            Initializes the SupervisedDataset instance by loading and processing the data.

            Input arguments:
                data_path (str):
                    The path to the JSON file containing the dataset.
                tokenizer (Tokenizer):
                    The tokenizer used for converting text to input IDs.
                mode (str, optional):
                    The mode of operation for the dataset (default is 'IT' for Instruction Tuning). Options are IT or IM (Instruction Modelling).

            What it does:
                - Loads the data from the specified JSON file.
                - Formats the input prompts based on the contents of the data.
                - Constructs source texts by applying appropriate formatting from PROMPT_DICT.
                - Constructs target texts by appending the tokenizer's end-of-sequence token to the outputs.
                - Calls the preprocess method to tokenize the sources and targets.
                - Initializes input_ids and labels attributes with the processed data.

            Returns:
                None
            """

            super(SupervisedDataset, self).__init__()
            print("Loading data...")
            list_data_dict = jload(data_path)

            print("Formatting inputs...")
            prompt_input, prompt_no_input = PROMPT_DICT["prompt_input"], PROMPT_DICT["prompt_no_input"]
            sources = [
                prompt_input.format_map(example) if example.get("input", "") != "" else prompt_no_input.format_map(example)
                for example in list_data_dict
            ]
            targets = [f"{example['output']}{tokenizer.eos_token}" for example in list_data_dict]

            print("Tokenizing inputs... This may take some time...")
            data_dict = self.preprocess(sources, targets, tokenizer, mode = mode)

            self.input_ids = data_dict["input_ids"]
            self.labels = data_dict["labels"]

        def __len__(self):

            """
            Returns the number of examples in the dataset.

            Input arguments:
                None

            What it does:
                - Calculates the length of the dataset by return`ing the number of input IDs.

            Returns:
                int:
                    The number of examples in the dataset, which is equal to the length of the input_ids attribute.
            """

            return len(self.input_ids)

        def __getitem__(self, i):

            """
            Retrieves a single example from the dataset.

            Input arguments:
                i (int):
                    The index of the example to retrieve.

            What it does:
                - Returns a dictionary containing the input IDs and labels for the specified index.

            Returns:
                dict:
                    A dictionary with the following keys:
                    - 'input_ids': The input IDs corresponding to the specified index.
                    - 'labels': The labels corresponding to the specified index.
            """

            return dict(input_ids=self.input_ids[i], labels=self.labels[i])

The `make_supervised_data_module` function is designed to create a dataset and data collator specifically for supervised fine-tuning of a model. It takes three input arguments: `data_path`, which specifies where the training data is located; `tokenizer`, which is responsible for converting the raw input into tokens suitable for the model; and an optional `mode`, defaulting to 'IT' for Instruction Tuning.

The function performs the following steps: it initializes a `SupervisedDataset` object with the provided parameters, which prepares the training data for use. It then creates a `DataCollatorForSupervisedDataset` using the same tokenizer, which helps batch the data appropriately during training. Finally, the function returns a dictionary containing the initialized training dataset, an evaluation dataset (set to `None`), and the data collator, allowing for seamless integration into the training pipeline. This setup ensures that the model is trained effectively on the provided data while accommodating specific requirements based on the dataset's structure.


In [18]:
def make_supervised_data_module( data_path, tokenizer, mode = 'IT'):

        """
        Create a dataset and data collator for supervised fine-tuning.

        Input arguments:
            data_path (str):
                The path to the dataset file or directory containing the training data.
            tokenizer (Tokenizer):
                The tokenizer to be used for tokenizing the input data.
            mode (str, optional):
                The mode of operation for the dataset, default is 'IT' (Instruction Tuning).

        What it does:
                - Initializes a `SupervisedDataset` using the provided `data_path`, `tokenizer`, and `mode`.
                - Creates a `DataCollatorForSupervisedDataset` using the provided `tokenizer`.
                - Returns a dictionary containing the training dataset, evaluation dataset (set to None), and the data collator.

        Returns:
            dict: A dictionary with the following keys:
                - 'train_dataset': The initialized training dataset.
                - 'eval_dataset': The evaluation dataset (currently set to None).
                - 'data_collator': The data collator for batching the dataset.
        """

        train_dataset = SupervisedDataset(data_path, tokenizer = tokenizer, mode = mode)
        data_collator = DataCollatorForSupervisedDataset(tokenizer = tokenizer)
        return dict(train_dataset = train_dataset, eval_dataset = None, data_collator = data_collator)

The `_tokenize_fn` function tokenizes a list of strings using a specified tokenizer. It takes two inputs: `strings`, which is a list of strings to tokenize, and `tokenizer`, a pre-trained tokenizer instance. The function tokenizes each string with padding set to the longest sequence and enables truncation to fit within the tokenizer's maximum length. It extracts the `input_ids`, which represent the tokenized strings, and sets the `labels` to the same values. Additionally, it calculates the lengths of the input and label sequences, excluding any padding. Finally, it returns a dictionary containing `input_ids`, `labels`, `input_ids_lens`, and `labels_lens`, effectively preparing the data for model training.

In [19]:
def _tokenize_fn(strings, tokenizer):

            """
            Tokenize a list of strings using the provided tokenizer.

            Input arguments:
                strings (list of str):
                    A list of input strings to be tokenized.

                tokenizer (PreTrainedTokenizer):
                    A tokenizer instance (e.g., from Hugging Face) that will tokenize the input strings.
                    This tokenizer should return PyTorch tensors (`return_tensors="pt"`).

            What it does:
                    - Tokenizes each string in the input `strings` list.
                    - Applies padding to ensure all tokenized outputs are of the same length.
                    - Truncates tokenized strings to fit within the tokenizer's maximum length.
                    - Extracts input IDs and labels from the tokenized results.
                    - Computes the length of each tokenized sequence (i.e., the number of non-padding tokens).

            Returns:
                dict: A dictionary with the following keys:
                    - `input_ids` (list of torch.Tensor): The input token IDs for each string.
                    - `labels` (list of torch.Tensor): The same as `input_ids`, used as labels.
                    - `input_ids_lens` (list of int): Lengths of the input token sequences (excluding padding).
                    - `labels_lens` (list of int): Lengths of the label token sequences (excluding padding).
            """

            tokenized_list = [
                tokenizer(
                    text,
                    return_tensors="pt",
                    padding="longest",
                    #max_length=tokenizer.model_max_length,
                    truncation=True,
                )
                for text in strings
            ]
            input_ids = labels = [tokenized.input_ids[0] for tokenized in tokenized_list]
            input_ids_lens = labels_lens = [
                tokenized.input_ids.ne(tokenizer.pad_token_id).sum().item() for tokenized in tokenized_list
            ]
            return dict(
                input_ids=input_ids,
                labels=labels,
                input_ids_lens=input_ids_lens,
                labels_lens=labels_lens,
            )

In this snippet, we set `data_path` to the file path `'data/code_alpaca_2k.json'`, which points to the dataset we want to use. Then, we create a data module by calling the `make_supervised_data_module` function, passing in the `data_path`, the previously defined `tokenizer`, and specifying the mode as `'IT'` for Instruction Tuning. This effectively initializes the training dataset and prepares the data for the supervised fine-tuning process, ensuring that the data is properly tokenized and ready for model training.

In [20]:
data_path ='data/leetcode_instructions_code_alpaca_format.json'
data_module = make_supervised_data_module(data_path, tokenizer=tokenizer, mode = 'IT')

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Loading data...
Formatting inputs...
Tokenizing inputs... This may take some time...


## Step 4: PEFT Configuration

In this step, we’re importing `LoraConfig` and `PeftModel` from the PEFT library. This library provides tools for parameter-efficient fine-tuning (PEFT), allowing us to efficiently adapt models to new tasks with minimal additional parameters. `LoraConfig` helps us configure the Low-Rank Adaptation (LoRA) method, which reduces the number of trainable parameters while maintaining model performance. Meanwhile, `PeftModel` is used to create a model that incorporates these adaptations, making it easier to fine-tune large language models effectively.

In [21]:
from peft import LoraConfig, PeftModel

In this section, we define the configuration parameters for the LoRA setup: `lora_r`, `lora_alpha`, and `lora_dropout`.

- **`lora_r`** is set to `2`, which specifies the rank for the low-rank adaptation. A lower rank typically means fewer parameters, helping to reduce computational costs while still capturing the essential information needed for the task. It is set to 2 here to demonstrate finetuning in colab in a shorter period of time.
- **`lora_alpha`** is set to `16`, which is a scaling factor used to control the strength of the LoRA updates. A higher alpha can lead to stronger adaptations but may require careful tuning to avoid overfitting.
- **`lora_dropout`** is set to `0.1`, indicating a dropout rate of 10%. This dropout is applied to the LoRA layers during training to help prevent overfitting by randomly dropping a portion of the parameters during updates.

In [25]:
lora_r = 60  # lora rank
lora_alpha = 16  # alpha parameter for LoRA scaling
lora_dropout = 0.1   # dropout rate for LoRA

This code snippet sets up the PEFT (Parameter-Efficient Fine-Tuning) configuration for supervised fine-tuning of a causal language model using LoRA (Low-Rank Adaptation). The `LoraConfig` is initialized with previously defined variables: `lora_alpha`, `lora_dropout`, and `lora_r`, which control the scaling, dropout rate, and rank for LoRA, respectively. The `bias` is set to "none", and `task_type` is specified as "CAUSAL_LM" to indicate that the model is intended for causal language modeling tasks.

In [26]:
# Set supervised fine-tuning parameters
peft_config = LoraConfig( lora_alpha = lora_alpha,
                          lora_dropout = lora_dropout,
                          r = lora_r,
                          bias="none",
                          task_type="CAUSAL_LM",
                          )

## Step 5: Huggingface Trainer Configuration

In this step, we import the necessary components to configure the training process using Hugging Face's `transformers` library and `trl`. The `TrainingArguments` class is used to define various training parameters such as learning rate, batch size, and the number of training epochs. The `SFTTrainer` class from the `trl` library is a specialized trainer designed for supervised fine-tuning (SFT) tasks, providing additional functionalities tailored for fine-tuning language models.

In [27]:
from transformers import TrainingArguments
from trl import SFTTrainer
from transformers.trainer_utils import FSDPOption

Here, we define various training parameters for the supervised fine-tuning process. The `output_dir` specifies where to save the training results, while `num_train_epochs` sets the training duration to half an epoch. We disable mixed precision training with `fp16` and `bf16` set to `False`, opting for a batch size of 2 for both training and evaluation.

`gradient_accumulation_steps` is set to 1 to apply gradients after each batch, and `gradient_checkpointing` is enabled to save memory during training. We limit the maximum gradient norm to 0.3 to prevent exploding gradients. The learning rate is adjusted to `2e-5`, lower than the default, with a weight decay of `0.01` to encourage regularization.

The optimizer is set to `paged_adamw_8bit` to save memory and enhance training speed. The learning rate scheduler is fixed at a constant rate, and no specific maximum training steps are defined (`max_steps = -1`). We allocate a warm-up ratio of 0.03 for gradual learning rate increase, disable length grouping, and set logging intervals at every 25 steps. The sequence length is set to `None` to allow dynamic lengths, and packing is disabled. Finally, the `save_steps` parameter is set to a very high value to avoid saving intermediate results and thus conserve disk space.

In [28]:
output_dir = "./results"
num_train_epochs = 3
fp16 = False
bf16 = False
per_device_train_batch_size = 2
per_device_eval_batch_size = 2
gradient_accumulation_steps = 1
gradient_checkpointing = True
max_grad_norm = 0.3
learning_rate = 2e-5   # default is 2e-4
weight_decay = 0.01   # 0.001
optim = 'paged_adamw_8bit'
lr_scheduler_type = "constant"
max_steps = -1
warmup_ratio = 0.03
group_by_length = False
logging_steps = 25
max_seq_length = None
packing = False
save_steps = 1000000 # not saving intermediate results to save disk space
fsdp = False
# Load the entire model on the GPU 0
# device_map = {"": "cuda"} # change to 'auto' on local machine

This part sets up the training arguments for the Hugging Face Trainer using the previously defined variables. The `TrainingArguments` class allows you to specify essential parameters for training, such as the output directory, the number of epochs, batch sizes, optimization method, learning rate, and logging frequency.

In [29]:
if fsdp == True:
  training_arguments = TrainingArguments(
                output_dir = output_dir,
                num_train_epochs = num_train_epochs,
                per_device_train_batch_size =per_device_train_batch_size,
                gradient_accumulation_steps = gradient_accumulation_steps,
                optim = optim,
                save_steps = save_steps,
                logging_steps = logging_steps,
                learning_rate = learning_rate,
                weight_decay = weight_decay,
                fp16 = fp16,
                bf16 = bf16,
                max_grad_norm = max_grad_norm,
                max_steps = max_steps,
                warmup_ratio = warmup_ratio,
                group_by_length = group_by_length,
                lr_scheduler_type = lr_scheduler_type,
                fsdp=FSDPOption.FULL_SHARD,  # Enable FSDP (full sharding)
                fsdp_config={
                    "fsdp_min_num_params": 1e6,  # Shard parameters larger than 1M
                    "fsdp_transformer_layer_cls_to_wrap": "LlamaDecoderLayer",  # Wrap transformer layers
                },
                )
else:
  training_arguments = TrainingArguments(
                output_dir = output_dir,
                num_train_epochs = num_train_epochs,
                per_device_train_batch_size =per_device_train_batch_size,
                gradient_accumulation_steps = gradient_accumulation_steps,
                optim = optim,
                save_steps = save_steps,
                logging_steps = logging_steps,
                learning_rate = learning_rate,
                weight_decay = weight_decay,
                fp16 = fp16,
                bf16 = bf16,
                max_grad_norm = max_grad_norm,
                max_steps = max_steps,
                warmup_ratio = warmup_ratio,
                group_by_length = group_by_length,
                lr_scheduler_type = lr_scheduler_type)

In this section, the `SFTTrainer` is instantiated with the model and configurations set up earlier. The `model` parameter specifies which model to train, while `peft_config` applies the previously defined PEFT (Parameter-Efficient Fine-Tuning) settings. The `max_seq_length` controls the maximum length of input sequences, and `tokenizer` is the tokenizer used for processing the data. The `args` parameter includes all the training arguments defined previously. Additionally, `packing=True` allows for packing sequences of different lengths into the same batch, optimizing training efficiency. Finally, `**data_module` unpacks the training dataset and data collator into the trainer.

In [30]:
trainer = SFTTrainer(
            model = model,
            peft_config = peft_config,
            max_seq_length = max_seq_length,
            tokenizer = tokenizer,
            args = training_arguments,
            packing = True,
            **data_module
        )



## Step 6: Train and Merge

The `trainer.train()` function is then called to start the training process. This function handles the entire training loop, applying the defined training parameters, dataset, and model settings to fine-tune the model on the provided data.

In [31]:
print('Training model with the configurations mentioned...')
trainer.train()

`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.


Training model with the configurations mentioned...


Step,Training Loss
25,0.4779
50,0.4864
75,0.4544
100,0.4345
125,0.3438
150,0.3616
175,0.3516
200,0.3443
225,0.3887
250,0.3281


TrainOutput(global_step=3003, training_loss=0.31641386034009933, metrics={'train_runtime': 13920.687, 'train_samples_per_second': 0.431, 'train_steps_per_second': 0.216, 'total_flos': 2.0059081944327782e+17, 'train_loss': 0.31641386034009933, 'epoch': 3.0})

In [None]:
starts: 4.20 end: 8.12

`trainer.save_model(output_dir)` saves the fine-tuned model to the specified `output_dir`.

In [32]:
trainer.save_model(output_dir)
print('Model finetuned and saved to ', output_dir)

Model finetuned and saved to  ./results


In this code snippet, we are preparing to load and utilize the fine-tuned model. First, `base_model = LlamaForCausalLM.from_pretrained(model_name, return_dict=True, torch_dtype=torch.float16)` loads the base model with 16-bit floating-point precision for reduced memory usage and faster computation. Next, `model = PeftModel.from_pretrained(base_model, output_dir)` loads the fine-tuned model parameters from the specified `output_dir`. The `merge_and_unload()` method is then called to consolidate the model's parameters into a single structure. Finally, `tokenizer = CodeLlamaTokenizer.from_pretrained(output_dir)` loads the tokenizer associated with the fine-tuned model.

In [33]:
base_model = LlamaForCausalLM.from_pretrained(model_name,
                                              return_dict = True,
                                              torch_dtype = torch.float16, device_map='auto'
                                              )
model = PeftModel.from_pretrained(base_model, output_dir)
model = model.merge_and_unload()
tokenizer = CodeLlamaTokenizer.from_pretrained(output_dir)



Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


What it does:
 - Loads the base model using the specified `base_model_name`, configured for half-precision (`float16`).
 - Loads the fine-tuned model parameters from the specified directory using `PeftModel`.
 - Merges the base model and the fine-tuned model into a single model and unloads any unnecessary parts to save memory.
 - Loads the tokenizer corresponding to the fine-tuned model from the specified directory.

# Generate code

The `generate` function takes several inputs: a task identifier (`taskid`), a text prompt (`prompt`), the device for computation (`device`), the model for generating text (`model`), and the tokenizer for processing the text (`tokenizer`). It begins by tokenizing the input prompt using the tokenizer and prepares the model inputs on the specified device. The function logs the task ID for reference. Then, it generates text using the model with a greedy decoding strategy, specifying a maximum output length and using appropriate end-of-sequence and padding tokens. The raw generated output is decoded back into a string for readability. Before returning, it attempts to process the generated output using the `processOutput` function; if this processing fails, it defaults to returning the raw output. Finally, the function returns both the raw output and the processed output (or the raw output in case of an error).

In [34]:
import re

def generate(taskid, prompt, device, model, tokenizer):

        """
        Generate text from a prompt using the specified model and tokenizer.

        Input arguments:
            taskid (int):
                An identifier for the task, used for printing and logging purposes.

            prompt (str):
                The input text prompt that the model will use to generate a response.

            device (torch.device):
                The device on which the model and tokenization operations will be performed (e.g., 'cpu' or 'cuda').

            model (PreTrainedModel):
                The language model used to generate text from the prompt.

            tokenizer (PreTrainedTokenizer):
                The tokenizer used to tokenize the prompt and decode the generated output.

        What it does:
                - Tokenizes the prompt using the provided `tokenizer`.
                - Passes the tokenized prompt to the model for generating text with greedy decoding.
                - Decodes the generated tokens back into text.
                - Processes the generated output using the `processOutput` function to perform any additional processing.
                - Logs the task ID and the generated text before and after processing.

        Returns:
            tuple:
                - `output` (str): The raw text generated by the model.
                - `after` (str): The processed output, after passing through the `processOutput` function.
                If processing fails, `output` is returned as-is.
        """


        model_inputs = tokenizer(prompt, return_tensors='pt').to(device)
        print("Task ID: " + str(taskid) + "\n" + 100 * '-')


        greedy_output = model.generate(model_inputs['input_ids'], max_new_tokens = 500, eos_token_id=tokenizer.eos_token_id, pad_token_id = tokenizer.pad_token_id)
        print("Generating...")


        output = tokenizer.decode(greedy_output[0])
        print('Before processing\n',output)

        try:
            after = processOutput(output)
            print('After processing\n', after)

        except:
            after = output
            print('Could not process')

        return output, after.strip()

The `processOutput` function takes a string (`output`) representing raw text generated by the language model, which often includes unwanted formatting and special tags. It first checks for specific starters indicating the beginning of the relevant response, and then it removes these starters along with special tags like `<s>` and `</s>`. The function replaces double quotes with single quotes, deletes any occurrence of the phrase "Let's think step by step," and removes text enclosed in triple single quotes (''') as well as comments that start with hashtags (#). It identifies the positions of function definitions and return statements to determine if the generated code is complete. If there are more function definitions than return statements, it trims the code to ensure completeness. The function attempts to execute the cleaned code using the `exec` function. If execution fails, it checks for issues, such as incomplete return statements or additional lines after the last function, and further refines the output to return a more executable version. Ultimately, it returns the processed Python code, ready for execution.

In [35]:
def processOutput(output):

        """
        Process the text generated by the language model to clean and prepare it for execution as Python code.

        Input arguments:
            output (str):
                The raw, unprocessed text generated by the language model, often containing special tags, instructions, and formatting that need to be removed.

        What it does:
              - Removes special tags like `<s>` and `</s>`.
              - Replaces double quotation marks (") with single quotation marks (').
              - Deletes the sentence "Let's think step by step." if present.
              - Removes instructions enclosed in triple single quotes (''') and comments starting with hashtags (#).
              - Trims the code if it contains the line "if __name__ == '__main__':".
              - If there are more function definitions (`def`) than return statements, it assumes incomplete code and removes the extra function.
              - Attempts to execute the cleaned code. If execution fails, it further checks for issues like incomplete return statements or additional statements after a function and removes problematic lines.

        Returns:
            str: The processed and cleaned Python code ready for execution.
                If errors are found during execution, attempts to clean up the code further and return a more executable version.
        """

        possible_starters = ['### Response:', '### Output:', 'FIX =','Write code for this problem:']

        for starter in possible_starters:
            if(starter in output):
                output = output[output.find(starter)+len(starter):]

        output = output.replace('<s>','').replace('</s>','').strip()
        output = output.replace('"',"'")
        output = output.replace('\nLet\'s think step by step.\n\n',"")
        output = output.replace('Answer:',"")
        output = remove_text_inside_quotes(output, "'''")
        output = remove_lines_starting_with_hashtag(output)
        deff = [m.start() for m in re.finditer(r"def ",output)] # finds the occurrences of 'def'(the starting indices)
        ret = [m.start() for m in re.finditer(r"return",output)] # finds the occurrences of 'return'(the starting indices)

        if("'''" in output):
            output = output[:deff[len(deff)-1]]
        if("if __name__ == '__main__':" in output):
            main = [m.start() for m in re.finditer(r"if __name__ == '__main__':",output)][0]
            output = output[:main]

        if (len(ret)==0):
            return output

        #if function starts but theres no return statement
        if (ret[len(ret)-1] < deff[len(deff)-1]):
            #print('funct not over!')
            output = output[:deff[len(deff)-1]]

        try:
            #execute the output
            exec(output)

        except:
            #Function starts and there is a return statement but there is a statement that doesn't end

            last_return = ret[len(ret)-1]
            try:
                #there are additional statements after the end of the function
                #sol : stop till the last return statement
                end = last_return + [m.start() for m in re.finditer("\n",output[last_return:])][0]
                output = output[:end]
            except:
                #the return statement is not complete
                #sol : stop till before the last def line
                output = output[:deff[len(deff)-1]]

        return (output)

The `remove_text_inside_quotes` function takes an input string and a specified type of quotation mark (e.g., single or double quotes) to remove all text enclosed within those quotation marks. It utilizes a regular expression pattern to identify the quoted sections and removes everything between the matching quotation marks, including the quotes themselves. The function returns the modified input string, which has all specified quoted sections removed.

In [36]:
def remove_text_inside_quotes( input_string, quote):

        """
        Remove all text enclosed within a specified quote from the input string.

        Input arguments:
            input_string (str):
                The input text which may contain quoted sections.
            quote (str):
                The type of quotation mark used to enclose the text you want to remove (e.g., single quote, double quote, triple quotes).

        What it does:
            - Utilizes a regular expression pattern to identify and remove all text found between pairs of the specified quotation mark (`quote`).
            - Deletes everything between the matching quotation marks, including the quotes themselves.

        Returns:
            str: The input text with all sections enclosed by the specified quotes removed.
        """

        exp = quote +'(.*?)' + quote
        pattern = re.compile(exp, re.DOTALL)
        result = re.sub(pattern, '', input_string)
        return result

The `remove_lines_starting_with_hashtag` function removes all lines from the input string that begin with a hashtag (#), which are typically used for comments. It splits the input string into individual lines, filters out any lines that start with a hashtag, and then joins the remaining lines back into a single string. The function returns the cleaned input string with all comment lines removed.

In [37]:
def remove_lines_starting_with_hashtag( input_string):

        """
        Remove all lines from the input string that start with a hashtag (#).

        Input arguments:
            input_string (str):
                The input text which may contain lines starting with a hashtag (#), typically comments.

        What it does:
            - Reads the input string line by line.
            - Filters out any lines that start with a hashtag (#), commonly used for comments.
            - Joins the remaining lines back into a single string.

        Returns:
            str: The input text with all lines starting with a hashtag removed.
        """

        lines = input_string.split('\n')
        filtered_lines = [line for line in lines if not line.startswith('#')]
        result_string = '\n'.join(filtered_lines)
        return result_string

# Evaluation - HumanEval & MBPP

[Reference from the GitHub repository](https://github.com/openai/human-eval.git)

In [41]:
import locale
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Union, Iterable, Dict, Optional
import itertools
import os
import gzip
import json
import contextlib
import faulthandler
import io
import multiprocessing
import platform
import signal
import tempfile
import numpy as np
import tqdm

class Evaluation:

    class TimeoutException(Exception):
        """Custom exception for handling timeout cases."""
        pass

    def __init__(self):
        self.paths = {'humaneval': 'eval/human_eval.jsonl', 'mbpp': 'eval/mbpp_datasetV4infilling.jsonl'}

    def read_problems(self, task: str) -> Dict:
        evalset_file = self.paths[task]
        return {task["task_id"]: task for task in self.stream_jsonl(evalset_file)}

    def stream_jsonl(self, filename: str) -> Iterable[Dict]:
        """Stream JSONL data from a file."""
        if filename.endswith(".gz"):
            with gzip.open(filename, "rt") as fp:
                for line in fp:
                    if line.strip():
                        yield json.loads(line)
        else:
            with open(filename, "r") as fp:
                for line in fp:
                    if line.strip():
                        yield json.loads(line)

    def write_jsonl(self, filename: str, data: Iterable[Dict], append: bool = False):
        """Write JSONL data to a file."""
        mode = 'ab' if append else 'wb'
        with gzip.open(filename, mode) if filename.endswith(".gz") else open(filename, mode) as fp:
            for x in data:
                fp.write((json.dumps(x) + "\n").encode('utf-8'))

    def estimate_pass_at_k(
        self,
        num_samples: Union[int, List[int], np.ndarray],
        num_correct: Union[List[int], np.ndarray],
        k: int
    ) -> np.ndarray:
        """Estimate pass@k metric."""
        def estimator(n: int, c: int, k: int) -> float:
            if n - c < k:
                return 1.0
            return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))

        num_samples_it = itertools.repeat(num_samples, len(num_correct)) if isinstance(num_samples, int) else iter(num_samples)
        return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)])

    def evaluate_functional_correctness_for_n_tasks(
        self,
        sample_file: str,
        task: str,
        k: List[int] = [1, 2, 10, 100],
        n_workers: int = 4,
        timeout: float = 3.0,
    ) -> Dict:
        problems = self.read_problems(task=task)
        results = defaultdict(list)
        completion_id = Counter()
        n_samples = 0

        with ThreadPoolExecutor(max_workers=n_workers) as executor:
            futures = []
            print("Reading samples...")
            for sample in tqdm.tqdm(self.stream_jsonl(sample_file)):
                task_id = sample["task_id"]
                completion = sample["completion"]
                args = (problems[task_id], completion, timeout, completion_id[task_id])
                futures.append(executor.submit(self.check_correctness, *args))
                completion_id[task_id] += 1
                n_samples += 1

            print("Running test suites...")
            for future in tqdm.tqdm(as_completed(futures), total=len(futures)):
                result = future.result()
                results[result["task_id"]].append((result["completion_id"], result))

        total, correct = [], []
        for result in results.values():
            result.sort()
            passed = [r[1]["passed"] for r in result]
            total.append(len(passed))
            correct.append(sum(passed))
        total = np.array(total)
        correct = np.array(correct)

        pass_at_k = {f"pass@{k_val}": self.estimate_pass_at_k(total, correct, k_val).mean()
                     for k_val in k if (total >= k_val).all()}

        out_file = sample_file + "_results.jsonl"
        print(f"Writing results to {out_file}...")
        self.write_jsonl(out_file, tqdm.tqdm(self.combine_results(results, problems, sample_file), total=n_samples))

        return pass_at_k, pass_at_k[list(pass_at_k.keys())[0]] * 100

    def combine_results(self, results, problems, sample_file):
        for sample in self.stream_jsonl(sample_file):
            task_id = sample["task_id"]
            result = results[task_id].pop(0)
            sample['task'] = problems[task_id]
            sample["result"] = result[1]["result"]
            sample["passed"] = result[1]["passed"]
            yield sample

    def check_correctness(
        self,
        problem: Dict,
        completion: str,
        timeout: float,
        completion_id: Optional[int] = None
    ) -> Dict:
        def unsafe_execute():
            with self.create_tempdir():
                self.reliability_guard()
                check_program = (
                    problem["test"] + "\n" +
                    problem["prompt"] + completion + "\n" +
                    f"check({problem['entry_point']})"
                )
                try:
                    exec_globals = {}
                    with self.swallow_io():
                        with self.time_limit(timeout):
                            exec(check_program, exec_globals)
                    result.append("passed")
                except self.TimeoutException:
                    result.append("timed out")
                except BaseException as e:
                    result.append(f"failed: {e}")

        manager = multiprocessing.Manager()
        result = manager.list()
        p = multiprocessing.Process(target=unsafe_execute)
        p.start()
        p.join(timeout + 1)

        if p.is_alive():
            p.kill()

        if not result:
            result.append("timed out")

        return dict(
            task_id=problem["task_id"],
            passed=result[0] == "passed",
            result=result[0],
            completion_id=completion_id,
        )

    @contextlib.contextmanager
    def time_limit(self, seconds: float):
        """Context manager for enforcing a timeout."""
        def signal_handler(signum, frame):
            raise self.TimeoutException("Timed out!")
        signal.signal(signal.SIGALRM, signal_handler)
        signal.setitimer(signal.ITIMER_REAL, seconds)
        try:
            yield
        finally:
            signal.setitimer(signal.ITIMER_REAL, 0)

    @contextlib.contextmanager
    def swallow_io(self):
        """Suppress stdout and stderr."""
        stream = self.WriteOnlyStringIO()
        with contextlib.redirect_stdout(stream):
            with contextlib.redirect_stderr(stream):
                yield

    @contextlib.contextmanager
    def create_tempdir(self):
        """Create and switch to a temporary directory."""
        with tempfile.TemporaryDirectory() as dirname:
            with self.chdir(dirname):
                yield dirname

    class WriteOnlyStringIO(io.StringIO):
        def read(self, *args, **kwargs):
            raise IOError

        def readline(self, *args, **kwargs):
            raise IOError

        def readable(self, *args, **kwargs):
            return False

    @contextlib.contextmanager
    def chdir(self, root):
        """Change directory context."""
        cwd = os.getcwd()
        os.chdir(root)
        try:
            yield
        finally:
            os.chdir(cwd)

    def reliability_guard(self, maximum_memory_bytes: Optional[int] = None):
        """Apply restrictions to ensure safety."""
        if maximum_memory_bytes is not None:
            import resource
            resource.setrlimit(resource.RLIMIT_AS, (maximum_memory_bytes, maximum_memory_bytes))
            resource.setrlimit(resource.RLIMIT_DATA, (maximum_memory_bytes, maximum_memory_bytes))
            if platform.uname().system != 'Darwin':
                resource.setrlimit(resource.RLIMIT_STACK, (maximum_memory_bytes, maximum_memory_bytes))

        faulthandler.disable()

        import builtins
        builtins.exit = None
        builtins.quit = None

        import os
        os.environ['OMP_NUM_THREADS'] = '1'

        os.kill = None
        os.system = None

        import shutil
        shutil.rmtree = None
        shutil.move = None

        import subprocess
        subprocess.Popen = None

        if isinstance(__builtins__, dict):
            __builtins__['help'] = None
        else:
            builtins.help = None

## Defining the prompts for evaluation

Generates a formatted instruction prompt based on the specified dataset type. If the dataset is CodeAlpaca, it takes a problem statement as input and instructs the model to create a Python script based on that statement. For the LeetCode dataset, it extracts the question and function header from the input, instructing the model to write Python code for the specified problem while clearly separating the input and expected output.

In [42]:
def generate_alpaca_prompt(input):

            """
            Generates a formatted instruction prompt based on the CodeAlpaca dataset.

            Input arguments:
                input (str): The problem statement or task for which the Python script is to be created.

            What it does:
                Constructs a prompt in a specific format that includes the instruction to create a Python script
                based on the provided input problem statement. The prompt is designed to guide the model in generating
                a relevant response.

            Returns:
                str: A formatted instruction prompt for a model trained on the CodeAlpaca dataset.
            """

            INSTRUCTION = f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.


            ### Instruction:
            Create a Python script for this problem:
            {input}

            ### Response:"""

            return INSTRUCTION


def generate_leetcode_prompt(input):

        """
            Generates a formatted instruction prompt based on the leetcode dataset.

            Input arguments:
                input (str): The problem statement or task for which the Python script is to be created.

            What it does:
                Constructs a prompt in a specific format that includes the instruction to create a Python script
                based on the provided input problem statement. The prompt is designed to guide the model in generating
                a relevant response.

            Returns:
                str: A formatted instruction prompt for a model trained on the leetcode dataset.

        """

        try:
            ques = input[input.find('"""')+3:]
        except:
            ques = input[input.find("'''")+3:]

        func_header =  input[:input.find("'''")]
        INSTRUCTION = f"""Write a response that appropriately completes the request.


                    ### Input:
                    Write python code for this problem:
                    {ques}

                    ### Output: \n{func_header}"""

        return INSTRUCTION

In [43]:
def generate_leetcode_promptV2(input):

        """
            Generates a formatted instruction prompt based on the leetcode dataset.

            Input arguments:
                input (str): The problem statement or task for which the Python script is to be created.

            What it does:
                Constructs a prompt in a specific format that includes the instruction to create a Python script
                based on the provided input problem statement. The prompt is designed to guide the model in generating
                a relevant response.

            Returns:
                str: A formatted instruction prompt for a model trained on the leetcode dataset.
            
        """
        
        try:
            ques = input[input.find('"""')+3:]
        except:
            ques = input[input.find("'''")+3:]

        func_header =  input[:input.find("'''")]
        INSTRUCTION = f"""Write a response that appropriately completes the request.


                    ### Input:
                    Write python code for this problem:
                    {ques}

                    ### Output: \n{func_header}"""
        
        return INSTRUCTION

In [None]:
file_path = '/eval/mbpp_dataset_prompts.jsonl'

def getFewShotPromptSamples(num = 3):
# List to store the extracted field values
  prompts = []
  code = []
  # Read the JSONL file
  with open(file_path, 'r') as file:
      for line in file:
          record = json.loads(line)  # Parse each line as a JSON object
          if 'prompt' in record:  # Replace 'your_field_name' with the actual field name
              prompts.append(record['prompt'])
          if 'canonical_solution' in record:
              code.append(record['canonical_solution'])
  return prompts[:num], code[:num]

In [None]:
def generate_few_shot_prompt(input, num = 3):

    prompts = []
    code = []
    # Read the JSONL file
    with open(file_path, 'r') as file:
        for line in file:
            record = json.loads(line)  # Parse each line as a JSON object
            if 'prompt' in record:  # Replace 'your_field_name' with the actual field name
                prompts.append(record['prompt'])
            if 'canonical_solution' in record:
                code.append(record['canonical_solution'])
    prompts, code = prompts[:num], code[:num]
    
    # Generate the few-shot prompt
    few_shot_prompt = "# The following contains examples of Python programming tasks with clear descriptions, inputs, and outputs. Each task includes a docstring explaining the problem, followed by Python code solving it.\n"

    for idx, (prompt, answers) in enumerate(zip(prompts, code), start=1):
        few_shot_prompt += f"# Example {idx}\n"
        few_shot_prompt += prompt.strip() + "\n"
        few_shot_prompt += '### Output:\n'
        few_shot_prompt += answers + "\n\n"

    few_shot_prompt += '# New Task:\n'
    few_shot_prompt += input
    # Print the generated few-shot prompt

    return few_shot_prompt

In [44]:
def generate_prompt(input, entry_point = ''):

        """
            Generates a formatted instruction prompt based on the leetcode dataset.

            Input arguments:
                input (str): The problem statement or task for which the Python script is to be created.

            What it does:
                Constructs a prompt in a specific format that includes the instruction to create a Python script
                based on the provided input problem statement. The prompt is designed to guide the model in generating
                a relevant response.

            Returns:
                str: A formatted instruction prompt for a model trained on the leetcode dataset.

        """

        try:
            ques = input[input.find('"""') + 1:]
        except:
            ques = input[input.find("'''") + 1:]

        # func_header =  input[:input.find("'''")]

        # INSTRUCTION = f"""Write a python code that appropriately completes the request. {ques}. ### Response:"""
        INSTRUCTION = f"""{ques}. ### Output: \n"""
        # print(f'The fun header is : {func_header}')

        return INSTRUCTION

In [45]:
prompts = {'alpaca': generate_alpaca_prompt, 
           'leetcode': generate_leetcode_prompt, 
           'normal': generate_prompt,
           'few_shot': generate_few_shot_prompt}

## Run the evaluation

This function generates outputs for all problems in the HumanEval dataset using the specified language model and tokenizer. It reads the problems and, based on the given `prompt_type` (e.g., 'alpaca' or 'leetcode'), generates responses by calling the `generate` function. The results are stored in a list of dictionaries containing the task ID, full generation, and completion. Finally, the samples are saved to a JSONL file named "samples.jsonl". If no prompt type is provided, it defaults to using the standard prompt for each problem.

In [46]:
#to run all the problems
def get_outputs(model, tokenizer, device, tasks = ['humaneval'], prompt_type = None, num_samples_per_task = 1, few_shot_num = 3):

        """
        Reads all the problems from the HumanEval dataset and generates outputs using the specified model and tokenizer.

        Input arguments:
            model: The language model to be used for generating outputs.
            tokenizer: The tokenizer associated with the model.
            device: The device on which the model is to be executed (e.g., 'cuda' or 'cpu').
            prompt_type (str, optional): The type of prompt to use (e.g., 'alpaca' or 'leetcode'). Defaults to None.
            num_samples_per_task (int, optional): The number of samples to generate for each task. Defaults to 1.

        What it does:
            1. Reads all problems from the HumanEval dataset.
            2. Generates outputs for each problem using the specified model and tokenizer.
            3. Supports different types of prompts based on the provided `prompt_type`.
            4. Creates a list of dictionaries in the format [{task_id: generation}, ...].
            5. Writes the generated outputs to a JSONL file named "samples.jsonl".

        Returns:
            None
        """

        eval = Evaluation()

        for task in tasks:

          problems = eval.read_problems(task = task)
          samples = []
          if(prompt_type == None):

              for task_id in problems:

                  full, completion = generate(task_id, problems[task_id]["prompt"], device, model, tokenizer)
                  samples.append(dict(task_id=task_id, full_generation = full, completion = completion))
          else:
              if(prompt_type == 'few_shot'):
                  for task_id in problems:

                      full, completion = generate(task_id, prompts[prompt_type](problems[task_id]["prompt"], few_shot_num), device, model, tokenizer)
                      samples.append(dict(task_id=task_id, full_generation = full, completion = completion))
              else:
                for task_id in problems:

                    full, completion = generate(task_id, prompts[prompt_type](input = problems[task_id]["prompt"]), device, model, tokenizer)
                    samples.append(dict(task_id=task_id, full_generation = full, completion = completion))

          eval.write_jsonl(task + "_results.jsonl", samples)

This function runs the specified number of tests on the HumanEval dataset using a given language model and tokenizer. It reads the problems and selects the first `num_of_tests` entries.

In [47]:
#to run the first n tests
def get_outputs_for_first_n_tests(model, tokenizer, device, tasks = ['humaneval'], prompt_type = None, num_of_tests = 1, few_shot_num = 3, num_samples_per_task = 1):

        """
        Runs the first n tests on the provided model and tokenizer and generates outputs.

        Input arguments:
            model: The language model to be used for generating outputs.
            tokenizer: The tokenizer associated with the model.
            device: The device on which the model is to be executed (e.g., 'cuda' or 'cpu').
            prompt_type (str, optional): The type of prompt to use (e.g., 'alpaca' or 'leetcode'). Defaults to None.
            num_of_tests (int, optional): The number of tests to run. Defaults to 1.
            num_samples_per_task (int, optional): The number of samples to generate for each task. Defaults to 1.

        What it does:
            - Reads problems from the HumanEval dataset.
            - Collects the specified number of problems for testing.
            - Generates outputs for the selected problems using the specified model and tokenizer.
            - Supports different types of prompts based on the provided `prompt_type`.
            - Writes the generated outputs to a JSONL file named "samples.jsonl".

        Returns:
            None
        """

        eval = Evaluation()
        for task in tasks:

          problems = eval.read_problems(task = task)
          probs = {}
          n = 0
          for key, value in problems.items():
              if(n < num_of_tests):
                  probs[key] = value
              n = n + 1

          samples = []
          if(prompt_type == None):

              for task_id in probs:
                  full, completion = generate(task_id, problems[task_id]["prompt"], device, model, tokenizer)
                  samples.append(dict(task_id=task_id, full_generation = full, completion = completion))

          else:
              if(prompt_type == 'few_shot'):
                  for task_id in problems:

                      full, completion = generate(task_id, prompts[prompt_type](problems[task_id]["prompt"], few_shot_num), device, model, tokenizer)
                      samples.append(dict(task_id=task_id, full_generation = full, completion = completion))
              else:
                for task_id in problems:

                    full, completion = generate(task_id, prompts[prompt_type](input = problems[task_id]["prompt"]), device, model, tokenizer)
                    samples.append(dict(task_id=task_id, full_generation = full, completion = completion))

          eval.write_jsonl(task + "_results.jsonl", samples)

This function call generates outputs for the first test in the HumanEval dataset using the specified language model and tokenizer. The `device` is set to 'cuda' to utilize a GPU for computation. The `prompt_type` is set to 'leetcode', which means the function will format the prompts accordingly. The function will generate one sample for the specified test. This output will then be written to a JSONL file.

In [48]:
model.to('cuda')

LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(32000, 4096)
    (layers): ModuleList(
      (0-31): 32 x LlamaDecoderLayer(
        (self_attn): LlamaAttention(
          (q_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (k_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (v_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (o_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (rotary_emb): LlamaRotaryEmbedding()
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (up_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (down_proj): Linear(in_features=11008, out_features=4096, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): LlamaRMSNorm()
        (post_attention_layernorm): LlamaRMSNorm()
      )
    )
    (norm): LlamaRMSNorm()
  )
  (lm_head): Lin

In [53]:
tasks = ['mbpp']

In [55]:
get_outputs_for_first_n_tests(model, tokenizer, tasks = tasks, device = 'cuda', few_shot_num = 3, num_of_tests = 1, prompt_type = 'leetcode', num_samples_per_task = 1)

Task ID: MBPP/11
----------------------------------------------------------------------------------------------------
Generating...
Before processing
 </s> Write a response that appropriately completes the request.


                    ### Input:
                    Write python code for this problem:
                    f remove_Occ(s,ch): 
    '''
    Write a python function to remove first and last occurrence of a given character from the string.
    You need to pass these assertions:
    assert remove_Occ("hello","l") == "heo"
    assert remove_Occ("abcda","a") == "bcd"
    assert remove_Occ("PHP","P") == "H".
    '''



                    ### Output: 
def remove_Occ(s,ch): 
    
    first = s.find(ch)
    last = s.rfind(ch)
    if first == -1 or last == -1:
        return s
    return s[:first] + s[first+1:last] + s[last+1:]
</s>
After processing
 def remove_Occ(s,ch): 
    
    first = s.find(ch)
    last = s.rfind(ch)
    if first == -1 or last == -1:
        return s
    retu

For HumanEval use 'leetcode' prompt, for mbpp use 'normal'

In [57]:
get_outputs(model, tokenizer, tasks = tasks, device = 'cuda', few_shot_num = 3, prompt_type = 'leetcode', num_samples_per_task = 1)

Task ID: MBPP/11
----------------------------------------------------------------------------------------------------
Generating...
Before processing
 </s> Write a response that appropriately completes the request.


                    ### Input:
                    Write python code for this problem:
                    f remove_Occ(s,ch): 
    '''
    Write a python function to remove first and last occurrence of a given character from the string.
    You need to pass these assertions:
    assert remove_Occ("hello","l") == "heo"
    assert remove_Occ("abcda","a") == "bcd"
    assert remove_Occ("PHP","P") == "H".
    '''



                    ### Output: 
def remove_Occ(s,ch): 
    
    first = s.find(ch)
    last = s.rfind(ch)
    if first == -1 or last == -1:
        return s
    return s[:first] + s[first+1:last] + s[last+1:]
</s>
After processing
 def remove_Occ(s,ch): 
    
    first = s.find(ch)
    last = s.rfind(ch)
    if first == -1 or last == -1:
        return s
    retu

In [42]:
task = {"task_id": "MBPP/11", "prompt": "'''Write a python function to remove first and last occurrence of a given character from the string. You need to pass these assertions:     assert remove_Occ(\"hello\",\"l\") == \"heo\"\n    assert remove_Occ(\"abcda\",\"a\") == \"bcd\"\n    assert remove_Occ(\"PHP\",\"P\") == \"H\".'''\n\n ", "entry_point": "remove_Occ", "canonical_solution": "    def remove_Occ(s,ch): \n        for i in range(len(s)): \n            if (s[i] == ch): \n                s = s[0 : i] + s[i + 1:] \n                break\n        for i in range(len(s) - 1,-1,-1):  \n            if (s[i] == ch): \n                s = s[0 : i] + s[i + 1:] \n                break\n        return s ", "test": "\n\nMETADATA = {\n    'author': 'google',\n    'dataset': 'test'\n}\n\ndef check(candidate):\n    assert candidate(\"hello\",\"l\") == \"heo\"\n    assert candidate(\"abcda\",\"a\") == \"bcd\"\n    assert candidate(\"PHP\",\"P\") == \"H\"\n"} 

In [46]:
task['test'] + "\n"

'\n\nMETADATA = {\n    \'author\': \'google\',\n    \'dataset\': \'test\'\n}\n\ndef check(candidate):\n    assert candidate("hello","l") == "heo"\n    assert candidate("abcda","a") == "bcd"\n    assert candidate("PHP","P") == "H"\n\n'

## Calculate Pass@k

This code initializes the `HumanEval` class and evaluates the functional correctness of the generated outputs stored in the "samples.jsonl" file. The `evaluate_functional_correctness_for_n_tasks` method returns two metrics: `pass_at_k`, which indicates the number of tasks that passed the correctness test at a specified threshold, and `accuracy`, representing the proportion of correct outputs among the evaluated tasks.

In [58]:
eval = Evaluation()
for task in tasks:
  pass_at_k, accuracy = eval.evaluate_functional_correctness_for_n_tasks(task + "_results.jsonl", task = task)
  print(f"\nThe pass@1 for task {task}: {pass_at_k}")

Reading samples...


500it [00:00, 2001.84it/s]


Running test suites...


100%|██████████| 500/500 [01:19<00:00,  6.28it/s]


Writing results to mbpp_results.jsonl_results.jsonl...


100%|██████████| 500/500 [00:00<00:00, 27432.76it/s]


The pass@1 for task mbpp: {'pass@1': 0.376}





In [None]:
rank 60 : 'pass@1': 0.43902, mbpp: 'pass@1': 0.376

In [None]:
it :: 'pass@1': 0.45731. im= 'pass@1': 0.4512

In [None]:
mbpp: im 'pass@1': 0.424}