In [1]:
# Install the packages this notebook imports later (datasets, transformers) plus sentencepiece.
!pip install datasets transformers sentencepiece



In [3]:
import os

KAGGLE_KERNEL_INTERACTIVE = os.environ["KAGGLE_KERNEL_RUN_TYPE"] == "Interactive" and False

if KAGGLE_KERNEL_INTERACTIVE:
    !pip install huggingface_hub
    !apt install git-lfs
    !git lfs install

    from huggingface_hub import notebook_login

    notebook_login()

In [6]:
import os
import json
import torch

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from transformers import RobertaTokenizerFast, T5ForConditionalGeneration, Trainer, TrainingArguments, DataCollatorForSeq2Seq
from datasets import load_dataset

from tqdm.notebook import tqdm
from IPython.display import HTML
from functools import partial
from difflib import SequenceMatcher

# Show every DataFrame column. The option is addressed by its full key because
# the bare 'max_columns' pattern is ambiguous on pandas versions that also
# define 'styler.render.max_columns', where set_option raises OptionError.
pd.set_option('display.max_columns', None)

# Directory holding the CodeNetPy JSON files. The trailing slash matters: file
# names are appended to this value by plain string concatenation.
codenet_root = '/kaggle/input/codenetpy-python-final/codenetpy/'

# Environment switches read by the libraries imported above: keep the
# Weights & Biases logging integration off and disable tokenizer parallelism.
os.environ["WANDB_DISABLED"] = "true"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [7]:
# Map each split name to its JSON file under codenet_root, then load both at
# once. field="data" tells the JSON loader which top-level key holds the records.
codenet_files = {
    "train": codenet_root + "codenetpy_train.json",
    "test": codenet_root + "codenetpy_test.json",
}
dataset = load_dataset("json", data_files=codenet_files, field="data")

Downloading and preparing dataset json/default to /root/.cache/huggingface/datasets/json/default-36680e98994034c0/0.0.0/ac0ca5f5289a6cf108e706efcf040422dbbfa8e658dee6a819f20d76bb84d26b...


  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

Dataset json downloaded and prepared to /root/.cache/huggingface/datasets/json/default-36680e98994034c0/0.0.0/ac0ca5f5289a6cf108e706efcf040422dbbfa8e658dee6a819f20d76bb84d26b. Subsequent calls will reuse this data.


  0%|          | 0/2 [00:00<?, ?it/s]

In [8]:
# Fixed seed for the train/validation shuffle so that a fresh
# "Restart & Run All" reproduces the same split.
SPLIT_SEED = 42

# Keep only the training submissions with a non-zero return code and hold out
# 10% of them. The result is a dict-like object with "train" and "test" keys;
# the "test" key here is the held-out validation part, not the real test set.
train_dataset = (
    dataset["train"]
    .filter(lambda example: example["returncode"] != 0)
    .train_test_split(test_size=0.1, seed=SPLIT_SEED)
)

# The official test split is used as-is (no return-code filtering).
test_dataset = dataset["test"]

  0%|          | 0/39 [00:00<?, ?ba/s]

In [9]:
max_source_length = 256
max_target_length = 512

def tokenize_and_align_labels(tokenizer, example):
    """Tokenize one batch of repair examples and attach loss-ready labels.

    Args:
        tokenizer: Callable tokenizer exposing ``pad_token_id``.
        example: Batch (dict of lists) with the keys ``error_class_extra``,
            ``original_src`` and ``changed_src``.

    Returns:
        The encoding of the (error description, original source) pairs,
        truncated to ``max_source_length``, with an extra ``"labels"`` entry:
        the token ids of ``changed_src`` (truncated to ``max_target_length``)
        where every padding id has been replaced by -100.
    """
    encoded = tokenizer(
        text=example["error_class_extra"],
        text_pair=example["original_src"],
        max_length=max_source_length,
        padding=True,
        truncation=True,
    )
    targets = tokenizer(
        example["changed_src"],
        max_length=max_target_length,
        padding=True,
        truncation=True,
    )

    # Swap the padding id for -100 on the target side.
    pad_id = tokenizer.pad_token_id
    encoded["labels"] = [
        [-100 if token_id == pad_id else token_id for token_id in row]
        for row in targets.input_ids
    ]
    return encoded

