In [1]:
# Hugging Face - Fine-Tuning CodeT5 for Masked If-Condition Prediction (AI4SE Focus)

# This notebook demonstrates how to fine-tune the CodeT5 model using Hugging Face Transformers
# for a Software Engineering task: predicting masked if statements

# ------------------------
# 1. Install Required Libraries
# ------------------------
!pip install torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://pypi.org/simple
!pip install transformers datasets evaluate -q
!pip install datasets
!pip install evaluate
!pip install sacrebleu
!pip install codebleu
!pip install numpy

# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text2text-generation", model="Salesforce/codet5-small")

Collecting torch==2.5.1
  Downloading torch-2.5.1-cp311-cp311-manylinux1_x86_64.whl.metadata (28 kB)
Collecting torchvision==0.20.1
  Downloading torchvision-0.20.1-cp311-cp311-manylinux1_x86_64.whl.metadata (6.1 kB)
Collecting torchaudio==2.5.1
  Downloading torchaudio-2.5.1-cp311-cp311-manylinux1_x86_64.whl.metadata (6.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.5.1)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.5.1)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch==2.5.1)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.5.1)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.57k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/242M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.48k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/703k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/294k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/12.5k [00:00<?, ?B/s]

Device set to use cuda:0


In [None]:
# ------------------------
# 2. Mine methods through GitHub metadata
# Can be skipped if you have python_methods.csv
# ------------------------

import ast
import csv
import os
import time
from typing import List, Optional, Tuple

import pandas as pd
import requests

from transformers import TrainingArguments, Trainer, EarlyStoppingCallback, DataCollatorForSeq2Seq

MAX_METHODS = 50000  # 500 For testing
MAX_FILE_SIZE = 100000  # ~100KB
MAX_DEPTH = 2  # Limit recursion depth
MAX_RATE_LIMIT_RETRIES = 3  # Cap retries for 403 errors

def extract_methods_from_python(code: str) -> List[Tuple[str, str, str]]:
    """Collect the functions in ``code`` that contain at least one ``if`` statement.

    Args:
        code: Python source text to parse.

    Returns:
        One ``(cleaned_method, if_condition, tokens)`` tuple per ``ast.FunctionDef``
        that has an ``ast.If`` somewhere beneath it:
          * ``cleaned_method`` - the unparsed function with blank lines removed,
          * ``if_condition``   - the unparsed test of the first ``ast.If`` that
            ``ast.walk`` yields for that function,
          * ``tokens``         - the unparsed function collapsed onto one
            whitespace-separated line.
        Source that raises ``SyntaxError`` or ``ValueError`` while being parsed or
        unparsed ends the scan; whatever was collected up to that point is returned.
    """
    found: List[Tuple[str, str, str]] = []
    try:
        module = ast.parse(code)
        functions = (n for n in ast.walk(module) if isinstance(n, ast.FunctionDef))
        for func in functions:
            source = ast.unparse(func).strip()
            non_blank_lines = [ln for ln in source.split("\n") if ln.strip()]
            # Only the first `if` reached by the walk is used for this function.
            first_if = next((n for n in ast.walk(func) if isinstance(n, ast.If)), None)
            if first_if is None:
                continue
            condition = ast.unparse(first_if.test).strip()
            flattened = " ".join(source.split())
            found.append(("\n".join(non_blank_lines), condition, flattened))
    except (SyntaxError, ValueError):
        # Best effort: unparsable files simply contribute nothing further.
        pass
    return found

