In [None]:
import json

def load_and_preprocess_data(file_path):
    """Load CodeNet records from a JSON or JSON Lines file.

    The file is first parsed as one JSON document. If that fails (as it does
    for a multi-record ``.jsonl`` file, where every non-blank line is its own
    JSON value), the file is rewound and parsed one line at a time.

    Args:
        file_path: Path to a UTF-8 encoded ``.json`` or ``.jsonl`` file.

    Returns:
        A new list with one element per record, in file order. A file that
        holds a single top-level JSON object yields a one-element list.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a line of a JSON Lines file is not valid JSON.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            # Not a single JSON document: treat it as JSON Lines and skip
            # blank lines (e.g. a trailing newline at the end of the file).
            f.seek(0)
            data = [json.loads(line) for line in f if line.strip()]

    if isinstance(data, dict):
        # A one-line JSON Lines file parses as a single object; keep it as
        # one record instead of iterating over its keys.
        return [data]

    # Copy so callers get their own list, independent of the parsed document.
    return list(data)

# Kaggle input path of the training split; replace with the actual path when
# running elsewhere.
train_file = "/kaggle/input/code-net-python/train.jsonl"
train_data = load_and_preprocess_data(train_file)
# Sanity check: show the first loaded record.
print(train_data[0])

In [None]:
!pip install torch transformers


In [None]:
from transformers import RobertaTokenizer

# Tokenizer published with the 'microsoft/codebert-base' checkpoint; used below
# for the length statistics and for encoding the training examples.
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')

In [None]:
# Validation split, loaded with the same helper as the training data.
validation_file = "/kaggle/input/code-net-python/valid.jsonl"
validation_data = load_and_preprocess_data(validation_file)

In [None]:
def get_token_lengths(data, tokenizer):
    """Measure the untruncated token count of every source and target.

    Args:
        data: Iterable of records whose ``'src'`` and ``'tgt'`` entries are
            sequences of strings.
        tokenizer: Callable invoked as ``tokenizer(text, truncation=False)``
            that returns a mapping with an ``'input_ids'`` entry.

    Returns:
        A pair ``(src_lengths, tgt_lengths)`` of lists, one length per record,
        in input order.
    """
    def _n_tokens(text):
        # Truncation is disabled so the real length is measured.
        return len(tokenizer(text, truncation=False)['input_ids'])

    src_lengths, tgt_lengths = [], []
    for record in data:
        joined_src = " ".join(record['src'])
        joined_tgt = " ".join(record['tgt'])
        src_lengths.append(_n_tokens(joined_src))
        tgt_lengths.append(_n_tokens(joined_tgt))

    return src_lengths, tgt_lengths

import numpy as np
# Token-length statistics over training records 50000..79999.
src_lengths, tgt_lengths = get_token_lengths(train_data[50000:80000], tokenizer)

# Same three statistics for both sides; the target heading carries the blank
# line that separates the two reports.
for heading, lengths in (("Source Lengths:", src_lengths), ("\nTarget Lengths:", tgt_lengths)):
    print(heading)
    print(f"Mean: {np.mean(lengths):.2f}, 90th percentile: {np.percentile(lengths, 90)}, Max: {max(lengths)}")



In [None]:
# Maximum sequence length reported by the tokenizer; the original note says
# this prints 512.
print(tokenizer.model_max_length)


In [None]:
from torch.utils.data import Dataset, DataLoader
def encode_example(example, tokenizer, max_length=512):
    """Tokenize one source/target pair to fixed-length encodings.

    Args:
        example: Record whose ``'src'`` and ``'tgt'`` entries are sequences of
            string tokens.
        tokenizer: Callable tokenizer; it is asked for ``"pt"`` tensors that
            are truncated and padded to ``max_length``.
        max_length: Length every encoding is truncated/padded to.

    Returns:
        A pair ``(src_enc, tgt_enc)`` holding whatever the tokenizer returns
        for the space-joined source and for the marker-wrapped target.
    """
    source_text = " ".join(example['src'])
    joined_target = " ".join(example['tgt'])

    # The target is wrapped in explicit start/end markers before tokenizing.
    # NOTE(review): the tokenizer is called with its default special-token
    # handling; if it adds its own boundary tokens the target may carry them
    # twice — confirm against the tokenizer in use.
    target_text = f"<s> {joined_target} </s>"

    shared_kwargs = dict(
        max_length=max_length,
        truncation=True,
        padding="max_length",
        return_tensors="pt",
    )
    return tokenizer(source_text, **shared_kwargs), tokenizer(target_text, **shared_kwargs)



class PreTokenizedDataset(Dataset):
    """Map-style dataset over an in-memory sequence of tokenized examples.

    Items are returned exactly as stored; no tokenization or conversion
    happens at access time.
    """

    def __init__(self, tokenized_data):
        # Held by reference: nothing is copied or transformed.
        self.tokenized_data = tokenized_data

    def __getitem__(self, idx):
        examples = self.tokenized_data
        return examples[idx]

    def __len__(self):
        examples = self.tokenized_data
        return len(examples)

def pre_tokenize_data(data, tokenizer, max_length=512):
    """Encode every record up front and flatten each pair into one dict.

    Args:
        data: Iterable of records accepted by ``encode_example``.
        tokenizer: Tokenizer forwarded to ``encode_example``.
        max_length: Length forwarded to ``encode_example``.

    Returns:
        A list with one dict per record, holding ``'src_input_ids'``,
        ``'src_attention_mask'``, ``'tgt_input_ids'`` and
        ``'tgt_attention_mask'``. Each value has its leading dimension
        squeezed away.
    """
    def _as_record(src_enc, tgt_enc):
        # squeeze(0) drops the leading size-1 dimension of each encoding.
        return {
            'src_input_ids': src_enc['input_ids'].squeeze(0),
            'src_attention_mask': src_enc['attention_mask'].squeeze(0),
            'tgt_input_ids': tgt_enc['input_ids'].squeeze(0),
            'tgt_attention_mask': tgt_enc['attention_mask'].squeeze(0),
        }

    return [
        _as_record(*encode_example(record, tokenizer, max_length))
        for record in data
    ]

# Tokenize training records 80000..129999 once up front, wrap them in a
# dataset and serve them in shuffled batches of 8.
tokenized_train_data = pre_tokenize_data(train_data[80000:130000], tokenizer, max_length=512)
pretokenized_dataset = PreTokenizedDataset(tokenized_train_data)
train_loader = DataLoader(pretokenized_dataset, batch_size=8, shuffle=True)

# NOTE(review): the validation split is sliced with the same 80000:130000
# window as the training data — confirm it holds that many records; the slice
# is empty otherwise.
# NOTE(review): shuffle=True only reorders validation batches — confirm it is
# intended.
tokenized_valid_data = pre_tokenize_data(validation_data[80000:130000], tokenizer, max_length=512)
pretokenized_valid_dataset = PreTokenizedDataset(tokenized_valid_data)
valid_loader = DataLoader(pretokenized_valid_dataset, batch_size=8, shuffle=True)





In [None]:
import torch.nn as nn
import torch.nn.functional as F
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to embeddings, then dropout.

    The table is computed once for ``max_len`` positions and stored as the
    non-trainable buffer ``pe`` of shape ``(1, max_len, d_model)``. Even
    columns hold sines and odd columns hold cosines.

    Args:
        d_model: Embedding width. Both even and odd widths are supported.
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, dropout=0.1, max_len=512):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd column than even columns, so trim
        # the angle table to the number of odd columns before assigning.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading batch dimension so the table broadcasts over the batch.
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :seq_len])``.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

class CodeErrorFixModel(nn.Module):
    """Encoder-decoder model: a pretrained encoder feeding a Transformer decoder.

    Args:
        encoder_model_name: Name or path passed to ``AutoModel.from_pretrained``.
        vocab_size: Size of the decoder embedding table and of the output layer.
        embed_size: Decoder width; must match the encoder's hidden size because
            the encoder output is used directly as decoder memory.
        num_decoder_layers: Number of stacked decoder layers.
        nhead: Number of attention heads per decoder layer.
    """

    def __init__(self, encoder_model_name, vocab_size, embed_size=768, num_decoder_layers=6, nhead=8):
        super().__init__()
        # Pretrained encoder (e.g. CodeBERT).
        self.encoder = AutoModel.from_pretrained(encoder_model_name)
        # Decoder components
        self.decoder_embedding = nn.Embedding(vocab_size, embed_size)
        self.pos_encoder = PositionalEncoding(embed_size)
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_size, nhead=nhead, dropout=0.1)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.embed_size = embed_size

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` boolean causal mask on the model's device.

        ``True`` marks positions that may NOT be attended to: everything
        strictly above the diagonal, i.e. future positions.
        """
        device = next(self.parameters()).device
        return torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()

    def forward(self, src_input_ids, src_attention_mask, tgt_input_ids, tgt_attention_mask):
        """Compute next-token logits for a teacher-forced target sequence.

        Args:
            src_input_ids: Source token ids, ``(batch_size, src_seq_len)``.
            src_attention_mask: Mask for the source with 1 at real tokens and 0
                at padding, or ``None``. It is given to the encoder and also
                used to hide padded encoder positions from the decoder.
            tgt_input_ids: Decoder input token ids, ``(batch_size, tgt_seq_len)``.
            tgt_attention_mask: Accepted for interface compatibility; not used.
                Future positions are hidden by the causal mask.

        Returns:
            Logits of shape ``(batch_size, tgt_seq_len, vocab_size)``.
        """
        # Encode source sequence
        encoder_outputs = self.encoder(input_ids=src_input_ids, attention_mask=src_attention_mask)
        memory = encoder_outputs.last_hidden_state  # (batch_size, src_seq_len, embed_size)

        # Cross-attention must not read padded source positions. The decoder
        # expects True where a memory position is to be ignored.
        memory_padding_mask = None
        if src_attention_mask is not None:
            memory_padding_mask = src_attention_mask == 0

        # Prepare target embeddings (scaled by sqrt(embed_size), then positions).
        tgt_embeddings = self.decoder_embedding(tgt_input_ids) * math.sqrt(self.embed_size)
        tgt_embeddings = self.pos_encoder(tgt_embeddings)

        # The decoder layers are built without batch_first, so they expect
        # (seq_len, batch_size, embed_size).
        tgt_embeddings = tgt_embeddings.transpose(0, 1)
        memory = memory.transpose(0, 1)

        # Causal mask for auto-regressive decoding.
        tgt_mask = self.generate_square_subsequent_mask(tgt_input_ids.size(1))

        decoder_output = self.decoder(
            tgt=tgt_embeddings,
            memory=memory,
            tgt_mask=tgt_mask,
            memory_key_padding_mask=memory_padding_mask,
        )
        # Transpose back: (batch_size, seq_len, embed_size)
        decoder_output = decoder_output.transpose(0, 1)
        return self.fc_out(decoder_output)  # (batch_size, seq_len, vocab_size)


