# Code Documentation Generator (Code-to-Text)

This notebook demonstrates building a Transformer model from scratch to generate Python docstrings from code using the CodeXGLUE dataset.

# Code Documentation Generator (Code-to-Text)

End-to-end Transformer pipeline for generating Python docstrings from code.

In [None]:
!pip install datasets tokenizers torch transformers tqdm

In [None]:
import torch
import datasets
import tokenizers

print('Torch:', torch.__version__)
print('Datasets:', datasets.__version__)
print('Tokenizers:', tokenizers.__version__)

In [None]:
from datasets import load_dataset

print('Loading CodeXGLUE (code â†’ docstring, Python)...')
dataset = load_dataset('code_x_glue_ct_code_to_text', 'python', split='train', streaming=True)
print('Dataset loaded successfully.')

In [None]:
for ex in dataset:
    print('CODE:', ex['code'][:300])
    print('DOCSTRING:', ex['docstring'][:300])
    break

## Tokenization with BPE

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import ByteLevel

def text_iterator(dataset, max_samples=30000):
    for i, ex in enumerate(dataset):
        if i >= max_samples:
            break
        yield ex['code']
        yield ex['docstring']

In [None]:
tokenizer = Tokenizer(BPE(unk_token='<unk>'))
tokenizer.pre_tokenizer = ByteLevel()

trainer = BpeTrainer(vocab_size=32000, special_tokens=['<pad>', '<unk>', '<bos>', '<eos>'])
tokenizer.train_from_iterator(text_iterator(dataset), trainer=trainer)
tokenizer.save('code_doc_tokenizer.json')
print('Tokenizer trained and saved.')

## Transformer Model from Scratch

In [None]:
import torch.nn as nn
import math

# Model Configuration
d_model = 256
num_heads = 8
num_layers = 4
d_ff = 1024
max_len = 512

from tokenizers.decoders import ByteLevel
tokenizer = Tokenizer.from_file('code_doc_tokenizer.json')
tokenizer.decoder = ByteLevel()

vocab_size = tokenizer.get_vocab_size()
pad_token_id = tokenizer.token_to_id('<pad>')
bos_token_id = tokenizer.token_to_id('<bos>')
eos_token_id = tokenizer.token_to_id('<eos>')

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [None]:
class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        self.ffn = nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, src_mask=None, src_key_padding_mask=None):
        attn_out, _ = self.self_attn(x, x, x, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)
        x = self.norm1(x + self.dropout(attn_out))
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_out))
        return x

In [None]:
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        self.cross_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        self.ffn = nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, enc_out, tgt_mask=None, tgt_key_padding_mask=None, src_key_padding_mask=None):
        attn_out, _ = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)
        x = self.norm1(x + self.dropout(attn_out))
        attn_out, _ = self.cross_attn(x, enc_out, enc_out, key_padding_mask=src_key_padding_mask)
        x = self.norm2(x + self.dropout(attn_out))
        ffn_out = self.ffn(x)
        x = self.norm3(x + self.dropout(ffn_out))
        return x

In [None]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_len, pad_token_id):
        super().__init__()
        self.src_embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)
        self.tgt_embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList([EncoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)])
        self.output_layer = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None, tgt_mask=None):
        src = self.positional_encoding(self.src_embed(src))
        tgt = self.positional_encoding(self.tgt_embed(tgt))
        enc_out = src
        for layer in self.encoder_layers:
            enc_out = layer(enc_out, src_key_padding_mask=src_key_padding_mask)
        dec_out = tgt
        for layer in self.decoder_layers:
            dec_out = layer(dec_out, enc_out, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, src_key_padding_mask=src_key_padding_mask)
        return self.output_layer(dec_out)

In [None]:
def generate_causal_mask(size):
    mask = torch.triu(torch.ones(size, size), diagonal=1)
    mask = mask.masked_fill(mask == 1, float('-inf'))
    return mask

## Training Loop (Streaming)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(vocab_size, d_model, num_heads, num_layers, d_ff, max_len, pad_token_id).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=pad_token_id)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