tokenizer = RobertaTokenizerFast.from_pretrained("Salesforce/codet5-base")

# Bind the tokenizer once so the same preprocessing callable serves every split.
preprocess_batch = partial(tokenize_and_align_labels, tokenizer)

# Tokenize in batches of 4 and drop the raw columns, leaving only what the
# preprocessing function returns.
train_dataset = train_dataset.map(
    preprocess_batch,
    batched=True,
    batch_size=4,
    remove_columns=train_dataset["train"].column_names,
)
test_dataset = test_dataset.map(
    preprocess_batch,
    batched=True,
    batch_size=4,
    remove_columns=test_dataset.column_names,
)

Downloading:   0%|          | 0.00/1.44k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/687k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/287k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/12.2k [00:00<?, ?B/s]

  0%|          | 0/8687 [00:00<?, ?ba/s]

  0%|          | 0/966 [00:00<?, ?ba/s]

  0%|          | 0/1688 [00:00<?, ?ba/s]

In [10]:
training_args = TrainingArguments(
    output_dir='codet5-base-buggy-code-repair',                # output directory
    num_train_epochs=10,                                       # total number of training epochs
    per_device_train_batch_size=4,                             # batch size per device during training
    per_device_eval_batch_size=4,                              # batch size for evaluation
    warmup_steps=500,                                          # number of warmup steps for learning rate scheduler
    weight_decay=0.01,                                         # strength of weight decay
    logging_dir='./logs',                                      # directory for storing logs
    logging_steps=1_000,                                       # Steps to report the loss value
    save_strategy="no",                                        # never write intermediate checkpoints
    report_to="none",                                          # supported replacement for the deprecated WANDB_DISABLED env var
    push_to_hub=KAGGLE_KERNEL_INTERACTIVE,                     # upload only when the hub flag is enabled
)

# Pretrained CodeT5 checkpoint to fine-tune, and a collator that pads each
# batch; the model is handed to the collator as well.
model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-base")
data_collator = DataCollatorForSeq2Seq(tokenizer, model, padding=True)

