In [None]:
!pip install transformers
!pip install -q transformers datasets
!pip install -q pytorch-lightning wandb
import transformers

In [None]:
from datasets import load_dataset

# Load the "python" configuration of the CodeSearchNet dataset and show its layout.
dataset = load_dataset("code_search_net", "python")
print(dataset)

In [None]:
# Pull the first training record to look at one raw function.
example = dataset['train'][0]

print("Code:", example["whole_func_string"])

In [None]:
from transformers import AutoTokenizer
# Tokenizer of the same checkpoint that the CodeTrans module loads further down.
tokenizer = AutoTokenizer.from_pretrained("SEBIS/code_trans_t5_base_code_documentation_generation_python_transfer_learning_finetune")

# Task prefix prepended to every code snippet before tokenization.
prefix = "Summarize Python: "
# Token budgets used when padding/truncating the code inputs and the docstring targets.
max_input_length = 256
max_target_length = 128

In [None]:


def preprocess_examples(examples):
    """Tokenize a batch of code/docstring pairs for seq2seq fine-tuning.

    Args:
        examples: Batch mapping with the columns ``whole_func_string``
            (source code) and ``func_documentation_string`` (target
            docstrings), each a list of strings.

    Returns:
        The tokenizer output for the prefixed code, extended with a
        ``labels`` entry that holds the docstring token ids with every
        padding id replaced by -100.
    """
    codes = examples['whole_func_string']
    docstrings = examples['func_documentation_string']

    # Encode the code snippets, each one preceded by the task prefix.
    inputs = [prefix + code for code in codes]
    model_inputs = tokenizer(inputs, max_length=max_input_length, padding="max_length", truncation=True)

    # Encode the target docstrings.
    labels = tokenizer(docstrings, max_length=max_target_length, padding="max_length", truncation=True).input_ids

    # Replace padding with -100 (the ignore index of the loss) so padded
    # positions do not contribute. The pad id is taken from the tokenizer
    # rather than assumed to be 0.
    pad_token_id = tokenizer.pad_token_id
    model_inputs["labels"] = [
        [token_id if token_id != pad_token_id else -100 for token_id in label_ids]
        for label_ids in labels
    ]

    return model_inputs

In [None]:
# Apply the preprocessing in batches. The result replaces `dataset`, so
# re-running this cell processes the already-processed data again.
dataset = dataset.map(preprocess_examples, batched=True)

In [None]:
# dataset['train'][1]['input_ids']

In [None]:
from torch.utils.data import DataLoader

# Expose only the model inputs, as torch tensors, so a batch can be passed
# straight to the model.
dataset.set_format(type="torch", columns=['input_ids', 'attention_mask', 'labels'])

In [None]:
# Only the training split is shuffled; the evaluation splits keep dataset order.
train_dataloader = DataLoader(dataset["train"], batch_size=8, shuffle=True)
valid_dataloader = DataLoader(dataset["validation"], batch_size=4)
test_dataloader = DataLoader(dataset["test"], batch_size=4)

In [None]:
# Grab a single training batch as a sanity check of the data pipeline.
batch = next(iter(train_dataloader))
print(batch.keys())

Let's verify an example by decoding it back into text:

In [None]:
# Decode the first input of the batch back to text.
tokenizer.decode(batch['input_ids'][0])

In [None]:
# Drop the -100 placeholders before decoding the first target of the batch.
labels = batch["labels"][0]
tokenizer.decode([token_id for token_id in labels if token_id != -100])

## Fine-tune using PyTorch Lightning



In [None]:
from transformers import  AutoModelWithLMHead, AdamW, get_linear_schedule_with_warmup
import pytorch_lightning as pl