def fetch_repo_contents(repo_name: str, branch: str, token: str, path: str = "", depth: int = 0, retries: int = 0, timeout: float = 30.0) -> List[dict]:
    """Fetch one directory listing of a repository through the GitHub contents API.

    Args:
        repo_name: Repository identifier interpolated into the API URL
            (``/repos/{repo_name}/contents/{path}``).
        branch: Sent as the ``ref`` query parameter.
        token: Sent in the ``Authorization: token ...`` header.
        path: Path inside the repository; empty string requests the root.
        depth: Current directory depth; anything above ``MAX_DEPTH`` returns ``[]``.
        retries: Number of 403 retries already spent for this request.
        timeout: Seconds passed to ``requests.get`` so a stalled connection cannot
            block the mining loop forever.

    Returns:
        The decoded JSON body on HTTP 200, otherwise an empty list (depth limit
        reached, non-200 status, retries exhausted, or any exception).
    """
    if depth > MAX_DEPTH:
        print(f"Depth limit reached for {repo_name}/{path}")
        return []
    headers = {"Authorization": f"token {token}"}
    api_url = f"https://api.github.com/repos/{repo_name}/contents/{path}"
    params = {"ref": branch}
    # Iterative retry loop: a 403 waits 60s and tries again, at most
    # MAX_RATE_LIMIT_RETRIES times in total.
    while True:
        print(f"Attempting to fetch {api_url} (depth {depth}, retry {retries})")
        try:
            response = requests.get(api_url, headers=headers, params=params, timeout=timeout)
            print(f"Fetching contents {api_url}: Status {response.status_code}")
            if response.status_code == 200:
                return response.json()
            if response.status_code == 403 and retries < MAX_RATE_LIMIT_RETRIES:
                print(f"Rate limit exceeded for {repo_name}/{path}. Waiting 60s... (retry {retries + 1}/{MAX_RATE_LIMIT_RETRIES})")
                time.sleep(60)
                retries += 1
                continue
            print(f"Failed to fetch {repo_name}/{path}: Status {response.status_code}, Response: {response.text[:100]}...")
            return []
        except Exception as e:
            # Includes timeouts and connection errors; the caller treats [] as "skip".
            print(f"Error fetching {repo_name}/{path}: {e}")
            return []

def download_file(file_url: str, token: str, timeout: float = 30.0) -> Optional[str]:
    """Download the text of one file, retrying once after a 403.

    Args:
        file_url: URL to request.
        token: Sent in the ``Authorization: token ...`` header.
        timeout: Seconds passed to ``requests.get`` so a stalled download cannot hang.

    Returns:
        The response text on success, or ``None`` when the request fails, the retry
        fails, an exception is raised, or the UTF-8 encoded body is larger than
        ``MAX_FILE_SIZE``. The size limit is applied to the retried response as well
        (the original returned oversized files on the retry path).
    """
    headers = {"Authorization": f"token {token}", "Accept": "application/vnd.github.v3.raw"}
    print(f"Attempting to fetch {file_url}")
    try:
        response = requests.get(file_url, headers=headers, timeout=timeout)
        print(f"Fetching {file_url}: Status {response.status_code}")
        if response.status_code == 403:
            # Treated as a rate limit: wait, then try exactly once more.
            print(f"Rate limit hit for {file_url}. Waiting 60s...")
            time.sleep(60)
            response = requests.get(file_url, headers=headers, timeout=timeout)
            if response.status_code != 200:
                print(f"Retry failed for {file_url}: Status {response.status_code}")
                return None
        elif response.status_code != 200:
            print(f"Failed to download {file_url}: Status {response.status_code}, Response: {response.text[:100]}...")
            return None

        content = response.text
        size_in_bytes = len(content.encode("utf-8"))
        if size_in_bytes > MAX_FILE_SIZE:
            print(f"Skipping large file: {file_url} (size: {size_in_bytes} bytes)")
            return None
        return content
    except Exception as e:
        print(f"Exception downloading {file_url}: {e}")
        return None

