In [3]:
# Install dependencies
!pip install datasets transformers nltk sentencepiece



In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from transformers import AutoTokenizer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import random
import numpy as np

In [5]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
special_tokens = {"pad_token":"<PAD>", "bos_token":"<SOS>", "eos_token":"<EOS>"}
tokenizer.add_special_tokens(special_tokens)

PAD_IDX = tokenizer.pad_token_id
SOS_IDX = tokenizer.bos_token_id
EOS_IDX = tokenizer.eos_token_id

INPUT_DIM = len(tokenizer)
OUTPUT_DIM = len(tokenizer)
print(f"Vocab size: {INPUT_DIM}, PAD={PAD_IDX}, SOS={SOS_IDX}, EOS={EOS_IDX}")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Vocab size: 30525, PAD=30522, SOS=30523, EOS=30524


In [6]:
dataset = load_dataset("Nan-Do/code-search-net-python")
full_data = dataset["train"].select(range(7000))  # small subset for speed

# Split 80% train, 10% val, 10% test
split1 = full_data.train_test_split(test_size=0.2, seed=42)
train_data = split1["train"]
temp_data = split1["test"]
split2 = temp_data.train_test_split(test_size=0.5, seed=42)
val_data = split2["train"]
test_data = split2["test"]

print(len(train_data), len(val_data), len(test_data))

README.md: 0.00B [00:00, ?B/s]

data/train-00000-of-00004-ee77a7de79eb2a(â€¦):   0%|          | 0.00/155M [00:00<?, ?B/s]

data/train-00001-of-00004-648b3bede2edf6(â€¦):   0%|          | 0.00/139M [00:00<?, ?B/s]

data/train-00002-of-00004-1dfd72b171e6b2(â€¦):   0%|          | 0.00/153M [00:00<?, ?B/s]

data/train-00003-of-00004-184ab6d0e3c690(â€¦):   0%|          | 0.00/151M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/455243 [00:00<?, ? examples/s]

5600 700 700


In [7]:
# --------------------------
# 3. DATASET CLASS
# --------------------------
class CodeDataset(Dataset):
    def __init__(self, data, tokenizer, max_doc_len=50, max_code_len=80):
        self.data = data
        self.tokenizer = tokenizer
        self.max_doc_len = max_doc_len
        self.max_code_len = max_code_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        # tokenize docstring and code
        doc = self.tokenizer.encode(item["docstring"], truncation=True,
                                    max_length=self.max_doc_len, add_special_tokens=False)
        code = self.tokenizer.encode(item["code"], truncation=True,
                                     max_length=self.max_code_len, add_special_tokens=False)
        return {"doc": doc, "code": code}

def collate_fn(batch):
    docs = [b["doc"] for b in batch]
    codes = [b["code"] for b in batch]
    max_doc = max(len(d) for d in docs)
    max_code = max(len(c) for c in codes)

    doc_pad, trg_in, trg_out = [], [], []
    for d,c in zip(docs,codes):
        doc_pad.append(d + [PAD_IDX]*(max_doc-len(d)))
        trg_in.append([SOS_IDX] + c + [PAD_IDX]*(max_code-len(c)))
        trg_out.append(c + [EOS_IDX] + [PAD_IDX]*(max_code-len(c)))

    return {
        "src": torch.tensor(doc_pad),
        "trg_in": torch.tensor(trg_in),
        "trg_out": torch.tensor(trg_out)
    }

# Dataloaders
train_loader = DataLoader(CodeDataset(train_data, tokenizer), batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader   = DataLoader(CodeDataset(val_data, tokenizer), batch_size=32, collate_fn=collate_fn)
test_loader  = DataLoader(CodeDataset(test_data, tokenizer), batch_size=32, collate_fn=collate_fn)


In [8]:
# --------------------------
# 4. MODEL
# --------------------------
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=PAD_IDX)
        self.rnn = nn.RNN(emb_dim, hid_dim)

    def forward(self, src):
        embedded = self.embedding(src)         # [seq_len, batch, emb_dim]
        _, hidden = self.rnn(embedded)         # hidden: [1, batch, hid_dim]
        return hidden

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=PAD_IDX)
        self.rnn = nn.RNN(emb_dim, hid_dim)
        self.fc = nn.Linear(hid_dim, output_dim)

    def forward(self, x, hidden):
        x = x.unsqueeze(0)                     # [1, batch]
        emb = self.embedding(x)                # [1, batch, emb_dim]
        out, hidden = self.rnn(emb, hidden)    # out: [1, batch, hid_dim]
        pred = self.fc(out.squeeze(0))         # [batch, vocab_size]
        return pred, hidden

class Seq2Seq(nn.Module):
    def __init__(self, enc, dec, device):
        super().__init__()
        self.enc = enc
        self.dec = dec
        self.device = device

    def forward(self, src, trg, teacher_forcing=0.5):
        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        vocab_size = self.dec.fc.out_features

        outputs = torch.zeros(trg_len, batch_size, vocab_size).to(self.device)
        hidden = self.enc(src)
        x = trg[0]  # first input = SOS

        for t in range(1, trg_len):
            out, hidden = self.dec(x, hidden)
            outputs[t] = out
            top1 = out.argmax(1)
            x = trg[t] if random.random() < teacher_forcing else top1
        return outputs

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Seq2Seq(Encoder(INPUT_DIM, 128, 256), Decoder(OUTPUT_DIM, 128, 256), device).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