In [None]:
import torch
import math
from transformers import AutoTokenizer, AutoModel
# === Step 1: Load tokenizer from saved folder ===
tokenizer = RobertaTokenizer.from_pretrained("/kaggle/input/model-pyfix/tokenizer_dir")

# === Step 2: Pick the device first so the checkpoint can be mapped onto it ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === Step 3: Load model from .pth ===
# The file holds a whole pickled model object, so full unpickling is requested
# explicitly with weights_only=False. Unpickling can execute arbitrary code:
# only load checkpoints you produced yourself.
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
model = torch.load(
    "/kaggle/input/model-pyfix/PYFIX_MODEL/full_model.pth",
    map_location=device,
    weights_only=False,
)
model = model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.01)
# Padding tokens are ignored when computing the loss.
pad_token_id = tokenizer.pad_token_id
criterion = nn.CrossEntropyLoss(ignore_index=pad_token_id)


In [None]:
!pip install tqdm


In [None]:
# Vocabulary size reported by the tokenizer.
# NOTE(review): presumably equal to the model's output dimension — confirm.
vocab_size = tokenizer.vocab_size

In [None]:
from tqdm.notebook import tqdm

num_epochs = 3


def _batch_loss(batch):
    """Return the teacher-forced cross-entropy loss for one pre-tokenized batch.

    Used by both the training and validation passes, so the two always compute
    the loss the same way.
    """
    src_input_ids = batch['src_input_ids'].to(device)
    src_attention_mask = batch['src_attention_mask'].to(device)
    tgt_input_ids = batch['tgt_input_ids'].to(device)
    tgt_attention_mask = batch['tgt_attention_mask'].to(device)

    # Shift by one: the decoder reads every token but the last and is scored
    # on predicting every token but the first.
    decoder_input_ids = tgt_input_ids[:, :-1]
    decoder_target_ids = tgt_input_ids[:, 1:]

    logits = model(
        src_input_ids=src_input_ids,
        src_attention_mask=src_attention_mask,
        tgt_input_ids=decoder_input_ids,
        tgt_attention_mask=tgt_attention_mask[:, :-1]
    )

    # Flatten with the logits' own last dimension so the reshape cannot
    # disagree with the model's output width.
    return criterion(logits.reshape(-1, logits.size(-1)), decoder_target_ids.reshape(-1))