Using the `WAND_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


Downloading:   0%|          | 0.00/1.53k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/850M [00:00<?, ?B/s]

In [11]:
def compute_metrics(p):
    """Compute token-level precision, recall, F1 and accuracy for an evaluation run.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` is either the logits
            array itself or a tuple/list whose first element is the logits
            array (the last axis is the vocabulary axis). ``labels`` holds the
            target token ids, with -100 marking positions to ignore.

    Returns:
        Dict with the keys ``precision``, ``recall``, ``f1`` (all
        weighted-averaged over token ids) and ``accuracy``.
    """
    predictions, labels = p

    # Accept both a bare logits array and a tuple whose first item is the logits.
    logits = predictions[0] if isinstance(predictions, (tuple, list)) else predictions
    predicted_ids = np.argmax(logits, axis=-1)
    labels = np.asarray(labels)

    # -100 marks padded/ignored label positions. Scoring them would count every
    # one of them as an error (no token id equals -100) and deflate the metrics.
    keep = labels != -100
    true_predictions = predicted_ids[keep].tolist()
    true_labels = labels[keep].tolist()

    # zero_division=0 yields the same value sklearn falls back to for classes
    # that never occur, without emitting a warning for each of them.
    return {
        "precision": precision_score(true_labels, true_predictions, average="weighted", zero_division=0),
        "recall": recall_score(true_labels, true_predictions, average="weighted", zero_division=0),
        "f1": f1_score(true_labels, true_predictions, average="weighted", zero_division=0),
        "accuracy": accuracy_score(true_labels, true_predictions)
    }

In [None]:
# Evaluation uses only the first 10 examples of the held-out split.
eval_subset = train_dataset["test"].select(range(10))

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset["train"],
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    data_collator=data_collator,
)

# Fine-tune; the training summary is the cell's output.
trainer.train()

***** Running training *****
  Num examples = 34745
  Num Epochs = 10
  Instantaneous batch size per device = 4
  Total train batch size (w. parallel, distributed & accumulation) = 4
  Gradient Accumulation steps = 1
  Total optimization steps = 86870


Step,Training Loss
1000,0.982
2000,0.5434
3000,0.5041
4000,0.4703
5000,0.4641
6000,0.4374
7000,0.447
8000,0.4303
9000,0.4096
10000,0.3798


In [None]:
# Evaluate the trained model with the Trainer; the call's return value is displayed as the cell output.
trainer.evaluate()

In [None]:
def predict(tokenizer, model, error, source, beam_size=5, max_input_length=512,
            max_output_length=512, no_repeat_ngram_size=2):
    """Generate candidate repaired programs with beam search.

    Args:
        tokenizer: Tokenizer used to encode the inputs and decode the outputs.
        model: Generation-capable model; it is switched to eval mode.
        error: Error description(s), passed to the tokenizer as ``text``.
        source: Buggy source code(s), passed to the tokenizer as ``text_pair``.
        beam_size: Number of beams, and also the number of sequences returned
            per input.
        max_input_length: Truncation length for the encoded input.
            NOTE(review): the preprocessing cell truncates training inputs to
            ``max_source_length`` (256) while this default is 512 - confirm
            whether the mismatch is intended.
        max_output_length: ``max_length`` handed to ``model.generate``.
        no_repeat_ngram_size: Forwarded to ``model.generate``.
            NOTE(review): real code repeats bigrams constantly, so the default
            of 2 probably blocks many correct outputs - consider 0.

    Returns:
        List of decoded strings (special tokens stripped), ``beam_size``
        entries per input.
    """
    # Make sure dropout is off: a model that has just been trained may still be
    # in training mode, which would make generation noisy.
    model.eval()

    tokenized_inputs = tokenizer(
        text=error,
        text_pair=source,
        max_length=max_input_length,
        padding=True,
        truncation=True,
        return_tensors="pt",
    ).to(model.device)

    generated_ids = model.generate(
        num_beams=beam_size,
        no_repeat_ngram_size=no_repeat_ngram_size,
        num_return_sequences=beam_size,
        max_length=max_output_length,
        **tokenized_inputs,
    )

    return tokenizer.batch_decode(generated_ids.cpu().detach().numpy(), skip_special_tokens=True)

def compute_accuracy(beam_sizes, total=1000, dataset=dataset["test"]):
    """Print exact-match top-k accuracy for several values of k.

    One beam search per example is run with the largest requested beam size;
    a prediction counts as a hit for every k that is greater than the rank of
    the first candidate equal to ``changed_src``. Uses the notebook-level
    ``tokenizer`` and ``model``.

    Args:
        beam_sizes: Non-empty sequence of k values to report (any order).
        total: Number of examples to score, taken from the start of
            ``dataset``; capped at the dataset length.
        dataset: Indexable collection of rows with the keys
            ``error_class_extra``, ``original_src`` and ``changed_src``.

    Raises:
        ValueError: If ``beam_sizes`` is empty.
    """
    if not beam_sizes:
        raise ValueError("beam_sizes must not be empty")

    # Never index past the end of the dataset, and avoid dividing by zero below.
    total = min(total, len(dataset))
    if total <= 0:
        print("No examples to evaluate")
        return

    # Search with the widest beam regardless of where it sits in the list.
    widest_beam = max(beam_sizes)
    correct_preds = [0] * len(beam_sizes)

    for i in tqdm(range(total)):
        example = dataset[i]  # fetch the row once instead of once per field
        preds = predict(tokenizer, model, [example["error_class_extra"]], [example["original_src"]], beam_size=widest_beam)

        target = example["changed_src"]
        for rank, pred in enumerate(preds):
            if pred == target:
                for k, beam in enumerate(beam_sizes):
                    if rank < beam:
                        correct_preds[k] += 1
                # Only the best-ranked exact match counts.
                break

    for b, c in zip(beam_sizes, correct_preds):
        print(f"Accuracy beam_size={b} {c / total}")

# Print the exact-match accuracy on the test split for beam sizes 1, 5, 10 and 50.
compute_accuracy(beam_sizes=[1, 5, 10, 50])

In [None]:
# Upload the trained model through the Trainer, but only when KAGGLE_KERNEL_INTERACTIVE is truthy.
if KAGGLE_KERNEL_INTERACTIVE:
    trainer.push_to_hub()

In [None]:
def generate_char_mask(original_src, changed_src):
    """Flag the characters of ``original_src`` that differ from ``changed_src``.

    Args:
        original_src: Text before the change.
        changed_src: Text after the change.

    Returns:
        List with one int per character of ``original_src``: 1 where the
        character belongs to a non-"equal" diff opcode, 0 elsewhere.
    """
    matcher = SequenceMatcher(None, original_src, changed_src)
    mask = np.zeros(len(original_src), dtype=np.int32)

    for tag, start, stop, _, _ in matcher.get_opcodes():
        if tag == "equal":
            continue
        # Insertions have start == stop; widen the span to one character so the
        # insertion point is flagged (nothing is flagged when it lies at the end).
        mask[start:max(start + 1, stop)] = 1

    return mask.tolist()

def color_source(source_code, mask, color='red'):
    """Render source code as an HTML ``<pre>`` block with flagged characters coloured.

    Spaces are shown as a light-grey "•" and newlines as a light-grey "↵"
    followed by the real line break, so whitespace changes are visible. All
    other characters are HTML-escaped so that ``<``, ``>`` and ``&`` in the
    code are displayed literally instead of being parsed as markup.

    Args:
        source_code: Text to render.
        mask: Sequence indexable by character position; a value of 1 marks
            the character as highlighted.
        color: CSS colour used for highlighted characters.

    Returns:
        HTML string wrapped in ``<pre>...</pre>``.

    Raises:
        IndexError: If ``mask`` is shorter than ``source_code``.
    """
    import html  # local import: only this function needs it

    spans = []
    for i, char in enumerate(source_code):
        norm_color = 'black'
        if char == ' ':
            shown = "•"
            norm_color = 'lightgrey'
        elif char == '\n':
            shown = "↵\n"
            norm_color = 'lightgrey'
        else:
            shown = html.escape(char)
        span_color = color if mask[i] == 1 else norm_color
        spans.append(f'<span style="color:{span_color};">{shown}</span>')

    # Join once at the end instead of growing a string inside the loop.
    return "<pre>" + "".join(spans) + "</pre>"

In [None]:
# Walk through the first 50 test examples and show, for each one: the buggy
# source with changed characters highlighted, the accepted fix, the error
# description, and the model's five beam-search candidates.
test_split = dataset["test"]

for i in range(50):
    example = test_split[i]
    original_src = example["original_src"]
    accepted_src = example["changed_src"]
    error_text = example["error_class_extra"]

    display(HTML(f"<h2>Example {i}</h2>"))

    display(HTML("<h3>Source Code</h3>"))
    mask = generate_char_mask(original_src, accepted_src)
    display(HTML(color_source(original_src, mask)))

    display(HTML("<h3>Accepted Source Code</h3>"))
    print(accepted_src)

    display(HTML("<h3>Error Description</h3>"))
    print(error_text)

    display(HTML("<h3>Prediction Source Code</h3>"))
    preds = predict(tokenizer, model, [error_text], [original_src], beam_size=5)
    for pred in preds:
        print()
        print(pred)