# Environment setup

## Imports

In [1]:
!pip install datasets jsonlines peft > /dev/null

In [2]:
import os
import re
import sys
import json
import jsonlines
import torch
import torch.nn as nn

from datasets import Dataset, load_dataset
from peft import LoraConfig, get_peft_model

from tqdm import tqdm

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    StoppingCriteria,
    StoppingCriteriaList,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling
)

# Implementation

## Generate base model answers

In [3]:
# Criteria for stopping code generation
class StoppingCriteriaSub(StoppingCriteria):
    def __init__(self, stops, tokenizer):
        (StoppingCriteria.__init__(self),)
        self.stops = rf"{stops}"
        self.tokenizer = tokenizer

    def __call__(
            self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs
    ) -> bool:
        last_three_tokens = [int(x) for x in input_ids.data[0][-3:]]
        decoded_last_three_tokens = self.tokenizer.decode(last_three_tokens)

        return bool(re.search(self.stops, decoded_last_three_tokens))


# Generate model answer
def generate(problem, model, tokenizer):
    criterion = StoppingCriteriaSub(stops="\n}\n", tokenizer=tokenizer)
    stopping_criteria = StoppingCriteriaList([criterion])

    problem = tokenizer.encode(problem, return_tensors="pt").to('cuda')

    sample = model.generate(
        problem,
        max_new_tokens=256,
        min_new_tokens=128,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=False,
        num_beams=1,
        stopping_criteria=stopping_criteria,
    )

    answer = tokenizer.decode(sample[0], skip_special_tokens=True)
    return answer

# Clean answer from comments and head of the function
def clean_answer(code, skip_lines=1):
    code_without_line_comments = re.sub(r"//.*", "", code)

    code_without_all_comments = re.sub(
        r"/\*.*?\*/", "", code_without_line_comments, flags=re.DOTALL
    )

    lines = code_without_all_comments.split("\n")

    for i, line in enumerate(lines):
        if line.startswith("fun "):
            return "\n".join(lines[i + skip_lines:])

    return code


# Generate answers for testing model
def generate_model_answers(model_name="ibm-granite/granite-3b-code-base-2k",
                           dataset_name="jetbrains/Kotlin_HumanEval"):

    dataset = load_dataset(dataset_name)['train']
    problem_dict = {problem['task_id']: problem for problem in dataset}

    model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16).to('cuda')
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    output = []
    for key in tqdm(list(problem_dict.keys()), leave=False):
        problem = problem_dict[key]["prompt"]
        answer = generate(problem, model, tokenizer)
        answer = clean_answer(answer)
        output.append({"task_id": key, "completion": answer, "language": "kotlin"})

    output_file = f"answers"

    with jsonlines.open(output_file, mode="w") as writer:
        for line in output:
            writer.write(line)

    del model
    torch.cuda.empty_cache()


In [4]:
generate_model_answers()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/4.59k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/175k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/161 [00:00<?, ? examples/s]

config.json:   0%|          | 0.00/680 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/41.6k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.97G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/1.99G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/4.13k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.06M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/1.02k [00:00<?, ?B/s]

  0%|          | 0/161 [00:00<?, ?it/s]The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


## Create synt data

In [5]:
# Split python input into comment and code
def split_problem_data(problem_str):
    start_index = problem_str.find('"""')
    end_index = problem_str.find('"""', start_index + 3)

    if start_index != -1 and end_index != -1:
        code = problem_str[:start_index].strip()

        comment = "/***" + problem_str[start_index + 3:end_index].strip() + "***/\n"

        return code, comment
    else:
        raise ValueError("Comment section not found.")


# Generate answer from prompt and code
def generate_kotlin_prompt(model, tokenizer, code, prompt, stop_crit):
    criterion = StoppingCriteriaSub(stops=stop_crit, tokenizer=tokenizer)
    stopping_criteria = StoppingCriteriaList([criterion])

    problem = prompt + code

    problem = tokenizer.encode(problem, return_tensors="pt").to('cuda')

    sample = model.generate(
        problem,
        max_new_tokens=512,
        min_new_tokens=256,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=False,
        num_beams=4,
        stopping_criteria=stopping_criteria,
    )

    answer = tokenizer.decode(sample[0], skip_special_tokens=True)

    code = clean_answer(answer, 0)
    substring = " {"

    # Checking that function was generated
    if "fun " not in code:
        return "", ""

    func_head, func_body = code.split(substring, 1)

    func_head = func_head + substring

    return func_head, func_body


# Generate kotlin dataset from python
def create_synt_data(translate_count = 100, model_name='ibm-granite/granite-3b-code-base-2k', dataset_name="jinaai/code_exercises"):
    new_dataset = {"train": []}

    dataset = load_dataset(dataset_name)['train']

    model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16).to('cuda')
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    len_dataset = len(dataset)

    with tqdm(total=translate_count) as pbar:
        for i in range(len_dataset):

            if pbar.n >= translate_count:
                break

            # getting code and comments
            code, comment = split_problem_data(dataset[i]["problem"])

            # taking python solution
            sol_code = dataset[i]["solution"]

            # generate kotlin test from python
            func_head, func_body = generate_kotlin_prompt(model, tokenizer, comment + '\n' + code + '\n' + sol_code,
                                                          """Complete solution for python code and translate the following Python function to Kotlin.q
                                                          Ensure the Kotlin function has proper formatting with correct indentation.
                                                          The Kotlin function definition should include a `{` immediately after the parameter list, and the rest of the code should be indented as per Kotlin's syntax rules.
                                                          Python code:

                                                          ```python""",
                                                          "\n}\n")

            if func_head == func_body == "":
                continue

            new_dataset["train"].append([])
            new_dataset["train"][-1] = {}

            new_dataset["train"][-1]["prompt"] = comment + "\n\n" + func_head
            new_dataset["train"][-1]["solution"] = func_body

            pbar.update(1)

    with open("test_dataset.json", "w") as outfile:
        json.dump(new_dataset, outfile)

    del model
    torch.cuda.empty_cache()

