In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import json
import torch
import torch.nn as nn
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
from transformers import DataCollatorForSeq2Seq
from transformers import LEDTokenizer, LEDForConditionalGeneration
from transformers import AutoTokenizer, BigBirdPegasusForConditionalGeneration

In [None]:
!pip install accelerate

In [None]:
# Checkpoint that is fine-tuned in both the method-level and file-level sections.
model_name = "Salesforce/codet5-base"

# Load the tokenizer and the seq2seq model weights for the selected checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# Alternative checkpoints, currently disabled:
# model_name = "allenai/led-large-16384"

# tokenizer = LEDTokenizer.from_pretrained(model_name)
# model = LEDForConditionalGeneration.from_pretrained(model_name)

# model_name = "google/bigbird-pegasus-large-arxiv"
# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = BigBirdPegasusForConditionalGeneration.from_pretrained(model_name)

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the model onto the selected device.
model = model.to(device)

## Function Level Summarization

In [None]:
class LoadData(Dataset):
    """Method-level code-summarization dataset backed by a JSONL file.

    Every non-blank line of the file is parsed as one JSON object. Items are
    built from its ``method_code`` (model input) and ``method_summary``
    (target) fields.

    Args:
        file_path: Path of the JSONL file to read.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors of shape ``(1, length)``.
        max_length: Length the tokenized code is padded/truncated to.
        max_target_length: Length the tokenized summary is padded/truncated
            to. ``None`` (the default) forwards ``max_length=None`` to the
            tokenizer, which is also the value the tokenizer receives when
            the argument is omitted.
    """

    # Label value that the training loss skips (PyTorch cross-entropy's default ignore_index).
    IGNORE_INDEX = -100

    def __init__(self, file_path, tokenizer, max_length = 512, max_target_length = None):
        self.data = self.load_jsonl(file_path)
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.max_target_length = max_target_length

    def load_jsonl(self, file_path):
        """Return the list of JSON objects stored one per line in ``file_path``."""
        with open(file_path, 'r', encoding = 'utf-8') as f:
            # Blank lines (e.g. a trailing empty line) are skipped instead of
            # crashing json.loads.
            return [json.loads(line) for line in f if line.strip()]

    def __len__(self):
        """Number of records loaded from the file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize record ``idx`` into ``input_ids``, ``attention_mask`` and ``labels``."""
        row = self.data[idx]
        code = row['method_code']
        summary = row['method_summary']

        inputs = self.tokenizer(code, max_length = self.max_length, padding = 'max_length', truncation = True, return_tensors = 'pt')
        labels = self.tokenizer(summary, max_length = self.max_target_length, padding = 'max_length', truncation = True, return_tensors = 'pt')

        label_ids = labels['input_ids'].squeeze(0)
        # The labels arrive already padded to a fixed length, so the padding
        # positions must be masked here; otherwise the loss is also computed
        # on pad tokens.
        pad_id = getattr(self.tokenizer, 'pad_token_id', None)
        if pad_id is not None:
            label_ids = label_ids.masked_fill(label_ids == pad_id, self.IGNORE_INDEX)

        return {
            "input_ids": inputs['input_ids'].squeeze(0),
            "attention_mask": inputs['attention_mask'].squeeze(0),
            "labels": label_ids
        }

    ### For Longformer
    # def __getitem__(self, idx):
    #     row = self.data[idx]
    #     code = row['_code']
    #     summary = row['repo_summary']

    #     # Important for LED: set global attention on the first token
    #     inputs = self.tokenizer(code, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pt')
    #     input_ids = inputs['input_ids'].squeeze(0)
    #     attention_mask = inputs['attention_mask'].squeeze(0)

    #     global_attention_mask = torch.zeros_like(attention_mask)
    #     global_attention_mask[0] = 1  # global attention on the first token

    #     labels = self.tokenizer(summary, max_length=512, padding='max_length', truncation=True, return_tensors='pt')['input_ids'].squeeze(0)

    #     return {
    #         "input_ids": input_ids,
    #         "attention_mask": attention_mask,
    #         "global_attention_mask": global_attention_mask,
    #         "labels": labels
    #     }


In [None]:
# Paths of the JSONL files inside the Kaggle input directory (these hold paths, not loaded data).
train_data = '/kaggle/input/code-summarizer/method-level-mcsn.jsonl'
val_data = '/kaggle/input/code-summarizer/method-level-mcsn-few-shot.jsonl'