class CodeTrans(pl.LightningModule):
    """Lightning module that fine-tunes a pretrained seq2seq checkpoint.

    Args:
        lr: Learning rate handed to the AdamW optimizer.
        num_train_epochs: Number of epochs the linear LR schedule is sized for.
        warmup_steps: Number of warm-up steps of the linear LR schedule.
        model_name: Checkpoint identifier or path passed to ``from_pretrained``.
            Defaults to the CodeTrans Python documentation-generation model.
    """

    def __init__(self, lr=5e-5, num_train_epochs=1, warmup_steps=1000,
                 model_name="SEBIS/code_trans_t5_base_code_documentation_generation_python_transfer_learning_finetune"):
        super().__init__()
        # Imported locally so the class does not rely on the deprecated
        # `AutoModelWithLMHead`; the seq2seq auto class is its replacement
        # for encoder-decoder checkpoints.
        from transformers import AutoModelForSeq2SeqLM

        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        # Makes the constructor arguments available as `self.hparams.<name>`.
        self.save_hyperparameters()

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and return its output object unchanged."""
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        return outputs

    def common_step(self, batch, batch_idx):
        """Forward a batch (a dict of keyword arguments for `forward`) and return its loss."""
        outputs = self(**batch)
        loss = outputs.loss

        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        loss = self.common_step(batch, batch_idx)
        self.log("training_loss", loss)

        return loss

    def validation_step(self, batch, batch_idx):
        """Compute the validation loss for one batch and log it under `validation_loss`."""
        loss = self.common_step(batch, batch_idx)
        self.log("validation_loss", loss, on_epoch=True)

        return loss

    def test_step(self, batch, batch_idx):
        """Compute the test loss for one batch and log it under `test_loss`."""
        loss = self.common_step(batch, batch_idx)
        # Logged like the validation loss so that `trainer.test` reports a metric.
        self.log("test_loss", loss, on_epoch=True)

        return loss

    def configure_optimizers(self):
        """Create the AdamW optimizer and a per-step linear warm-up/decay schedule."""
        # Imported locally to replace the deprecated `transformers.AdamW`.
        # `eps` and `weight_decay` are set to the defaults of that deprecated
        # class so the optimizer hyperparameters stay the same.
        from torch.optim import AdamW

        optimizer = AdamW(self.parameters(), lr=self.hparams.lr, eps=1e-6, weight_decay=0.0)

        # `train_dataloader` here is the module-level DataLoader, not the
        # method of the same name defined below.
        num_train_optimization_steps = self.hparams.num_train_epochs * len(train_dataloader)
        lr_scheduler = {'scheduler': get_linear_schedule_with_warmup(optimizer,
                                                    num_warmup_steps=self.hparams.warmup_steps,
                                                    num_training_steps=num_train_optimization_steps),
                        'name': 'learning_rate',
                        'interval':'step',
                        'frequency': 1}

        return {"optimizer": optimizer, "lr_scheduler": lr_scheduler}

    def train_dataloader(self):
        """Return the module-level training DataLoader."""
        return train_dataloader

    def val_dataloader(self):
        """Return the module-level validation DataLoader."""
        return valid_dataloader

    def test_dataloader(self):
        """Return the module-level test DataLoader."""
        return test_dataloader

Let's start up Weights and Biases!

In [None]:
import wandb

# Log in to Weights & Biases before the WandbLogger is created for training.
wandb.login()

Next, we initialize the model.

In [None]:
# Instantiate the Lightning module with its default hyperparameters
# (lr=5e-5, 1 epoch, 1000 warm-up steps).
model = CodeTrans()

We can now simply start training on Colab's GPU.

In [None]:
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor

wandb_logger = WandbLogger(name='codetrans-finetune', project='CodeTrans')

# Stop when the logged `validation_loss` has not improved for 3 checks.
early_stop_callback = EarlyStopping(
    monitor='validation_loss',
    patience=3,
    strict=False,
    verbose=False,
    mode='min'
)
lr_monitor = LearningRateMonitor(logging_interval='step')

# `accelerator`/`devices` replace the `gpus` argument, which newer Lightning
# releases no longer accept. `max_epochs` is tied to the epoch count the LR
# schedule was sized for, so training does not continue after the schedule
# has decayed the learning rate to zero.
trainer = Trainer(accelerator="gpu",
                  devices=1,
                  max_epochs=model.hparams.num_train_epochs,
                  default_root_dir=None,
                  logger=wandb_logger,
                  callbacks=[early_stop_callback, lr_monitor])
trainer.fit(model)

## Save the model 

In [None]:

# `model` is the LightningModule wrapper; `save_pretrained` belongs to the
# wrapped Hugging Face model stored in `model.model`.
model.model.save_pretrained(save_directory="CodeTrans_CSN")


This allows us to easily load the trained model again using the `from_pretrained()` method, as shown below.

## Inference

Now that we've trained a model, let's test it on some examples from the test set.

In [None]:
from datasets import load_dataset

# Reload the raw dataset: `dataset` was re-bound to the tokenized version and
# restricted to the tensor columns earlier in the notebook.
dataset = load_dataset("code_search_net", "python")
print(dataset['test'])

In [None]:
# Pick one example from the test split to summarize.
test_example = dataset['test'][2]
print("Code:", test_example['whole_func_string'])


We can load our trained model as follows:

In [None]:
from transformers import T5ForConditionalGeneration

# Load from the directory the "Save the model" section writes to.
model = T5ForConditionalGeneration.from_pretrained("CodeTrans_CSN")

In [None]:
# prepare for the model
input_ids = tokenizer(test_example['whole_func_string'], return_tensors='pt').input_ids
# input_ids = tokenizer(test_example, return_tensors='pt').input_ids
# generate
outputs = model.generate(input_ids)
print("Generated Docstring:", tokenizer.decode(outputs[0], skip_special_tokens=True))

Let's compare this to the ground-truth docstring:

In [None]:
# Reference docstring stored with the example, for comparison with the generated one.
print("Actual Docstring:", test_example['func_documentation_string'])