def extract_methods_to_csv(repo_data: pd.Series, output_csv: str, method_count: int, token: str) -> int:
    """Mine one repository and append its if-containing methods to ``output_csv``.

    Args:
        repo_data: Row describing the repository; the ``'name'`` and
            ``'defaultBranch'`` entries are read.
        output_csv: CSV file opened in append mode. A header row is written when
            the file is empty.
        method_count: Running total of methods written so far across repositories.
        token: GitHub token forwarded to the fetch/download helpers.

    Returns:
        The updated running total. Returns early once ``MAX_METHODS`` is reached
        or when the repository root listing is empty.
    """
    # os.makedirs("") raises FileNotFoundError, so only create a directory when
    # the output path actually has a directory component.
    output_dir = os.path.dirname(output_csv)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_csv, mode='a', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Header goes in only once, while the file is still empty.
        if os.path.getsize(output_csv) == 0:
            csv_writer.writerow(["cleaned_method", "target_block", "tokens_in_method"])

        repo_name = repo_data['name']
        branch = repo_data['defaultBranch']
        print(f"Processing repository: {repo_name}")

        contents = fetch_repo_contents(repo_name, branch, token)
        if not contents:
            print(f"No contents retrieved for {repo_name}. Skipping.")
            return method_count

        python_files = []

        def collect_python_files(items, depth=0):
            """Append every ``.py`` file entry to ``python_files``, descending into directories up to ``MAX_DEPTH``."""
            if not isinstance(items, list):
                print(f"No valid contents for {repo_name} at depth {depth}")
                return
            print(f"Collecting files at depth {depth} with {len(items)} items")
            for i, item in enumerate(items):
                print(f"Processing item {i+1}/{len(items)} at depth {depth}: {item['path']}")
                if item["type"] == "file" and item["name"].endswith(".py"):
                    python_files.append(item)
                elif item["type"] == "dir" and depth < MAX_DEPTH:
                    sub_contents = fetch_repo_contents(repo_name, branch, token, item["path"], depth + 1)
                    collect_python_files(sub_contents, depth + 1)

        collect_python_files(contents)
        print(f"Found {len(python_files)} Python files in {repo_name}")

        for i, file in enumerate(python_files):
            print(f"Processing file {i+1}/{len(python_files)}: {file['name']}")
            content = download_file(file["download_url"], token)
            if content:
                methods = extract_methods_from_python(content)
                print(f"Extracted {len(methods)} methods from {file['name']}")
                for cleaned_method, target_block, tokens in methods:
                    csv_writer.writerow([cleaned_method, target_block, tokens])
                    method_count += 1
                    print(f"Methods extracted: {method_count}")
                    if method_count >= MAX_METHODS:
                        print(f"Reached limit of {MAX_METHODS} methods")
                        return method_count
            else:
                print(f"Skipping file {file['name']} due to download failure")
            # Short pause between file downloads.
            time.sleep(0.25)

        return method_count

def process_repos_to_dataset(input_csv: str, output_csv: str, token: str):
    """Mine every Python repository listed in ``input_csv`` into ``output_csv``.

    Args:
        input_csv: Repository metadata CSV; rows whose ``mainLanguage`` column
            equals ``'Python'`` are processed.
        output_csv: Destination CSV. It is truncated first, then filled by
            ``extract_methods_to_csv``.
        token: GitHub token forwarded to the mining helpers.

    Processing stops as soon as ``MAX_METHODS`` methods have been written.
    """
    df = pd.read_csv(input_csv)
    python_repos = df[df['mainLanguage'] == 'Python']
    repo_total = len(python_repos)
    print(f"Found {repo_total} Python repositories")

    # Start from an empty output file; the per-repository step appends to it.
    with open(output_csv, 'w', newline='', encoding='utf-8'):
        pass

    total_methods = 0
    # `iterrows()` yields the original index labels of the filtered frame, which
    # are not consecutive; count positions explicitly for the progress message.
    for position, (_, repo) in enumerate(python_repos.iterrows(), start=1):
        print(f"Starting repository {position}/{repo_total}")
        total_methods = extract_methods_to_csv(repo, output_csv, total_methods, token)
        if total_methods >= MAX_METHODS:
            print(f"Finished: Extracted {total_methods} methods")
            break
        time.sleep(.5)

    print(f"Results saved to {output_csv}")

# SECURITY: never commit a personal access token to a notebook. The token is read
# from the GITHUB_TOKEN environment variable, with an interactive prompt as a
# fallback. Any token that was previously stored in this cell should be revoked.
import getpass

GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") or getpass.getpass("GitHub personal access token: ")
process_repos_to_dataset("/content/repos.csv", "/content/python_methods.csv", GITHUB_TOKEN)

# Offer the mined CSV as a browser download when running inside Colab.
try:
    from google.colab import files
    files.download("/content/python_methods.csv")
except ImportError:
    print("Not running in Google Colab — manual download required.")


