In [1]:
from datasets import load_dataset
from transformers import AutoTokenizer
import torch
import gc
import random

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load CodeT5 tokenizer
tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-small")

# Sample Java code
prompt = "public class Main { public static void main(String[] args) { System.out.println(\"Hello, World!\"); } }"

tokenized = tokenizer(
    prompt, 
    return_tensors="pt",
    max_length=512
)

print("Tokenized input:", tokenized)


Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


Tokenized input: {'input_ids': tensor([[    1,   482,   667, 12740,   288,  1071,   760,   918,  2774,    12,
           780,  8526,   833,    13,   288,  2332,    18,   659,    18,  8222,
          2932, 18601,    16, 21820,  4442,  1769,   289,   289,     2]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1]])}


In [3]:
from transformers import AutoModelForSeq2SeqLM, T5ForConditionalGeneration

# Load pre-trained CodeT5 model
model = AutoModelForSeq2SeqLM.from_pretrained("Salesforce/codet5-small", use_safetensors=True)
model.config


T5Config {
  "_attn_implementation_autoset": true,
  "_name_or_path": "Salesforce/codet5-small",
  "architectures": [
    "T5ForConditionalGeneration"
  ],
  "bos_token_id": 1,
  "classifier_dropout": 0.0,
  "d_ff": 2048,
  "d_kv": 64,
  "d_model": 512,
  "decoder_start_token_id": 0,
  "dense_act_fn": "relu",
  "dropout_rate": 0.1,
  "eos_token_id": 2,
  "feed_forward_proj": "relu",
  "gradient_checkpointing": false,
  "id2label": {
    "0": "LABEL_0"
  },
  "initializer_factor": 1.0,
  "is_encoder_decoder": true,
  "is_gated_act": false,
  "label2id": {
    "LABEL_0": 0
  },
  "layer_norm_epsilon": 1e-06,
  "model_type": "t5",
  "n_positions": 512,
  "num_decoder_layers": 6,
  "num_heads": 8,
  "num_layers": 6,
  "output_past": true,
  "pad_token_id": 0,
  "relative_attention_max_distance": 128,
  "relative_attention_num_buckets": 32,
  "task_specific_params": {
    "summarization": {
      "early_stopping": true,
      "length_penalty": 2.0,
      "max_length": 200,
      "min_length

In [4]:
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("code_search_net", "java", trust_remote_code=True)
train_data = dataset["train"]
valid_data = dataset["validation"]

print("Dataset columns:", train_data.column_names) 

Dataset columns: ['repository_name', 'func_path_in_repository', 'func_name', 'whole_func_string', 'language', 'func_code_string', 'func_code_tokens', 'func_documentation_string', 'func_documentation_tokens', 'split_name', 'func_code_url']


In [5]:
def preprocess_function(examples):
    # Create prompt-completion pairs
    prompts = []
    completions = []
    
    for code in examples["whole_func_string"]:
        # Split code into lines
        lines = code.split('\n')
        # Create masked version for prompt (hide latter part)
        split_point = int(len(lines) * 0.7)  # Use 70% as prompt
        prompt = '\n'.join(lines[:split_point])
        completion = '\n'.join(lines[split_point:])
        
        prompts.append(prompt)
        completions.append(completion)
    
    return {"prompt": prompts, "completion": completions}

# Process dataset -> The .map() method applies a given function to all elements (rows or examples) in the dataset.
train_data = train_data.map(preprocess_function, batched=True)
valid_data = valid_data.map(preprocess_function, batched=True)


In [6]:
def tokenize_function(examples):
    model_inputs = tokenizer(
        examples["prompt"],
        max_length=256,
        padding="max_length",
        truncation=True
    )
    """Returns:
    {
        'input_ids': [token_ids],
        'attention_mask': [1s and 0s]
    }
    """
    
    # Tokenize targets
    labels = tokenizer(
        examples["completion"],
        max_length=256,
        padding="max_length",
        truncation=True
    ).input_ids
    
    model_inputs["labels"] = labels
    """Structure:
    {
        'input_ids': [prompt tokens],
        'attention_mask': [prompt mask],
        'labels': [completion tokens]
    }
    """
    return model_inputs

train_dataset = train_data.map(tokenize_function, batched=True)
val_dataset = valid_data.map(tokenize_function, batched=True)

In [13]:
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

# Training arguments
training_args = Seq2SeqTrainingArguments(
    output_dir="./codet5-java",
    evaluation_strategy="epoch",
    learning_rate=3e-5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    weight_decay=0.01,
    save_total_limit=3,
    num_train_epochs=5,
    predict_with_generate=True,
    fp16=False,  # Enable if using GPU
    logging_dir="./logs",
    max_grad_norm=1.0,
)



In [8]:
# Test sample of how the model takes data
import random
test_code = train_data[random.randint(0, len(train_data) - 1)]["whole_func_string"]
print(test_code)
prompt = ''.join(list(test_code)[:int(len(test_code) * 0.7)])
print("-----------------------")
print(prompt)

public static DataError findErrorForDoc(List<DataError> list, JsonNode node) {
        for (DataError x : list) {
            if (x.entityData == node) {
                return x;
            }
        }
        return null;
    }
-----------------------
public static DataError findErrorForDoc(List<DataError> list, JsonNode node) {
        for (DataError x : list) {
            if (x.entityData == node) {
       


In [9]:
from transformers import TrainerCallback


# Manual callback for testing
class FileLoggingCallback(TrainerCallback):
    def on_step_end(self, args, state, control, **kwargs):
        
        # Only every 10000 steps 
        if state.global_step % 10000 == 0:  
            # ----- Clearing VRAM -----
            print(f"Freeing VRAM at step {state.global_step}...")
            torch.cuda.empty_cache()
            gc.collect()
            
            # ----- Callback logic -----
            print(f"Step {state.global_step}: Running callback")
            # Input for testing
            test_code = train_data[random.randint(0, len(train_data) - 1)]["whole_func_string"]
            prompt = ''.join(list(test_code)[:int(len(test_code) * 0.7)])
            test_input = prompt
            # Tokenize the input
            inputs = tokenizer(test_input, return_tensors="pt").to(model.device)
            # Generate prediction
            outputs = model.generate(**inputs)
            # Decode the output
            decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)
            
            # Write results to a file
            with open("callback_results.txt", "a") as f:
                f.write(f"Epoch {state.epoch}:\n")
                f.write(f"Input: {test_input}\n")
                f.write(f"Output: {decoded_output}\n\n")

In [21]:
class TrainingCallback(TrainerCallback):
    def on_step_end(self, args, state, control, **kwargs):
        if state.global_step % 100 == 0:
            try:
                # Get components from trainer
                trainer = kwargs.get('trainer')
                if not trainer:
                    print("Trainer not found in kwargs. Skipping callback.")
                    return

                # Access required components
                model = trainer.model
                tokenizer = trainer.tokenizer
                dataset = trainer.train_dataset

                # Clear memory
                print(f"\nStep {state.global_step}: Freeing VRAM...")
                torch.cuda.empty_cache()
                gc.collect()

                # Get random sample
                sample = dataset[random.randint(0, len(dataset)-1)]
                full_code = sample["whole_func_string"]
                split_point = int(len(full_code) * 0.7)
                prompt = full_code[:split_point]

                # Generate prediction
                inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
                
                outputs = model.generate(**inputs,)
                
                decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

                # Atomic file write
                temp_file = "training_samples.tmp"
                # with open(temp_file, "a") as f:
                #     f.write(f"\nStep {state.global_step} (Epoch {state.epoch}):\n")
                #     f.write(f"Input:\n{prompt}\n")
                #     f.write(f"Output:\n{decoded_output}\n")
                #     f.write("-" * 50 + "\n")
                
                # Replace original file atomically
                # os.replace(temp_file, "training_samples.txt")

                print(f"Step {state.global_step}: Callback completed successfully")

            except Exception as e:
                print(f"Error in callback at step {state.global_step}: {str(e)}")
                import traceback
                traceback.print_exc()

In [14]:
from transformers import DataCollatorForSeq2Seq

# Data collator for Seq2Seq models
data_collator = DataCollatorForSeq2Seq(
    tokenizer,
    model=model,
    padding=True
)

def compute_metrics(eval_pred):
    # Implement code-specific metrics here
    predictions, labels = eval_pred
    # Untokenize the predictions/labels
    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
    
    # Simple exact match metric
    exact_matches = sum([1 for p, l in zip(decoded_preds, decoded_labels) if p == l])
    return {"exact_match": exact_matches / len(decoded_labels)}

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=[FileLoggingCallback()],
)

In [9]:
import torch
print(torch.cuda.is_available())  # Returns True if a GPU is available
print(torch.cuda.current_device())  # Prints the index of the current GPU
print(torch.cuda.get_device_name(0))  # Prints the name of the GPU (e.g., NVIDIA A100)


True
0
NVIDIA GeForce RTX 3050 Laptop GPU


In [20]:
torch.cuda.empty_cache()
gc.collect()

16

In [15]:
# trainer.train(resume_from_checkpoint=True)
trainer.train(resume_from_checkpoint="./codet5-java/checkpoint-112500")

There were missing keys in the checkpoint model loaded: ['encoder.embed_tokens.weight', 'decoder.embed_tokens.weight', 'lm_head.weight'].
  torch.load(os.path.join(checkpoint, OPTIMIZER_NAME), map_location=map_location)
  checkpoint_rng_state = torch.load(rng_file)


Epoch,Training Loss,Validation Loss


OverflowError: can't convert negative int to unsigned

In [16]:
# Print the callbacks
print(len(trainer.callback_handler.callbacks))  # Number of callbacks
print(trainer.callback_handler.callbacks)       # Details of each callback




3
[<transformers.trainer_callback.DefaultFlowCallback object at 0x0000023376583B50>, <__main__.FileLoggingCallback object at 0x0000023376582830>, <transformers.utils.notebook.NotebookProgressCallback object at 0x0000023376582CE0>]


In [49]:
# Test input
test_input = "static int factorial(int n)  {"

# Tokenize the input
inputs = tokenizer(test_input, return_tensors="pt").to(model.device)

# Generate output
outputs = model.generate(**inputs)

# Decode the output
decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

print(f"Input: {test_input}")
print(f"Output: {decoded_output}")


Input: static int factorial(int n)  {
Output:         return Math.pow(n, n);
    }


In [12]:
from transformers import pipeline
import torch

java_assistant = pipeline(
    "text2text-generation",
    model=model,
    tokenizer=tokenizer,
    device=0 if torch.cuda.is_available() else -1,
    max_length=512,
    temperature=0.7,
    top_p=0.95,
    num_return_sequences=1
)

Device set to use cuda:0


In [13]:
input_code = "public class Calculator { public int add("
output = java_assistant(input_code)
print(output[0]['generated_text'])



int i = 0; return i; }