# Wrap each file in the tokenizing dataset, using its default max_length of 512.
train_dataset = LoadData(train_data, tokenizer)
val_dataset = LoadData(val_data, tokenizer)

# Loaders for manual iteration; val_loader feeds the inference cell further down.
# The Seq2SeqTrainer is handed the datasets directly and does not use these loaders.
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=2)

In [None]:
# Weights & Biases is enabled in this environment; set its environment switch to turn it off.
import os
os.environ["WANDB_DISABLED"] = "true"

In [None]:
!pip install evaluate
!pip install rouge_score
!pip install bleu

In [None]:
# import os
# os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [None]:
# Training configuration; the same object is reused by every trainer built in this notebook.
training_args = Seq2SeqTrainingArguments(
    output_dir="./codet5-finetuned",
    # NOTE(review): newer transformers releases call this argument `eval_strategy` —
    # confirm against the installed version.
    evaluation_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    num_train_epochs=3,
    weight_decay=0.01,
    save_strategy="epoch",
    logging_dir="./logs",
    predict_with_generate=True,
    generation_max_length=128,
    # Mixed precision is only requested when a CUDA device exists, so the CPU
    # fallback chosen for `device` does not fail argument validation.
    fp16=torch.cuda.is_available(),
    report_to="none",
    dataloader_num_workers=2,  # Optimize data loading
    ddp_find_unused_parameters=False,
    logging_strategy="steps",
    logging_steps=10,
    save_total_limit=2,
    # NOTE(review): presumably has no effect while save_strategy="epoch" — confirm.
    save_steps=500,
)


# Custom data collator to handle global attention mask in longformer
# def custom_data_collator(batch):
#     input_ids = torch.stack([item["input_ids"] for item in batch])
#     attention_mask = torch.stack([item["attention_mask"] for item in batch])
#     global_attention_mask = torch.stack([item["global_attention_mask"] for item in batch])
#     labels = torch.tensor(np.array([item["labels"] for item in batch]), dtype=torch.int64)
#     return {
#         "input_ids": input_ids,
#         "attention_mask": attention_mask,
#         "global_attention_mask": global_attention_mask,
#         "labels": labels,
#     }


# Collator that batches the tokenized items for the seq2seq model.
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)
# Trainer for the method-level datasets.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
)


In [None]:
# Fine-tune on the method-level training set; evaluation and checkpointing follow training_args.
trainer.train()

In [None]:
# Evaluate on the trainer's eval dataset (val_dataset) and display the returned metrics.
trainer.evaluate()

In [None]:
# Release cached GPU memory that PyTorch is no longer using before running inference.
torch.cuda.empty_cache()

In [None]:
# def generate_summary(batch, model, tokenizer, max_length=512):

#     input_ids = batch["input_ids"].to(device)
#     attention_mask = batch["attention_mask"].to(device)

#     with torch.no_grad():
#         summary_ids = model.generate(
#             input_ids=input_ids, attention_mask=attention_mask,
#             max_length=max_length, num_beams=5, early_stopping=True
#         )

#     summaries = tokenizer.batch_decode(summary_ids, skip_special_tokens=True)
#     return summaries

# # Run inference on a small batch
# batch = next(iter(val_loader))
# generated_summaries = generate_summary(batch, model, tokenizer)

In [None]:
def generate_summary(batch, model, tokenizer, max_length=128):
    """Generate decoded summaries for one batch using beam search (2 beams).

    Args:
        batch: Mapping holding ``input_ids`` and ``attention_mask`` tensors.
        model: Module exposing ``generate``. Inputs are moved to the device of
            its parameters instead of relying on a notebook-level global.
        tokenizer: Object exposing ``batch_decode``.
        max_length: Value forwarded as ``max_length`` to ``model.generate``.

    Returns:
        The decoded summaries returned by ``tokenizer.batch_decode`` with
        special tokens skipped.
    """
    # Follow the model's own placement; a parameter-less model defaults to CPU.
    try:
        target_device = next(model.parameters()).device
    except StopIteration:
        target_device = torch.device("cpu")

    input_ids = batch["input_ids"].to(target_device)
    attention_mask = batch["attention_mask"].to(target_device)

    # Generation right after trainer.train() would otherwise run with dropout
    # active, so switch to eval mode and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            summary_ids = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=max_length,
                num_beams=2,
                early_stopping=True
            )
    finally:
        model.train(was_training)

    summaries = tokenizer.batch_decode(summary_ids, skip_special_tokens=True)
    return summaries