In [6]:
create_synt_data()

README.md:   0%|          | 0.00/5.31k [00:00<?, ?B/s]

(…)-00000-of-00003-6eeb0e135bc51d39.parquet:   0%|          | 0.00/162M [00:00<?, ?B/s]

(…)-00001-of-00003-22b1737dee8de958.parquet:   0%|          | 0.00/162M [00:00<?, ?B/s]

(…)-00002-of-00003-26a59d985c38668c.parquet:   0%|          | 0.00/162M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1468146 [00:00<?, ? examples/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

100%|██████████| 10/10 [12:37<00:00, 75.76s/it]


## Fine tune model

In [7]:
model = AutoModelForCausalLM.from_pretrained('ibm-granite/granite-3b-code-base-2k',
                                                 torch_dtype=torch.bfloat16)
tokenizer = AutoTokenizer.from_pretrained('ibm-granite/granite-3b-code-base-2k')

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [8]:
# Read dataset
def get_datasets(tokenizer, finetune_dataset_name):
    try:
        with open(finetune_dataset_name, 'r') as f:
            dataset = json.load(f)
    except FileNotFoundError as e:
        print(f"The file {e} was not found.")
        return None, None

    train_data = dataset['train']
    train_dataset = Dataset.from_list(train_data)

    train_test_split = train_dataset.train_test_split(test_size=0.2)

    train_dataset = train_test_split['train']
    test_dataset = train_test_split['test']

    def preprocess_function(examples):
        inputs = tokenizer(examples['prompt'],
                           padding='max_length',
                           truncation=True,
                           max_length=512,
                           return_tensors="pt")

        targets = tokenizer(examples['solution'],
                            padding='max_length',
                            truncation=True,
                            max_length=512,
                            return_tensors="pt")

        inputs['labels'] = targets['input_ids']

        return inputs

    tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True)
    tokenized_test_dataset = test_dataset.map(preprocess_function, batched=True)

    tokenized_train_dataset = tokenized_train_dataset.remove_columns(['prompt', 'solution'])
    tokenized_test_dataset = tokenized_test_dataset.remove_columns(['prompt', 'solution'])

    return tokenized_train_dataset, tokenized_test_dataset


# Fine tuning model with save in the end
def finetune_model(model_name='ibm-granite/granite-3b-code-base-2k',
                   finetune_dataset_name='test_dataset.json'):
    model = AutoModelForCausalLM.from_pretrained(model_name,
                                                 torch_dtype=torch.bfloat16)
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # freezing model layers
    for param in model.parameters():
        param.requires_grad = False

        if param.ndim == 1:
            param.data = param.data.to(torch.bfloat16)

    model.gradient_checkpointing_enable()
    model.enable_input_require_grads()

    class CastOutputToFloat(nn.Sequential):
        def forward(self, x): return super().forward(x).to(torch.bfloat16)

    model.lm_head = CastOutputToFloat(model.lm_head)

    # Set up layers for finetuning
    config = LoraConfig(
        r=16,
        lora_alpha=16,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        lora_dropout=0.05,
    )

    model = get_peft_model(model, config)
    model.print_trainable_parameters()

    tokenized_train_dataset, tokenized_test_dataset = get_datasets(tokenizer, finetune_dataset_name)

    if tokenized_train_dataset is None and tokenized_test_dataset is None:
        return None

    training_args = TrainingArguments(
        output_dir="./results",
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=30,
        logging_dir="./logs",
        logging_steps=10,
        evaluation_strategy="epoch",
        learning_rate=1e-4,
        remove_unused_columns=False,
        fp16=True,
    )

    # Define the Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train_dataset,
        eval_dataset=tokenized_test_dataset,
        data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False)
    )

    print("Starting fine-tuning...")
    trainer.train()

    print("Evaluating the model...")
    evaluation_results = trainer.evaluate()

    print("Evaluation results:", evaluation_results)

    model.save_pretrained("./finetuned_model")
    tokenizer.save_pretrained("./finetuned_model")

    print("Model fine-tuned and saved successfully.")

    del model
    torch.cuda.empty_cache()

In [9]:
finetune_model()

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

trainable params: 10,485,760 || all params: 3,492,989,440 || trainable%: 0.3002


Map:   0%|          | 0/8 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

  self.scaler = torch.cuda.amp.GradScaler(**kwargs)


Starting fine-tuning...


`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]


Epoch,Training Loss,Validation Loss
1,No log,No log


We detected that you are passing `past_key_values` as a tuple and this is deprecated and will be removed in v4.43. Please use an appropriate `Cache` class (https://huggingface.co/docs/transformers/v4.41.3/en/internal/generation_utils#transformers.Cache)


Evaluating the model...


Evaluation results: {'eval_runtime': 0.3359, 'eval_samples_per_second': 5.954, 'eval_steps_per_second': 2.977, 'epoch': 1.0}
Model fine-tuned and saved successfully.


## Test finetunde model

In [10]:
generate_model_answers(model_name='./finetuned_model')

del model
torch.cuda.empty_cache()

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]



KeyboardInterrupt: 