print("Training Started")

for epoch in range(num_epochs):
    # --- Training pass ---
    model.train()
    total_loss = 0

    for batch in tqdm(train_loader, total=len(train_loader), desc=f"Epoch {epoch+1}"):
        optimizer.zero_grad()
        loss = _batch_loss(batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f" Epoch {epoch+1}/{num_epochs} - Average Training Loss: {avg_loss:.4f}")

    # --- Validation pass (no gradients, dropout disabled) ---
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in valid_loader:
            val_loss += _batch_loss(batch).item()

    val_avg_loss = val_loss / len(valid_loader)
    print(f" Epoch {epoch+1} - Validation Loss: {val_avg_loss:.4f}")


In [None]:
# Serialize the whole model object (rather than only a state_dict) to
# full_model.pth in the current working directory.
torch.save(model, "full_model.pth")


In [None]:
# Shell command: pack the saved checkpoint into model_archive.zip.
!zip -r model_archive.zip full_model.pth


In [None]:
import json
import requests
from google.colab import auth  # works in Kaggle too
import google.auth
from google.auth.transport.requests import Request




# Authenticate the current user with Google.
auth.authenticate_user()
# Fetch the default credentials and refresh them so a token is available.
creds, _ = google.auth.default()
creds.refresh(Request())
# Sent as the bearer token of the upload request in the next cell.
access_token = creds.token