Found 1066 Python repositories
Starting repository 1/1066
Processing repository: openhumans/quantified-flu
Attempting to fetch https://api.github.com/repos/openhumans/quantified-flu/contents/ (depth 0, retry 0)
Fetching contents https://api.github.com/repos/openhumans/quantified-flu/contents/: Status 200
Collecting files at depth 0 with 17 items
Processing item 1/17 at depth 0: .gitignore
Processing item 2/17 at depth 0: .pre-commit-config.yaml
Processing item 3/17 at depth 0: CONTRIBUTING.md
Processing item 4/17 at depth 0: LICENSE.txt
Processing item 5/17 at depth 0: Makefile
Processing item 6/17 at depth 0: Pipfile
Processing item 7/17 at depth 0: Pipfile.lock
Processing item 8/17 at depth 0: Procfile
Processing item 9/17 at depth 0: README.md
Processing item 10/17 at depth 0: checkin
Attempting to fetch https://api.github.com/repos/openhumans/quantified-flu/contents/checkin (depth 1, retry 0)
Fetching contents https://api.github.com/repos/openhumans/quantified-flu/contents/checkin:

KeyboardInterrupt: 

✅ The code below loads a pre-trained model & tokenizer from Hugging Face using the checkpoint name (e.g., "Salesforce/codet5-small").


*  The tokenizer knows how to convert text into tokens that the model understands.

*   It also handles things like padding, truncation, special tokens, etc.

*   It comes with a fixed vocabulary learned during pretraining, which we can nevertheless expand if needed, as shown below.

In [2]:
# ------------------------
# 3. Split dataset into training, testing and evaluation datasets. 80-10-10 split.
# ------------------------

from datasets import DatasetDict, load_dataset
import pandas as pd

# Read the mined methods from their absolute Colab location.
dataset_df = pd.read_csv("/content/python_methods.csv")

# Keep only rows that carry both a target condition and a token string.
required_columns = ["target_block", "tokens_in_method"]
cleaned_df = dataset_df.dropna(subset=required_columns)
rows_before, rows_after = len(dataset_df), len(cleaned_df)
print(f"Original rows: {rows_before}, Rows after cleaning: {rows_after}")
print(f"Removed {rows_before - rows_after} rows with missing 'target_block' or 'tokens_in_method'")

# Function to mask if conditions and flatten, with error handling
def mask_if_condition(method, target):
    """Replace ``if {target}:`` in ``method`` with ``if <mask>:`` and flatten whitespace.

    Args:
        method: Source text of a method.
        target: Condition text to hide.

    Returns:
        The masked method with every run of whitespace collapsed to a single
        space. When either argument is not a string, or when an exception is
        raised, ``method`` is returned unchanged. A warning is printed when the
        exact ``if {target}:`` text does not occur in ``method``.
    """
    try:
        both_strings = isinstance(method, str) and isinstance(target, str)
        if not both_strings:
            return method  # invalid input is passed through untouched
        needle = f"if {target}:"
        replaced = method.replace(needle, "if <mask>:")
        if replaced == method:
            # Nothing matched: the text is still flattened and returned below.
            print(f"Warning: Failed to mask 'if {target}:' in method:\n{method}")
        flattened = " ".join(replaced.split())
        return flattened
    except Exception as e:
        print(f"Error masking method: {e}")
        return method  # fall back to the unmodified input

# Build the model input column: each method with its target condition masked.
cleaned_df["masked_method"] = cleaned_df.apply(
    lambda row: mask_if_condition(row["cleaned_method"], row["target_block"]), axis=1
)

# 80% train; the remaining 20% is halved into validation and test.
train_df = cleaned_df.sample(frac=0.8, random_state=42)
val_test_df = cleaned_df.drop(train_df.index)
val_df = val_test_df.sample(frac=0.5, random_state=42)
test_df = val_test_df.drop(val_df.index)

# Persist each split (absolute Colab paths) with only the columns used downstream.
split_columns = ["masked_method", "target_block", "tokens_in_method", "cleaned_method"]
split_paths = {
    "train": "/content/train.csv",
    "validation": "/content/val.csv",
    "test": "/content/test.csv",
}
for split_name, split_frame in (("train", train_df), ("validation", val_df), ("test", test_df)):
    split_frame[split_columns].to_csv(split_paths[split_name], index=False)