In [None]:
# Summarize only the first validation batch (val_loader is built with batch_size=2, unshuffled).
batch = next(iter(val_loader))  # first batch only, not the whole validation set
generated_summaries = generate_summary(batch, model, tokenizer)

print(generated_summaries)

In [None]:
import pandas as pd

# Pair each generated summary with its source record. The summaries come from the first
# batch of the unshuffled val_loader, so they line up with the leading rows of val_dataset.
results_df = pd.DataFrame(
    [
        {
            "Method Code": record["method_code"],
            "Original Summary": record["method_summary"],
            "Generated Summary": generated,
        }
        for record, generated in zip(val_dataset.data, generated_summaries)
    ],
    columns=["Method Code", "Original Summary", "Generated Summary"],
)

results_df.head()

In [None]:
# Reference summary stored at row label 1.
results_df.loc[1, 'Original Summary']

In [None]:
# Generated summary stored at row label 1.
results_df.loc[1, 'Generated Summary']

In [None]:
!pip install rouge

In [None]:
from rouge import Rouge

# Average ROUGE over the (generated, reference) pairs collected in results_df.
rouge = Rouge()
rouge.get_scores(
    list(results_df['Generated Summary']),
    list(results_df['Original Summary']),
    avg=True,
)

In [None]:
!pip install evaluate bert-score

In [None]:
from bert_score import BERTScorer

# Normalize both text columns: cast to str, turn newlines into spaces, trim the ends.
# Note that this overwrites the columns of results_df in place.
results_df['Generated Summary'] = (
    results_df['Generated Summary'].astype(str).str.replace('\n', ' ').str.strip()
)
results_df['Original Summary'] = (
    results_df['Original Summary'].astype(str).str.replace('\n', ' ').str.strip()
)

# Scorer configured with the bert-base-uncased model.
scorer = BERTScorer(model_type='bert-base-uncased', lang="en")

# Candidates first, references second.
P, R, F1 = scorer.score(
    results_df['Generated Summary'].tolist(),
    results_df['Original Summary'].tolist(),
)

print(f"BERTScore Precision: {P.mean():.4f}, Recall: {R.mean():.4f}, F1: {F1.mean():.4f}")

## File Level Summarization

In [None]:
from collections import defaultdict

def group_by_file(data):
    """Merge method-level records into one record per ``file_name``.

    Args:
        data: Iterable of dicts with ``file_name``, ``method_code`` and
            ``method_summary`` string fields.

    Returns:
        A list of dicts, one per file in first-seen order, with keys
        ``file_name``, ``file_code`` (each method's code followed by a
        newline) and ``file_summary`` (summaries joined by single spaces,
        with outer whitespace stripped).
    """
    code_parts = defaultdict(list)
    summary_parts = defaultdict(list)

    for record in data:
        # Read every field before storing anything for this record.
        name = record["file_name"]
        code = record["method_code"]
        summary = record["method_summary"]

        code_parts[name].append(code)
        summary_parts[name].append(summary)

    grouped = []
    for name in code_parts:
        grouped.append({
            "file_name": name,
            "file_code": "".join(code + "\n" for code in code_parts[name]),
            "file_summary": " ".join(summary_parts[name]).strip(),
        })
    return grouped