In [None]:
file_path = "/kaggle/working/model_archive.zip"  # Local archive to upload; change as needed
file_name = "PYFIX_MODEL_3.zip"  # Name the file gets in Drive

headers = {
    "Authorization": f"Bearer {access_token}"
}

metadata = {
    "name": file_name,
    "mimeType": "application/zip"
}

upload_url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"

# Multipart upload: the JSON metadata part goes first, then the file content.
# The archive is opened in a `with` block so the handle is closed even if the
# request fails.
with open(file_path, "rb") as archive:
    files = {
        "data": ("metadata", json.dumps(metadata), "application/json"),
        "file": archive
    }
    res = requests.post(upload_url, headers=headers, files=files)

# Raise on any HTTP error status instead of printing a misleading success.
res.raise_for_status()

print(" Upload successful!")
print("File ID:", res.json()["id"])


In [None]:
# Write the tokenizer files into the local "tokenizer_dir" folder.
tokenizer.save_pretrained("tokenizer_dir")


In [None]:
import shutil
# Zip the contents of "tokenizer_dir" into tokenizer_dir.zip.
shutil.make_archive("tokenizer_dir", 'zip', "tokenizer_dir")


In [None]:
import torch
from transformers import AutoTokenizer  # or your specific tokenizer

# Load model on the CPU. The file is a whole pickled model object, so full
# unpickling is requested explicitly; only load checkpoints you produced
# yourself, because unpickling can execute arbitrary code.
model = torch.load("full_model.pth", map_location=torch.device("cpu"), weights_only=False)
model.eval()

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained("tokenizer_dir")