def tokenize_pair(code, docstring, max_len):
    src = tokenizer.encode(code).ids[:max_len - 2]
    tgt = tokenizer.encode(docstring).ids[:max_len - 2]
    src = [bos_token_id] + src + [eos_token_id]
    tgt = [bos_token_id] + tgt + [eos_token_id]
    return src, tgt

def pad_batch(seqs, pad_id):
    max_len = max(len(s) for s in seqs)
    return [s + [pad_id] * (max_len - len(s)) for s in seqs]

In [None]:
# Training loop
model.train()
batch_size = 8
num_steps = 8000
step = 0
src_batch, tgt_batch = [], []

for example in dataset:
    src, tgt = tokenize_pair(example['code'], example['docstring'], max_len)
    src_batch.append(src)
    tgt_batch.append(tgt)
    if len(src_batch) == batch_size:
        src_batch = torch.tensor(pad_batch(src_batch, pad_token_id)).to(device)
        tgt_batch = torch.tensor(pad_batch(tgt_batch, pad_token_id)).to(device)
        tgt_input = tgt_batch[:, :-1]
        tgt_output = tgt_batch[:, 1:]
        tgt_mask = generate_causal_mask(tgt_input.size(1)).to(device)
        logits = model(src_batch, tgt_input, tgt_mask=tgt_mask)
        loss = criterion(logits.reshape(-1, vocab_size), tgt_output.reshape(-1))
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        if step % 100 == 0:
            print(f'Step {step} | Loss: {loss.item():.4f}')
        if step % 500 == 0:
            torch.save({'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'step': step}, 'checkpoint.pt')
            print(f'Checkpoint saved at step {step}')
        step += 1
        src_batch, tgt_batch = [], []
        if step >= num_steps:
            break

## Inference with Top-k Sampling

In [None]:
@torch.no_grad()
def generate_docstring(code_text, max_len=128, temperature=0.7, top_k=30):
    model.eval()
    prompt = 'Generate a clear Python docstring for the following function:\n\n' + code_text
    src_ids = tokenizer.encode(prompt).ids[:max_len - 2]
    src_ids = [bos_token_id] + src_ids + [eos_token_id]
    src = torch.tensor(src_ids).unsqueeze(0).to(device)
    generated = [bos_token_id]
    for _ in range(max_len):
        tgt = torch.tensor(generated).unsqueeze(0).to(device)
        tgt_mask = generate_causal_mask(tgt.size(1)).to(device)
        logits = model(src, tgt, tgt_mask=tgt_mask)
        next_token_logits = logits[0, -1] / temperature
        values, indices = torch.topk(next_token_logits, k=top_k)
        probs = torch.softmax(values, dim=-1)
        next_token_id = indices[torch.multinomial(probs, 1)].item()
        if next_token_id == eos_token_id:
            break
        generated.append(next_token_id)
    return tokenizer.decode(generated[1:], skip_special_tokens=True).strip()

test_code = 'def add(a, b):\n    return a + b'
print(generate_docstring(test_code))

In [2]:
!pip install datasets tokenizers torch transformers tqdm




In [3]:
import torch
import datasets
import tokenizers

print("Torch:", torch.__version__)
print("Datasets:", datasets.__version__)
print("Tokenizers:", tokenizers.__version__)


Torch: 2.9.0+cu126
Datasets: 4.0.0
Tokenizers: 0.22.2


In [4]:
from datasets import load_dataset

print("Loading CodeXGLUE (code â†’ docstring, Python)...")

dataset = load_dataset(
    "code_x_glue_ct_code_to_text",
    "python",
    split="train",
    streaming=True
)

print("Dataset loaded successfully.")


Loading CodeXGLUE (code â†’ docstring, Python)...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

Dataset loaded successfully.


In [5]:
for ex in dataset:
    print("CODE:\n", ex["code"][:300])
    print("\nDOCSTRING:\n", ex["docstring"][:300])
    break


CODE:
 def settext(self, text, cls='current'):
        """Set the text for this element.

        Arguments:
            text (str): The text
            cls (str): The class of the text, defaults to ``current`` (leave this unless you know what you are doing). There may be only one text content element of 

DOCSTRING:
 Set the text for this element.

        Arguments:
            text (str): The text
            cls (str): The class of the text, defaults to ``current`` (leave this unless you know what you are doing). There may be only one text content element of each class associated with the element.


In [10]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import ByteLevel

def text_iterator(dataset, max_samples=30000):
    for i, ex in enumerate(dataset):
        if i >= max_samples:
            break
        yield ex["code"]
        yield ex["docstring"]


In [11]:
tokenizer = Tokenizer(BPE(unk_token="<unk>"))
tokenizer.pre_tokenizer = ByteLevel()

trainer = BpeTrainer(
    vocab_size=32000,
    special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
)

tokenizer.train_from_iterator(
    text_iterator(dataset),
    trainer=trainer
)

tokenizer.save("code_doc_tokenizer.json")

print("Tokenizer trained and saved.")


Tokenizer trained and saved.


In [45]:
from tokenizers import Tokenizer
from tokenizers.decoders import ByteLevel

tokenizer = Tokenizer.from_file("code_doc_tokenizer.json")
tokenizer.decoder = ByteLevel()


In [12]:
output = tokenizer.encode("def add(a, b): return a + b")
print(output.tokens)


['Ä def', 'Ä add', '(', 'a', ',', 'Ä b', '):', 'Ä return', 'Ä a', 'Ä +', 'Ä b']


In [13]:
# ===== Model Configuration =====
d_model = 256
num_heads = 8
num_layers = 4
d_ff = 1024
max_len = 512

vocab_size = tokenizer.get_vocab_size()  # from your trained tokenizer
pad_token_id = tokenizer.token_to_id("<pad>")
bos_token_id = tokenizer.token_to_id("<bos>")
eos_token_id = tokenizer.token_to_id("<eos>")


In [14]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)

        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        return x + self.pe[:, :x.size(1)]


In [15]:
pe = PositionalEncoding(d_model, max_len)
dummy = torch.zeros(1, 10, d_model)
out = pe(dummy)
print(out.shape)


torch.Size([1, 10, 256])


In [16]:
class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()

        self.self_attn = nn.MultiheadAttention(
            embed_dim=d_model,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True
        )

        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_mask=None, src_key_padding_mask=None):
        # Self-attention
        attn_out, _ = self.self_attn(
            x, x, x,
            attn_mask=src_mask,
            key_padding_mask=src_key_padding_mask
        )
        x = self.norm1(x + self.dropout(attn_out))

        # Feed Forward
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_out))

        return x


In [17]:
encoder_block = EncoderBlock(d_model, num_heads, d_ff)
dummy = torch.randn(2, 20, d_model)
out = encoder_block(dummy)
print(out.shape)

torch.Size([2, 20, 256])


In [18]:
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()

        self.self_attn = nn.MultiheadAttention(
            d_model, num_heads, dropout=dropout, batch_first=True
        )

        self.cross_attn = nn.MultiheadAttention(
            d_model, num_heads, dropout=dropout, batch_first=True
        )

        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(
        self,
        x,
        enc_out,
        tgt_mask=None,
        tgt_key_padding_mask=None,
        src_key_padding_mask=None
    ):
        # 1. Masked self-attention
        attn_out, _ = self.self_attn(
            x, x, x,
            attn_mask=tgt_mask,
            key_padding_mask=tgt_key_padding_mask
        )
        x = self.norm1(x + self.dropout(attn_out))

        # 2. Cross-attention (decoder attends to encoder)
        attn_out, _ = self.cross_attn(
            x, enc_out, enc_out,
            key_padding_mask=src_key_padding_mask
        )
        x = self.norm2(x + self.dropout(attn_out))

        # 3. Feed Forward
        ffn_out = self.ffn(x)
        x = self.norm3(x + self.dropout(ffn_out))

        return x


In [19]:
decoder_block = DecoderBlock(d_model, num_heads, d_ff)
dummy_dec = torch.randn(2, 15, d_model)
dummy_enc = torch.randn(2, 20, d_model)

out = decoder_block(dummy_dec, dummy_enc)
print(out.shape)


torch.Size([2, 15, 256])


In [20]:
class Transformer(nn.Module):
    def __init__(
        self,
        vocab_size,
        d_model,
        num_heads,
        num_layers,
        d_ff,
        max_len,
        pad_token_id
    ):
        super().__init__()

        self.pad_token_id = pad_token_id

        # Embeddings
        self.src_embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)
        self.tgt_embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)

        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Encoder & Decoder stacks
        self.encoder_layers = nn.ModuleList([
            EncoderBlock(d_model, num_heads, d_ff)
            for _ in range(num_layers)
        ])

        self.decoder_layers = nn.ModuleList([
            DecoderBlock(d_model, num_heads, d_ff)
            for _ in range(num_layers)
        ])

        # Output head
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(
        self,
        src,
        tgt,
        src_key_padding_mask=None,
        tgt_key_padding_mask=None,
        tgt_mask=None
    ):
        # Embedding + position
        src = self.positional_encoding(self.src_embed(src))
        tgt = self.positional_encoding(self.tgt_embed(tgt))

        # Encoder
        enc_out = src
        for layer in self.encoder_layers:
            enc_out = layer(enc_out, src_key_padding_mask=src_key_padding_mask)

        # Decoder
        dec_out = tgt
        for layer in self.decoder_layers:
            dec_out = layer(
                dec_out,
                enc_out,
                tgt_mask=tgt_mask,
                tgt_key_padding_mask=tgt_key_padding_mask,
                src_key_padding_mask=src_key_padding_mask
            )

        # Vocabulary projection
        return self.output_layer(dec_out)


In [21]:
def generate_causal_mask(size):
    mask = torch.triu(torch.ones(size, size), diagonal=1)
    mask = mask.masked_fill(mask == 1, float("-inf"))
    return mask


In [22]:
model = Transformer(
    vocab_size=vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_len=max_len,
    pad_token_id=pad_token_id
)

src = torch.randint(0, vocab_size, (2, 50))
tgt = torch.randint(0, vocab_size, (2, 30))

tgt_mask = generate_causal_mask(tgt.size(1))

out = model(src, tgt, tgt_mask=tgt_mask)
print(out.shape)


torch.Size([2, 30, 32000])


In [23]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


In [24]:
criterion = nn.CrossEntropyLoss(ignore_index=pad_token_id)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)


In [28]:
def tokenize_pair(code, docstring, max_len):
    src = tokenizer.encode(code).ids[:max_len - 2]
    tgt = tokenizer.encode(docstring).ids[:max_len - 2]

    src = [bos_token_id] + src + [eos_token_id]
    tgt = [bos_token_id] + tgt + [eos_token_id]

    return src, tgt



In [29]:
def pad_batch(seqs, pad_id):
    max_len = max(len(s) for s in seqs)
    return [s + [pad_id] * (max_len - len(s)) for s in seqs]


In [58]:
model.train()

batch_size = 8
num_steps = 8000   # new target

src_batch, tgt_batch = [], []

for example in dataset:
    src, tgt = tokenize_pair(example["code"], example["docstring"], max_len)

    src_batch.append(src)
    tgt_batch.append(tgt)

    if len(src_batch) == batch_size:
        src_batch = torch.tensor(pad_batch(src_batch, pad_token_id)).to(device)
        tgt_batch = torch.tensor(pad_batch(tgt_batch, pad_token_id)).to(device)

        tgt_input = tgt_batch[:, :-1]
        tgt_output = tgt_batch[:, 1:]

        tgt_mask = generate_causal_mask(tgt_input.size(1)).to(device)

        logits = model(src_batch, tgt_input, tgt_mask=tgt_mask)

        loss = criterion(
            logits.reshape(-1, vocab_size),
            tgt_output.reshape(-1)
        )

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        if step % 100 == 0:
            print(f"Step {step} | Loss: {loss.item():.4f}")

        if step % 500 == 0:
            torch.save({
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "step": step
            }, "checkpoint.pt")
            print(f"ðŸ’¾ Checkpoint saved at step {step}")

        src_batch, tgt_batch = [], []
        step += 1

        if step >= num_steps:
            break


Step 0 | Loss: 4.4022
ðŸ’¾ Checkpoint saved at step 0
Step 100 | Loss: 2.6167
Step 200 | Loss: 2.3793
Step 300 | Loss: 5.0722
Step 400 | Loss: 4.4966
Step 500 | Loss: 3.9470
ðŸ’¾ Checkpoint saved at step 500
Step 600 | Loss: 4.3703
Step 700 | Loss: 4.7572
Step 800 | Loss: 5.0899
Step 900 | Loss: 5.0568
Step 1000 | Loss: 5.0138
ðŸ’¾ Checkpoint saved at step 1000
Step 1100 | Loss: 4.6302
Step 1200 | Loss: 3.3548
Step 1300 | Loss: 4.9913
Step 1400 | Loss: 4.5126
Step 1500 | Loss: 4.7811
ðŸ’¾ Checkpoint saved at step 1500
Step 1600 | Loss: 5.6211
Step 1700 | Loss: 4.4565
Step 1800 | Loss: 4.9083
Step 1900 | Loss: 5.0291
Step 2000 | Loss: 5.0217
ðŸ’¾ Checkpoint saved at step 2000
Step 2100 | Loss: 5.0984
Step 2200 | Loss: 4.7637
Step 2300 | Loss: 4.6630
Step 2400 | Loss: 5.1246
Step 2500 | Loss: 5.2098
ðŸ’¾ Checkpoint saved at step 2500
Step 2600 | Loss: 4.7057
Step 2700 | Loss: 4.9400
Step 2800 | Loss: 5.2711
Step 2900 | Loss: 4.4749
Step 3000 | Loss: 4.9666
ðŸ’¾ Checkpoint saved at step 3

In [61]:
ckpt = torch.load("checkpoint.pt")
model.load_state_dict(ckpt["model"])
optimizer.load_state_dict(ckpt["optimizer"])
step = ckpt["step"]
print("Resumed from step:", step)

Resumed from step: 19500


In [62]:
model.eval()


Transformer(
  (src_embed): Embedding(32000, 256, padding_idx=0)
  (tgt_embed): Embedding(32000, 256, padding_idx=0)
  (positional_encoding): PositionalEncoding()
  (encoder_layers): ModuleList(
    (0-3): 4 x EncoderBlock(
      (self_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
      )
      (ffn): Sequential(
        (0): Linear(in_features=256, out_features=1024, bias=True)
        (1): ReLU()
        (2): Linear(in_features=1024, out_features=256, bias=True)
      )
      (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (decoder_layers): ModuleList(
    (0-3): 4 x DecoderBlock(
      (self_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
      )
      (cross_attn): MultiheadAttention(
   

In [72]:
@torch.no_grad()
def generate_docstring(code_text, max_len=128, temperature=0.7, top_k=30):
    model.eval()

    prompt = (
        "Generate a clear Python docstring for the following function:\n\n"
        + code_text
    )

    src_ids = tokenizer.encode(prompt).ids[:max_len - 2]
    src_ids = [bos_token_id] + src_ids + [eos_token_id]
    src = torch.tensor(src_ids).unsqueeze(0).to(device)

    generated = [bos_token_id]

    for _ in range(max_len):
        tgt = torch.tensor(generated).unsqueeze(0).to(device)
        tgt_mask = generate_causal_mask(tgt.size(1)).to(device)

        logits = model(src, tgt, tgt_mask=tgt_mask)
        next_token_logits = logits[0, -1] / temperature

        values, indices = torch.topk(next_token_logits, k=top_k)
        probs = torch.softmax(values, dim=-1)
        probs = torch.clamp(probs, min=1e-8)

        next_token_id = indices[torch.multinomial(probs, 1)].item()

        if next_token_id == eos_token_id:
            break

        generated.append(next_token_id)

    return tokenizer.decode(generated[1:], skip_special_tokens=True).strip()


In [78]:
test_code = """
def add(a, b):
    return a + b
"""

print(generate_docstring(test_code))


Return True if the next state has the next one, return the next
        set of all known objects.


In [65]:
ckpt = torch.load("checkpoint.pt")

model.load_state_dict(ckpt["model"])
optimizer.load_state_dict(ckpt["optimizer"])
step = ckpt["step"]

print("Resumed from step:", step)


Resumed from step: 19500


In [60]:
model.train()

batch_size = 8
num_steps = 20000   # new target

src_batch, tgt_batch = [], []

for example in dataset:
    src, tgt = tokenize_pair(example["code"], example["docstring"], max_len)

    src_batch.append(src)
    tgt_batch.append(tgt)

    if len(src_batch) == batch_size:
        src_batch = torch.tensor(pad_batch(src_batch, pad_token_id)).to(device)
        tgt_batch = torch.tensor(pad_batch(tgt_batch, pad_token_id)).to(device)

        tgt_input = tgt_batch[:, :-1]
        tgt_output = tgt_batch[:, 1:]

        tgt_mask = generate_causal_mask(tgt_input.size(1)).to(device)

        logits = model(src_batch, tgt_input, tgt_mask=tgt_mask)

        loss = criterion(
            logits.reshape(-1, vocab_size),
            tgt_output.reshape(-1)
        )

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        if step % 100 == 0:
            print(f"Step {step} | Loss: {loss.item():.4f}")

        if step % 500 == 0:
            torch.save({
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "step": step
            }, "checkpoint.pt")
            print(f"ðŸ’¾ Checkpoint saved at step {step}")

        src_batch, tgt_batch = [], []
        step += 1

        if step >= num_steps:
            break


Step 7500 | Loss: 4.3379
ðŸ’¾ Checkpoint saved at step 7500
Step 7600 | Loss: 2.8304
Step 7700 | Loss: 2.3705
Step 7800 | Loss: 5.1436
Step 7900 | Loss: 4.4487
Step 8000 | Loss: 4.1335
ðŸ’¾ Checkpoint saved at step 8000
Step 8100 | Loss: 4.5260
Step 8200 | Loss: 4.6074
Step 8300 | Loss: 4.9820
Step 8400 | Loss: 4.8544
Step 8500 | Loss: 4.6132
ðŸ’¾ Checkpoint saved at step 8500
Step 8600 | Loss: 4.4369
Step 8700 | Loss: 3.1880
Step 8800 | Loss: 4.7258
Step 8900 | Loss: 4.3237
Step 9000 | Loss: 4.6179
ðŸ’¾ Checkpoint saved at step 9000
Step 9100 | Loss: 5.2755
Step 9200 | Loss: 4.2680
Step 9300 | Loss: 4.4683
Step 9400 | Loss: 4.6040
Step 9500 | Loss: 4.5486
ðŸ’¾ Checkpoint saved at step 9500
Step 9600 | Loss: 4.7893
Step 9700 | Loss: 4.3875
Step 9800 | Loss: 4.3004
Step 9900 | Loss: 4.7694
Step 10000 | Loss: 4.8247
ðŸ’¾ Checkpoint saved at step 10000
Step 10100 | Loss: 4.4037
Step 10200 | Loss: 4.6259
Step 10300 | Loss: 4.8515
Step 10400 | Loss: 4.1374
Step 10500 | Loss: 4.6819
ðŸ’¾ Che

In [79]:
@torch.no_grad()
def generate_docstring(code_text, max_len=128, temperature=0.6, top_k=30):
    model.eval()

    # Encode source (code)
    src_ids = tokenizer.encode(code_text).ids[:max_len - 2]
    src_ids = [bos_token_id] + src_ids + [eos_token_id]
    src = torch.tensor(src_ids).unsqueeze(0).to(device)

    generated = [bos_token_id]

    for _ in range(max_len):
        tgt = torch.tensor(generated).unsqueeze(0).to(device)
        tgt_mask = generate_causal_mask(tgt.size(1)).to(device)

        logits = model(src, tgt, tgt_mask=tgt_mask)
        next_logits = logits[0, -1] / temperature

        values, indices = torch.topk(next_logits, k=top_k)
        probs = torch.softmax(values, dim=-1)

        next_token_id = indices[torch.multinomial(probs, 1)].item()
        if next_token_id == eos_token_id:
            break

        generated.append(next_token_id)

    return tokenizer.decode(generated[1:], skip_special_tokens=True).strip()


In [85]:
code_text = """
You are a Python documentation generator.
Explain EXACTLY what this function does.

def add(a, b):
    return a + b
"""

print(generate_docstring(code_text, temperature=0.4, top_k=10))



Returns the next list of the given string, or a list of strings.


In [None]:
!pip install datasets tokenizers torch transformers tqdm

In [None]:
import torch
import datasets
import tokenizers

print('Torch:', torch.__version__)
print('Datasets:', datasets.__version__)
print('Tokenizers:', tokenizers.__version__)

## Load CodeXGLUE Dataset

In [None]:
from datasets import load_dataset

print('Loading CodeXGLUE (code â†’ docstring, Python)...')
dataset = load_dataset('code_x_glue_ct_code_to_text', 'python', split='train', streaming=True)
print('Dataset loaded successfully.')

In [None]:
for ex in dataset:
    print('CODE:', ex['code'][:200])
    print('DOCSTRING:', ex['docstring'][:200])
    break

## Train BPE Tokenizer

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import ByteLevel

def text_iterator(dataset, max_samples=30000):
    for i, ex in enumerate(dataset):
        if i >= max_samples:
            break
        yield ex['code']
        yield ex['docstring']

tokenizer = Tokenizer(BPE(unk_token='<unk>'))
tokenizer.pre_tokenizer = ByteLevel()
trainer = BpeTrainer(vocab_size=32000, special_tokens=['<pad>', '<unk>', '<bos>', '<eos>'])
tokenizer.train_from_iterator(text_iterator(dataset), trainer=trainer)
tokenizer.save('code_doc_tokenizer.json')
print('Tokenizer trained and saved.')

## Transformer Model from Scratch

In [None]:
import torch.nn as nn
import math

d_model = 256
num_heads = 8
num_layers = 4
d_ff = 1024
max_len = 512

from tokenizers.decoders import ByteLevel
tokenizer = Tokenizer.from_file('code_doc_tokenizer.json')
tokenizer.decoder = ByteLevel()

vocab_size = tokenizer.get_vocab_size()
pad_token_id = tokenizer.token_to_id('<pad>')
bos_token_id = tokenizer.token_to_id('<bos>')
eos_token_id = tokenizer.token_to_id('<eos>')

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [None]:
class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        self.ffn = nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, src_mask=None, src_key_padding_mask=None):
        attn_out, _ = self.self_attn(x, x, x, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)
        x = self.norm1(x + self.dropout(attn_out))
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_out))
        return x

In [None]:
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        self.cross_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        self.ffn = nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, enc_out, tgt_mask=None, tgt_key_padding_mask=None, src_key_padding_mask=None):
        attn_out, _ = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)
        x = self.norm1(x + self.dropout(attn_out))
        attn_out, _ = self.cross_attn(x, enc_out, enc_out, key_padding_mask=src_key_padding_mask)
        x = self.norm2(x + self.dropout(attn_out))
        ffn_out = self.ffn(x)
        x = self.norm3(x + self.dropout(ffn_out))
        return x

In [None]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_len, pad_token_id):
        super().__init__()
        self.src_embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)
        self.tgt_embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList([EncoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)])
        self.output_layer = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None, tgt_mask=None):
        src = self.positional_encoding(self.src_embed(src))
        tgt = self.positional_encoding(self.tgt_embed(tgt))
        enc_out = src
        for layer in self.encoder_layers:
            enc_out = layer(enc_out, src_key_padding_mask=src_key_padding_mask)
        dec_out = tgt
        for layer in self.decoder_layers:
            dec_out = layer(dec_out, enc_out, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, src_key_padding_mask=src_key_padding_mask)
        return self.output_layer(dec_out)

## Training Loop

In [None]:
def generate_causal_mask(size):
    mask = torch.triu(torch.ones(size, size), diagonal=1)
    mask = mask.masked_fill(mask == 1, float('-inf'))
    return mask

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(vocab_size, d_model, num_heads, num_layers, d_ff, max_len, pad_token_id).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=pad_token_id)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

In [None]:
def tokenize_pair(code, docstring, max_len):
    src = tokenizer.encode(code).ids[:max_len - 2]
    tgt = tokenizer.encode(docstring).ids[:max_len - 2]
    src = [bos_token_id] + src + [eos_token_id]
    tgt = [bos_token_id] + tgt + [eos_token_id]
    return src, tgt

def pad_batch(seqs, pad_id):
    max_len = max(len(s) for s in seqs)
    return [s + [pad_id] * (max_len - len(s)) for s in seqs]

In [None]:
model.train()
batch_size = 8
num_steps = 8000
step = 0
src_batch, tgt_batch = [], []

for example in dataset:
    src, tgt = tokenize_pair(example['code'], example['docstring'], max_len)
    src_batch.append(src)
    tgt_batch.append(tgt)
    if len(src_batch) == batch_size:
        src_batch = torch.tensor(pad_batch(src_batch, pad_token_id)).to(device)
        tgt_batch = torch.tensor(pad_batch(tgt_batch, pad_token_id)).to(device)
        tgt_input = tgt_batch[:, :-1]
        tgt_output = tgt_batch[:, 1:]
        tgt_mask = generate_causal_mask(tgt_input.size(1)).to(device)
        logits = model(src_batch, tgt_input, tgt_mask=tgt_mask)
        loss = criterion(logits.reshape(-1, vocab_size), tgt_output.reshape(-1))
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        if step % 100 == 0:
            print(f'Step {step} | Loss: {loss.item():.4f}')
        if step % 500 == 0:
            torch.save({'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'step': step}, 'checkpoint.pt')
            print(f'Checkpoint saved at step {step}')
        step += 1
        src_batch, tgt_batch = [], []
        if step >= num_steps:
            break

## Inference with Top-k Sampling

In [None]:
@torch.no_grad()
def generate_docstring(code_text, max_len=128, temperature=0.7, top_k=30):
    model.eval()
    prompt = 'Generate a clear Python docstring for the following function:\n\n' + code_text
    src_ids = tokenizer.encode(prompt).ids[:max_len - 2]
    src_ids = [bos_token_id] + src_ids + [eos_token_id]
    src = torch.tensor(src_ids).unsqueeze(0).to(device)
    generated = [bos_token_id]
    for _ in range(max_len):
        tgt = torch.tensor(generated).unsqueeze(0).to(device)
        tgt_mask = generate_causal_mask(tgt.size(1)).to(device)
        logits = model(src, tgt, tgt_mask=tgt_mask)
        next_token_logits = logits[0, -1] / temperature
        values, indices = torch.topk(next_token_logits, k=top_k)
        probs = torch.softmax(values, dim=-1)
        next_token_id = indices[torch.multinomial(probs, 1)].item()
        if next_token_id == eos_token_id:
            break
        generated.append(next_token_id)
    return tokenizer.decode(generated[1:], skip_special_tokens=True).strip()

test_code = 'def add(a, b):\n    return a + b'
print(generate_docstring(test_code))