# Reload the CSV files as a Hugging Face DatasetDict.
dataset = DatasetDict({
    split_name: load_dataset("csv", data_files=csv_path)["train"]
    for split_name, csv_path in split_paths.items()
})

# Sanity check: show one training example and the size of each split.
try:
    first_train_row = dataset["train"][0]
    print("Sample Cleaned Method:", first_train_row["cleaned_method"])
    print("Sample Input (Masked):", first_train_row["masked_method"])
    print("Target If Condition:", first_train_row["target_block"])
    for label, split_name in (("Train", "train"), ("Validation", "validation"), ("Test", "test")):
        print(f"{label} size:", len(dataset[split_name]))
except KeyError as e:
    print(f"Verification error: Missing column {e}")
except IndexError as e:
    print(f"Verification error: Dataset is empty - {e}")

# Optional: Download files from Colab
#try:
#    from google.colab import files
#    files.download("/content/train.csv")
#    files.download("/content/val.csv")
#    files.download("/content/test.csv")
#except ImportError:
#    print("Not running in Colab — skipping download.")

Original rows: 50000, Rows after cleaning: 50000
Removed 0 rows with missing 'target_block' or 'tokens_in_method'
@pytest.mark.slow
def testTSDataset(self):
    tsdh = TSDatasetH(handler={'class': 'Alpha158', 'module_path': 'qlib.contrib.data.handler', 'kwargs': {'start_time': '2017-01-01', 'end_time': '2020-08-01', 'fit_start_time': '2017-01-01', 'fit_end_time': '2017-12-31', 'instruments': 'csi300', 'infer_processors': [{'class': 'FilterCol', 'kwargs': {'col_list': ['RESI5', 'WVMA5', 'RSQR5']}}, {'class': 'RobustZScoreNorm', 'kwargs': {'fields_group': 'feature', 'clip_outlier': 'true'}}, {'class': 'Fillna', 'kwargs': {'fields_group': 'feature'}}], 'learn_processors': ['DropnaLabel', {'class': 'CSRankNorm', 'kwargs': {'fields_group': 'label'}}]}}, segments={'train': ('2017-01-01', '2017-12-31'), 'valid': ('2018-01-01', '2018-12-31'), 'test': ('2019-01-01', '2020-08-01')})
    tsds_train = tsdh.prepare('train', data_key=DataHandlerLP.DK_L)
    tsds = tsdh.prepare('valid', data_key=Data

Generating train split: 0 examples [00:00, ? examples/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating train split: 0 examples [00:00, ? examples/s]

Sample Cleaned Method: @staticmethod
def validate(notification_type: NotificationType):
    if not get_active_providers_with_weights_by_notification_type(notification_type):
        raise Exception(f'Load Balancing Strategy cannot be used for {notification_type} notifications because there are no matching active providers that have load balancing weights')
Sample Input (Masked): @staticmethod def validate(notification_type: NotificationType): if <mask>: raise Exception(f'Load Balancing Strategy cannot be used for {notification_type} notifications because there are no matching active providers that have load balancing weights')
Target If Condition: not get_active_providers_with_weights_by_notification_type(notification_type)
Train size: 40000
Validation size: 5000
Test size: 5000


In [3]:
# ------------------------------------------------------------------------
# 4. Load Pre-trained Model & Tokenizer
# ------------------------------------------------------------------------
from transformers import T5ForConditionalGeneration, AutoModelForSeq2SeqLM
from transformers import RobertaTokenizer
from datasets import DatasetDict
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback

# Hub identifier of the checkpoint used for both the model and the tokenizer.
model_checkpoint = "Salesforce/codet5-small"

# Load the pre-trained weights for this checkpoint.
model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)

# Load the matching tokenizer, then register one extra token.
# NOTE(review): the masking step in this notebook writes `<mask>`, not `<IF-STMT>`;
# confirm whether this extra token is still needed.
tokenizer = RobertaTokenizer.from_pretrained(model_checkpoint)
tokenizer.add_tokens(["<IF-STMT>"]) # Demonstration: adds one extra token to the tokenizer's vocabulary

# Must run after add_tokens: resizes the embedding matrix to the new vocabulary size.
model.resize_token_embeddings(len(tokenizer))




The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


Embedding(32101, 512)

⚠️⚠️⚠️ If you add new tokens like this, you must also resize the model’s embedding layer: model.resize_token_embeddings(len(tokenizer))

Otherwise, the model won’t know what to do with the new token IDs!


In [4]:
# ------------------------------------------------------------------------------------------------
# 5. We prepare now the fine-tuning dataset using the tokenizer we preloaded
# ------------------------------------------------------------------------------------------------

def preprocess_function(examples):
    """Tokenize a batch into fixed-length model inputs and labels.

    Args:
        examples: Batch mapping with ``"masked_method"`` (inputs) and
            ``"target_block"`` (targets) columns.

    Returns:
        The tokenized inputs (padded/truncated to 512 tokens) with a ``"labels"``
        entry holding the target ids (padded/truncated to 128 tokens). Padding
        positions in the labels are set to -100 so the loss skips them; the
        original left the pad token id in place, which trains the model to
        predict padding.
    """
    inputs = tokenizer(examples["masked_method"], max_length=512, truncation=True, padding="max_length")
    targets = tokenizer(examples["target_block"], max_length=128, truncation=True, padding="max_length")
    pad_id = tokenizer.pad_token_id
    inputs["labels"] = [
        [token_id if token_id != pad_id else -100 for token_id in label_ids]
        for label_ids in targets["input_ids"]
    ]
    return inputs

# Tokenize every split in batches and drop the two raw text columns that were just encoded.
# NOTE(review): `tokenized_datasets` is assigned again in section 6 of this notebook
# (via `tokenize_function`), which replaces the result produced here.
tokenized_datasets = dataset.map(preprocess_function, batched=True, remove_columns=["masked_method", "target_block"])

Map:   0%|          | 0/40000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

In [5]:
# ------------------------------------------------------------------------
# 6. Define Training Arguments and Trainer (25,000 steps total)
# ------------------------------------------------------------------------

from datasets import Dataset
from transformers import DataCollatorForSeq2Seq

def tokenize_function(examples):
    """Tokenize a batch without padding.

    Args:
        examples: Batch mapping with ``"masked_method"`` and ``"target_block"`` columns.

    Returns:
        The tokenized ``"masked_method"`` encoding with the ``"target_block"``
        token ids stored under ``"labels"``. Both sides are truncated; no padding
        is applied here.
    """
    model_inputs = tokenizer(examples["masked_method"], truncation=True)
    target_encoding = tokenizer(examples["target_block"], truncation=True)
    model_inputs["labels"] = target_encoding["input_ids"]
    return model_inputs

# Tokenize all splits without padding and drop every original column.
tokenized_datasets = dataset.map(tokenize_function, batched=True, remove_columns=dataset["train"].column_names)
# The collator batches the variable-length examples for the seq2seq model.
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

val_dataset = tokenized_datasets["validation"]  # Use full validation set

train_dataset = tokenized_datasets["train"]

# Define training arguments
training_args = TrainingArguments(
    output_dir="./codet5-finetuned",
    num_train_epochs=10,
    max_steps=25000,                  # when positive, max_steps takes precedence over num_train_epochs
    eval_strategy="epoch",
    save_strategy="epoch",            # must match eval_strategy for load_best_model_at_end
    learning_rate=5e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="loss",     # best checkpoint = lowest validation loss
    save_total_limit=1,
    logging_steps=1000,
    push_to_hub=False,
    gradient_accumulation_steps=1,
    fp16=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,     # Use the validation set
    # `tokenizer=` is deprecated on Trainer; `processing_class=` is its replacement.
    processing_class=tokenizer,
    data_collator=data_collator,
    # Stop when validation loss has not improved for 3 consecutive evaluations.
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)


Map:   0%|          | 0/40000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

  trainer = Trainer(


In [13]:
# ------------------------
# 7. Train the Model
# ------------------------
import wandb  # Import wandb

# Start a Weights & Biases run for this training session.
wandb.init(project="my-codet5-project")

# Fine-tune, then report which device the model parameters ended up on.
trainer.train()
training_device = next(model.parameters()).device
print(f"Training completed on: {training_device}")

# Write the final model and tokenizer side by side in one directory.
final_model_dir = "./codet5-finetuned-final"
trainer.save_model(final_model_dir)
tokenizer.save_pretrained(final_model_dir)

# import os


#try:
#    from google.colab import files
#
#   model_dir = "./codet5-finetuned-final"
#    for filename in os.listdir(model_dir):
#        filepath = os.path.join(model_dir, filename)
#        if os.path.isfile(filepath):
#            print(f"Downloading {filename}...")
#            files.download(filepath)
#
#except ImportError:
#    print("Not running in Colab — skipping download.")

Epoch,Training Loss,Validation Loss
1,1.0204,0.885897
2,0.8139,0.82042
3,0.7084,0.801714
4,0.6275,0.783749
5,0.5517,0.772822
6,0.5017,0.778108
7,0.4607,0.77032
8,0.4323,0.775489
9,0.3975,0.778492
10,0.3812,0.778104


There were missing keys in the checkpoint model loaded: ['encoder.embed_tokens.weight', 'decoder.embed_tokens.weight', 'lm_head.weight'].


Training completed on: cuda:0


('./codet5-finetuned-final/tokenizer_config.json',
 './codet5-finetuned-final/special_tokens_map.json',
 './codet5-finetuned-final/vocab.json',
 './codet5-finetuned-final/merges.txt',
 './codet5-finetuned-final/added_tokens.json')

In [28]:
import evaluate
import torch
import pandas as pd
from transformers import T5ForConditionalGeneration, RobertaTokenizer, DataCollatorForSeq2Seq

# Reload the fine-tuned model and tokenizer from the directory written after training.
# NOTE(review): no explicit device placement happens here, so `model.device` is
# whatever `from_pretrained` defaults to — confirm if GPU inference is intended.
model_path = "./codet5-finetuned-final"  # Update if needed
model = T5ForConditionalGeneration.from_pretrained(model_path)
tokenizer = RobertaTokenizer.from_pretrained(model_path)

# Evaluate on the first 100 examples of the tokenized test split only.
test_dataset = tokenized_datasets["test"].select(range(100))
print("Original test dataset features:", test_dataset.features)

# Collator bound to the reloaded model and tokenizer; used to batch inputs for generation.
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

# Define prediction generation function
def generate_predictions(batch):
    """Generate decoded predictions for one batch of tokenized examples.

    Args:
        batch: Batch mapping with ``"input_ids"`` and ``"attention_mask"`` lists.

    Returns:
        ``{"predictions": [...]}`` with one decoded string per example (beam
        search with 4 beams, ``max_length=128``). If generation or decoding
        raises, the error is printed and an empty string is returned for every
        example in the batch.

    Raises:
        ValueError: If the data collator returns ``None``.
    """
    print(f"Generating for batch with {len(batch['input_ids'])} samples...")
    # Only inputs are collated; labels are deliberately left out for generation.
    features = [
        {"input_ids": ids, "attention_mask": mask}
        for ids, mask in zip(batch["input_ids"], batch["attention_mask"])
    ]
    inputs = data_collator(features)

    if inputs is None:
        raise ValueError("data_collator returned None. Check your batch structure.")

    # Move every tensor to the model's device; non-tensor values pass through.
    on_device = {}
    for key, value in inputs.items():
        on_device[key] = value.to(model.device) if isinstance(value, torch.Tensor) else value

    try:
        generated = model.generate(
            input_ids=on_device["input_ids"],
            attention_mask=on_device["attention_mask"],
            max_length=128,
            num_beams=4
        )
        decoded = tokenizer.batch_decode(generated, skip_special_tokens=True)
    except Exception as e:
        print(f"Error in generation: {e}")
        # Use length from any available batch value
        fallback_size = len(next(iter(batch.values())))
        return {"predictions": [""] * fallback_size}
    return {"predictions": decoded}

# Run generation over the test subset in batches of 8.
try:
    predictions = test_dataset.map(generate_predictions, batched=True, batch_size=8)
    print("Predictions generated.")
except Exception as e:
    print("Error during prediction mapping:", e)
    predictions = None

# If predictions were successful, compute metrics
if predictions is not None and "predictions" in predictions.column_names:
    bleu = evaluate.load("sacrebleu")
    exact_match = evaluate.load("exact_match")

    # Read each column once. Indexing `dataset["column"]` inside the loop would
    # fetch the whole column again on every iteration.
    predicted_texts = predictions["predictions"]
    label_id_rows = test_dataset["labels"]
    input_id_rows = test_dataset["input_ids"]

    results = []
    for i in range(len(test_dataset)):
        pred = predicted_texts[i]
        ref = tokenizer.decode(label_id_rows[i], skip_special_tokens=True)
        input_func = tokenizer.decode(input_id_rows[i], skip_special_tokens=True)
        try:
            # Both metrics are computed per example and averaged afterwards.
            em_score = exact_match.compute(predictions=[pred], references=[ref])["exact_match"]
            bleu_score = bleu.compute(predictions=[pred], references=[[ref]])["score"]
        except Exception as e:
            print(f"Metric computation failed for example {i}: {e}")
            em_score, bleu_score = 0, 0
        results.append({
            "input_function": input_func,
            "expected_condition": ref,
            "predicted_condition": pred,
            "exact_match": em_score,
            "bleu_score": bleu_score
        })

    results_df = pd.DataFrame(results)
    results_df.to_csv("/content/testset-results.csv", index=False)

    # Optionally download CSV in Colab
    try:
        from google.colab import files
        files.download("/content/testset-results.csv")
    except ImportError:
        print("Not running in Colab — skipping download.")

    print("Evaluation Complete")
    print("Average Exact Match:", results_df["exact_match"].mean())
    print("Average BLEU-4:", results_df["bleu_score"].mean())
else:
    print("Prediction generation failed or returned no results.")


Original test dataset features: {'input_ids': Sequence(feature=Value(dtype='int32', id=None), length=-1, id=None), 'attention_mask': Sequence(feature=Value(dtype='int8', id=None), length=-1, id=None), 'labels': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None)}


Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 8 samples...
Generating for batch with 4 samples...
Predictions generated.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Evaluation Complete
Average Exact Match: 0.32
Average BLEU-4: 36.803216464191


In [21]:
# ------------------------
# 9. Sample Prediction
# ------------------------
# One hand-written example in the same flattened, masked format as the training inputs.
input_code = "def check_positive(num): if <mask>: return 'Positive' else: return 'Non-Positive'"
# Move the encoded inputs to the model's device so the cell works whether `model`
# currently lives on CPU or GPU (the original failed with a device mismatch
# whenever the model was on GPU).
inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True).to(model.device)
outputs = model.generate(**inputs, max_length=128)
print("Predicted If Condition:", tokenizer.decode(outputs[0], skip_special_tokens=True))