# Example usage
src_code = "def add(x, y): return x + y"
tokens = tokenizer(src_code, return_tensors="pt", padding="max_length", truncation=True, max_length=512)

# Inference: greedy decoding. The model's forward pass needs decoder inputs,
# so start from the begin-of-sequence token and append the most likely next
# token until end-of-sequence or the length cap is reached.
max_new_tokens = 128
generated_ids = torch.tensor([[tokenizer.bos_token_id]])
with torch.no_grad():
    for _ in range(max_new_tokens):
        logits = model(
            tokens["input_ids"],
            tokens["attention_mask"],
            generated_ids,
            torch.ones_like(generated_ids),
        )
        # Only the prediction at the last decoder position is new.
        next_id = logits[:, -1].argmax(dim=-1, keepdim=True)
        generated_ids = torch.cat([generated_ids, next_id], dim=1)
        if next_id.item() == tokenizer.eos_token_id:
            break

output = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
print(output)


In [None]:
# Test split, loaded with the same helper as the training data.
test_file = "/kaggle/input/code-net-test/test.jsonl"
test_data = load_and_preprocess_data(test_file)


In [None]:
import torch
from transformers import RobertaTokenizer
from torch.utils.data import DataLoader

# 1. Load your saved model onto the device the batches will be moved to.
# The file is a whole pickled model object, so full unpickling is requested
# explicitly; only load checkpoints you produced yourself.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.load('/kaggle/working/full_model.pth', map_location=device, weights_only=False)
model = model.to(device)
model.eval()  # Set the model to evaluation mode

# 2. Load the tokenizer
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')

# 3. Prepare test data
data_test = test_data[:1]  # or use more examples if needed

# Tokenize the test data
tokenized_test_data = pre_tokenize_data(data_test, tokenizer, max_length=512)

# Create DataLoader
test_loader = DataLoader(tokenized_test_data, batch_size=1)  # batch_size=1 for clarity

# 4. Run inference
with torch.no_grad():
    for batch in test_loader:
        src_input_ids = batch['src_input_ids'].to(device)
        src_attention_mask = batch['src_attention_mask'].to(device)
        tgt_input_ids = batch['tgt_input_ids'].to(device)
        tgt_attention_mask = batch['tgt_attention_mask'].to(device)

        # Predict with teacher forcing: the decoder is fed the ground-truth
        # target (minus its last token), so this shows per-position
        # predictions, not free-running generation.
        output = model(src_input_ids, src_attention_mask, tgt_input_ids[:, :-1], tgt_attention_mask[:, :-1])
        predicted_ids = output.argmax(dim=-1)

        # Decode Input (buggy), Prediction, and Target (ground truth)
        input_text = tokenizer.decode(src_input_ids[0], skip_special_tokens=True)
        predicted_text = tokenizer.decode(predicted_ids[0], skip_special_tokens=True)
        target_text = tokenizer.decode(tgt_input_ids[0], skip_special_tokens=True)

        print(" Input (Buggy Code):")
        print(input_text)
        print("\n Prediction (Model Fix):")
        print(predicted_text)
        print("\nTarget (Ground Truth Fix):")
        print(target_text)
        print("=" * 80)


In [None]:
# Inspect one raw test record (index 9).
print(test_data[9])