In [None]:
class FileLevelDataset(Dataset):
    """File-level summarization dataset over already-grouped records.

    Args:
        data: Sequence of dicts with ``file_code`` (model input) and
            ``file_summary`` (target) fields.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors of shape ``(1, length)``.
        max_length: Length that both the code and the summary are
            padded/truncated to; longer inputs are truncated.
    """

    # Label value that the training loss skips (PyTorch cross-entropy's default ignore_index).
    IGNORE_INDEX = -100

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of file-level records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize record ``idx`` into ``input_ids``, ``attention_mask`` and ``labels``."""
        item = self.data[idx]
        code = item["file_code"]
        summary = item["file_summary"]

        inputs = self.tokenizer(code, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pt')
        labels = self.tokenizer(summary, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pt')

        label_ids = labels['input_ids'].squeeze(0)
        # The labels arrive already padded to max_length, so the padding
        # positions must be masked here; otherwise the loss is also computed
        # on pad tokens.
        pad_id = getattr(self.tokenizer, 'pad_token_id', None)
        if pad_id is not None:
            label_ids = label_ids.masked_fill(label_ids == pad_id, self.IGNORE_INDEX)

        return {
            "input_ids": inputs['input_ids'].squeeze(0),
            "attention_mask": inputs['attention_mask'].squeeze(0),
            "labels": label_ids
        }

In [None]:
# Collapse the method-level records into one record per file.
file_level_train_data = group_by_file(train_dataset.data)
file_level_val_data = group_by_file(val_dataset.data)

# Tokenizing datasets over the per-file records, using the default max_length of 512.
file_train_dataset = FileLevelDataset(file_level_train_data, tokenizer)
file_val_dataset = FileLevelDataset(file_level_val_data, tokenizer)

# Loaders for manual iteration; file_val_loader feeds the file-level inference cell.
file_train_loader = DataLoader(file_train_dataset, batch_size=2, shuffle=True)
file_val_loader = DataLoader(file_val_dataset, batch_size=2)

In [None]:
# model_name = "allenai/led-large-16384"

# tokenizer = LEDTokenizer.from_pretrained(model_name)
# model = LEDForConditionalGeneration.from_pretrained(model_name)


# model_name = "google/bigbird-pegasus-large-arxiv"
# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = BigBirdPegasusForConditionalGeneration.from_pretrained(model_name)
# model.gradient_checkpointing_enable()

In [None]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = model.to(device)

In [None]:
# Second trainer, this time over the file-level datasets. It reuses the same model object,
# training_args and data_collator as the method-level run, so training continues from the
# weights already fine-tuned above.
# NOTE(review): training_args.output_dir ("./codet5-finetuned") is shared with the
# method-level run, so both runs write checkpoints to the same directory — confirm intended.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=file_train_dataset,
    eval_dataset=file_val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
)

# Fine-tune on the file-level training set.
trainer.train()

In [None]:
# First batch of the unshuffled file-level validation loader (not the whole validation set).
file_batch = next(iter(file_val_loader))

# Generate summaries for that batch; max_length=128 is forwarded to model.generate.
generated_file_summaries = generate_summary(file_batch, model, tokenizer, max_length=128)

In [None]:
import pandas as pd

In [None]:
# Pair each generated file summary with its grouped record. The summaries come from the
# first batch of the unshuffled file_val_loader, so they line up with the leading records.
file_results_df = pd.DataFrame(
    [
        {
            "File Name": record["file_name"],
            "File Code": record["file_code"],
            "Original File Summary": record["file_summary"],
            "Generated File Summary": generated,
        }
        for record, generated in zip(file_level_val_data, generated_file_summaries)
    ],
    columns=["File Name", "File Code", "Original File Summary", "Generated File Summary"],
)

file_results_df.head()

In [None]:
# Reference file summary stored at row label 1.
file_results_df.loc[1, 'Original File Summary']

In [None]:
# Generated file summary stored at row label 1.
file_results_df.loc[1, 'Generated File Summary']

In [None]:
from rouge import Rouge

# Average ROUGE over the (generated, reference) file-summary pairs.
rouge = Rouge()
rouge.get_scores(
    list(file_results_df['Generated File Summary']),
    list(file_results_df['Original File Summary']),
    avg=True,
)

In [None]:
from bert_score import BERTScorer

In [None]:
# Normalize both text columns: cast to str, turn newlines into spaces, trim the ends.
# Note that this overwrites the columns of file_results_df in place.
file_results_df['Generated File Summary'] = (
    file_results_df['Generated File Summary'].astype(str).str.replace('\n', ' ').str.strip()
)
file_results_df['Original File Summary'] = (
    file_results_df['Original File Summary'].astype(str).str.replace('\n', ' ').str.strip()
)

# Scorer configured with the bert-base-uncased model.
scorer = BERTScorer(model_type='bert-base-uncased', lang="en")

# Candidates first, references second.
P, R, F1 = scorer.score(
    file_results_df['Generated File Summary'].tolist(),
    file_results_df['Original File Summary'].tolist(),
)

print(f"BERTScore Precision: {P.mean():.4f}, Recall: {R.mean():.4f}, F1: {F1.mean():.4f}")