In [9]:
# --------------------------
# 5. TRAINING
# --------------------------
def train(model, loader):
    model.train()
    total_loss = 0
    for b in loader:
        src = b["src"].transpose(0,1).to(device)
        trg_in = b["trg_in"].transpose(0,1).to(device)
        trg_out = b["trg_out"].transpose(0,1).to(device)

        optimizer.zero_grad()
        output = model(src, trg_in)

        output = output[1:].reshape(-1, output.shape[-1])
        trg_out = trg_out[1:].reshape(-1)

        loss = criterion(output, trg_out)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

def evaluate(model, loader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for b in loader:
            src = b["src"].transpose(0,1).to(device)
            trg_in = b["trg_in"].transpose(0,1).to(device)
            trg_out = b["trg_out"].transpose(0,1).to(device)

            output = model(src, trg_in, teacher_forcing=0)
            output = output[1:].reshape(-1, output.shape[-1])
            trg_out = trg_out[1:].reshape(-1)

            loss = criterion(output, trg_out)
            total_loss += loss.item()
    return total_loss / len(loader)


In [10]:
# --------------------------
# 6. GENERATION
# --------------------------
def generate_code(model, docstring, max_len=80):
    model.eval()
    tokens = tokenizer.encode(docstring, add_special_tokens=False)
    src = torch.tensor(tokens).unsqueeze(1).to(device)

    output_tokens = []
    with torch.no_grad():
        hidden = model.enc(src)
        x = torch.tensor([SOS_IDX]).to(device)
        for _ in range(max_len):
            out, hidden = model.dec(x, hidden)
            top1 = out.argmax(1).item()
            if top1 == EOS_IDX:
                break
            output_tokens.append(top1)
            x = torch.tensor([top1]).to(device)

    return tokenizer.decode(output_tokens)


In [11]:
# --------------------------
# 7. BLEU SCORE
# --------------------------
smooth = SmoothingFunction().method1

def bleu_score(model, dataset, n_samples=100):
    scores = []
    for i in range(n_samples):
        ref_text = dataset[i]["code"]
        doc_text = dataset[i]["docstring"]
        pred_text = generate_code(model, doc_text)

        # Tokenize both reference and prediction with tokenizer
        ref_tokens = tokenizer.encode(ref_text, add_special_tokens=False)
        pred_tokens = tokenizer.encode(pred_text, add_special_tokens=False)

        ref_tokens = [str(tok) for tok in ref_tokens]
        pred_tokens = [str(tok) for tok in pred_tokens]

        scores.append(sentence_bleu([ref_tokens], pred_tokens, smoothing_function=smooth))
    return np.mean(scores)

In [12]:
# --------------------------
# 8. TRAIN LOOP
# --------------------------
best_val_loss = float('inf')
for epoch in range(10):
    train_loss = train(model, train_loader)
    val_loss = evaluate(model, val_loader)
    print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_model.pt")
        print("ðŸ’¾ Saved Best Model!")

Epoch 1: Train Loss=6.3415, Val Loss=5.9218
ðŸ’¾ Saved Best Model!
Epoch 2: Train Loss=5.5308, Val Loss=5.8069
ðŸ’¾ Saved Best Model!
Epoch 3: Train Loss=5.3919, Val Loss=5.8327
Epoch 4: Train Loss=5.2686, Val Loss=6.3155
Epoch 5: Train Loss=5.1840, Val Loss=5.9557
Epoch 6: Train Loss=5.1175, Val Loss=5.8516
Epoch 7: Train Loss=5.0636, Val Loss=5.9465
Epoch 8: Train Loss=5.0214, Val Loss=6.0190
Epoch 9: Train Loss=4.9552, Val Loss=6.4316
Epoch 10: Train Loss=4.9233, Val Loss=6.4434


In [14]:
# --------------------------
# 9. EVALUATION
# --------------------------
model.load_state_dict(torch.load("best_model.pt"))
model.to(device)
print("BLEU score on 100 test examples:", bleu_score(model, test_data, n_samples=100))

# Quick test example
print("Example docstring:", test_data[0]["docstring"])
print("Generated code:", generate_code(model, test_data[0]["docstring"]))
print("Reference code:", test_data[0]["code"])

BLEU score on 100 test examples: 0.020252689831894953
Example docstring: Process the inner datasets.
Generated code: _ _ _ (, _, _, _, _ : " " " " " a.. " " " ".. " " " ".. " " " ".. " " " ".. " " " ".. " " " ".. " " " ".. " " " ".. " " " ".. " " " ".
Reference code: def process(self):
        "Process the inner datasets."
        xp,yp = self.get_processors()
        for ds,n in zip(self.lists, ['train','valid','test']): ds.process(xp, yp, name=n)
        for ds in self.lists:
            if getattr(ds, 'warn', False): warn(ds.warn)
        return self