Predicted If Condition: num < 0


In [None]:
# ------------------------
# 10. Run if training crashes
# ------------------------
import os
import wandb  # For experiment tracking

# Initialize Weights & Biases
wandb.init(project="my-codet5-project")

# Path to saved model checkpoints
checkpoint_dir = "./codet5-finetuned"
last_checkpoint = None


def _checkpoint_step(name):
    """Return the integer step from a 'checkpoint-<step>' name, or -1 if there is none."""
    suffix = name.rsplit("-", 1)[-1]
    return int(suffix) if suffix.isdigit() else -1


# Check for existing checkpoint
if os.path.isdir(checkpoint_dir):
    contents = os.listdir(checkpoint_dir)
    checkpoints = [f for f in contents if f.startswith("checkpoint")]
    if checkpoints:
        # Resume from the checkpoint with the highest step number. A plain string
        # sort would rank "checkpoint-10000" before "checkpoint-2500".
        checkpoints.sort(key=_checkpoint_step)
        last_checkpoint = os.path.join(checkpoint_dir, checkpoints[-1])
        print(f"Resuming from checkpoint: {last_checkpoint}")
    else:
        print("No checkpoint found, training from scratch.")
else:
    print("No checkpoint directory found, training from scratch.")

# Train (resume_from_checkpoint=None starts a fresh run)
trainer.train(resume_from_checkpoint=last_checkpoint)

print(f"Training completed on: {next(model.parameters()).device}")

# Save final model + tokenizer
final_dir = "./codet5-finetuned-final"
trainer.save_model(final_dir)
tokenizer.save_pretrained(final_dir)

