In [1]:
import pandas as pd
from torch.utils.data import Dataset
import re
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
class SPOCDataset(Dataset):
    def __init__(self, tsv_path, tokenizer_src, tokenizer_tgt, max_len=256):
        self.data = pd.read_csv(tsv_path, sep='\t', quoting=3)
        self.tokenizer_src = tokenizer_src
        self.tokenizer_tgt = tokenizer_tgt
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Assume 'text' column holds pseudocode and 'code' column holds C++ code.
        src_text = self.data.iloc[idx]['code']
        tgt_text = self.data.iloc[idx]['text']
        src_tokens = self.tokenizer_src(str(src_text))[:self.max_len]
        tgt_tokens = self.tokenizer_tgt(str(tgt_text))[:self.max_len]
        return {'src': src_tokens, 'tgt': tgt_tokens}


In [4]:
def simple_tokenizer(text):
    # A simple whitespace and punctuation based tokenizer
    tokens = re.findall(r"[\w]+|[^\s\w]", text)
    return tokens

def build_vocab(tokenized_texts, min_freq=1):
    vocab = {}
    for tokens in tokenized_texts:
        for token in tokens:
            vocab[token] = vocab.get(token, 0) + 1
    # Filter tokens by frequency
    vocab = {token: freq for token, freq in vocab.items() if freq >= min_freq}

    # Reserve indices for special tokens and assign them first.
    specials = ['<pad>', '<unk>', '<sos>', '<eos>']
    vocab_with_specials = {}
    for idx, sp in enumerate(specials):
        vocab_with_specials[sp] = idx
    # Now assign remaining tokens starting after the specials.
    current_idx = len(specials)
    for token in vocab:
        if token not in vocab_with_specials:
            vocab_with_specials[token] = current_idx
            current_idx += 1
    return vocab_with_specials

In [5]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, seq_len, d_model)
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]
        return x


In [6]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert embed_size % num_heads == 0, "Embedding size must be divisible by number of heads"
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads

        # Learnable linear layers for queries, keys, and values.
        self.fc_q = nn.Linear(embed_size, embed_size)
        self.fc_k = nn.Linear(embed_size, embed_size)
        self.fc_v = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)

        # Linear projections
        Q = self.fc_q(query)  # (B, seq_len, embed_size)
        K = self.fc_k(key)
        V = self.fc_v(value)

        # Reshape for multiple heads: (B, num_heads, seq_len, head_dim)
        Q = Q.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1,2)
        K = K.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1,2)
        V = V.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1,2)

        # Scaled Dot-Product Attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention = torch.softmax(scores, dim=-1)
        out = torch.matmul(attention, V)  # (B, num_heads, seq_len, head_dim)

        # Concat heads and pass through final linear layer.
        out = out.transpose(1,2).contiguous().view(batch_size, -1, self.num_heads * self.head_dim)
        out = self.fc_out(out)
        return out


In [7]:
class EncoderLayer(nn.Module):
    def __init__(self, embed_size, num_heads, forward_expansion, dropout):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(embed_size, num_heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attention = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attention))
        forward = self.feed_forward(x)
        out = self.norm2(x + self.dropout(forward))
        return out

class DecoderLayer(nn.Module):
    def __init__(self, embed_size, num_heads, forward_expansion, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(embed_size, num_heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.encoder_attention = MultiHeadAttention(embed_size, num_heads)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size)
        )
        self.norm3 = nn.LayerNorm(embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        # Self-attention with masking (for causal decoding)
        self_attn = self.self_attention(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attn))
        # Encoder-decoder attention
        enc_attn = self.encoder_attention(x, enc_out, enc_out, src_mask)
        x = self.norm2(x + self.dropout(enc_attn))
        forward = self.feed_forward(x)
        out = self.norm3(x + self.dropout(forward))
        return out


In [8]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, embed_size=512, num_layers=6,
                 num_heads=8, forward_expansion=4, dropout=0.1, max_len=100):
        super(Transformer, self).__init__()
        self.embed_size = embed_size

        # Embedding layers for source and target languages.
        self.src_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, embed_size)
        self.positional_encoding = PositionalEncoding(embed_size, max_len)

        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(embed_size, num_heads, forward_expansion, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(embed_size, num_heads, forward_expansion, dropout) for _ in range(num_layers)]
        )

        self.fc_out = nn.Linear(embed_size, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def make_src_mask(self, src):
        # src: (B, src_len)
        # Create mask to ignore padding tokens (assume padding index is 0)
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        return src_mask

    def make_tgt_mask(self, tgt):
        # tgt: (B, tgt_len)
        B, T = tgt.shape
        tgt_mask = torch.tril(torch.ones((T, T), device=tgt.device)).expand(B, 1, T, T)
        return tgt_mask

    def encode(self, src, src_mask):
        x = self.dropout(self.positional_encoding(self.src_embedding(src)))
        for layer in self.encoder_layers:
            x = layer(x, src_mask)
        return x

    def decode(self, tgt, enc_out, src_mask, tgt_mask):
        x = self.dropout(self.positional_encoding(self.tgt_embedding(tgt)))
        for layer in self.decoder_layers:
            x = layer(x, enc_out, src_mask, tgt_mask)
        return x

    def forward(self, src, tgt):
        src_mask = self.make_src_mask(src)
        tgt_mask = self.make_tgt_mask(tgt)
        enc_out = self.encode(src, src_mask)
        dec_out = self.decode(tgt, enc_out, src_mask, tgt_mask)
        out = self.fc_out(dec_out)
        return out

In [9]:
def prepare_batch(batch, src_vocab, tgt_vocab):
    """
    Prepares a batch by converting token lists into padded tensors.

    Args:
        batch (list): A list of dictionaries with keys 'src' and 'tgt'.
        src_vocab (dict): Source vocabulary mapping tokens to indices.
        tgt_vocab (dict): Target vocabulary mapping tokens to indices.

    Returns:
        src_tensor (torch.Tensor): Padded tensor of source sequences.
        tgt_tensor (torch.Tensor): Padded tensor of target sequences.
    """
    # Extract token lists for source and target sequences
    src_batch = [sample['src'] for sample in batch]
    tgt_batch = [sample['tgt'] for sample in batch]

    # Convert tokens to indices using the provided vocabularies.
    # Use '<unk>' if a token is not found.
    src_indices = [
        [src_vocab.get(token, src_vocab.get('<unk>')) for token in tokens]
        for tokens in src_batch
    ]
    tgt_indices = [
        [tgt_vocab.get(token, tgt_vocab.get('<unk>')) for token in tokens]
        for tokens in tgt_batch
    ]

    # Determine maximum sequence lengths in the batch
    max_src_len = max(len(seq) for seq in src_indices)
    max_tgt_len = max(len(seq) for seq in tgt_indices)

    # Get pad token indices; default to 0 if not explicitly defined
    src_pad_idx = src_vocab.get('<pad>', 0)
    tgt_pad_idx = tgt_vocab.get('<pad>', 0)

    # Pad each sequence to the maximum length in the batch
    padded_src = [
        seq + [src_pad_idx] * (max_src_len - len(seq)) for seq in src_indices
    ]
    padded_tgt = [
        seq + [tgt_pad_idx] * (max_tgt_len - len(seq)) for seq in tgt_indices
    ]

    # Convert the padded lists to PyTorch tensors
    src_tensor = torch.tensor(padded_src, dtype=torch.long)
    tgt_tensor = torch.tensor(padded_tgt, dtype=torch.long)

    return src_tensor, tgt_tensor


In [10]:
# Load the training data
train_data = pd.read_csv('/content/drive/MyDrive/spoc/train/spoc-train.tsv', sep='\t', quoting=3)

# Convert the 'text' and 'code' columns to strings and tokenize them
src_texts = train_data['text'].astype(str).tolist()
tgt_texts = train_data['code'].astype(str).tolist()

# Tokenize the texts using your simple_tokenizer
tokenized_src_texts = [simple_tokenizer(text) for text in src_texts]
tokenized_tgt_texts = [simple_tokenizer(code) for code in tgt_texts]

# Build vocabularies using your build_vocab function
src_vocab = build_vocab(tokenized_src_texts)
tgt_vocab = build_vocab(tokenized_tgt_texts)

# Optionally, print out vocabulary sizes
print(f"Source Vocabulary Size: {len(src_vocab)}")
print(f"Target Vocabulary Size: {len(tgt_vocab)}")


Source Vocabulary Size: 7427
Target Vocabulary Size: 6153


In [12]:
import pickle
with open('/content/drive/MyDrive/C++_to_Pseudo/src_vocab.pkl', 'wb') as f:
    pickle.dump(src_vocab, f)

with open('/content/drive/MyDrive/C++_to_Pseudo/tgt_vocab.pkl', 'wb') as f:
    pickle.dump(tgt_vocab, f)

print("Vocabularies saved to disk.")

Vocabularies saved to disk.


In [13]:
SRC_VOCAB_SIZE = 7427   # update based on built vocabulary
TGT_VOCAB_SIZE = 6153   # update based on built vocabulary
EMBED_SIZE = 128
NUM_LAYERS = 2
NUM_HEADS = 2
FORWARD_EXPANSION = 4
DROPOUT = 0.1
NUM_EPOCHS = 3
LEARNING_RATE = 3e-4
BATCH_SIZE = 32

# Instantiate model
model = Transformer(SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, EMBED_SIZE,
                    num_layers=NUM_LAYERS, num_heads=NUM_HEADS,
                    forward_expansion=FORWARD_EXPANSION, dropout=DROPOUT)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = torch.nn.CrossEntropyLoss(ignore_index=0)  # assuming <pad> token index is 0

def train():
    # Load and tokenize training data.
    train_dataset = SPOCDataset('/content/drive/MyDrive/spoc/train/spoc-train.tsv', simple_tokenizer, simple_tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=lambda x: x)

    model.train()
    for epoch in range(NUM_EPOCHS):
        epoch_loss = 0
        # Wrap train_loader with tqdm to show progress
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}", leave=False)
        for batch in progress_bar:
            # Convert token lists to tensors and pad sequences appropriately.
            src_tensor, tgt_tensor = prepare_batch(batch, src_vocab, tgt_vocab)
            optimizer.zero_grad()
            # Shift target tokens for teacher forcing.
            output = model(src_tensor, tgt_tensor[:, :-1])
            # Reshape output and target for loss computation.
            output = output.reshape(-1, TGT_VOCAB_SIZE)
            tgt_out = tgt_tensor[:, 1:].reshape(-1)
            loss = criterion(output, tgt_out)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {epoch_loss/len(train_loader):.4f}")

if __name__ == "__main__":
    train()




Epoch 1/3, Loss: 1.5044




Epoch 2/3, Loss: 0.8707


                                                                          

Epoch 3/3, Loss: 0.7220




In [16]:
torch.save(model.state_dict(), '/content/drive/MyDrive/C++_to_Pseudo/model.pth')

In [None]:
def evaluate(model, dataloader):
    model.eval()
    total_loss = 0
    # Optionally, calculate BLEU scores or use a C++ compiler to check syntactic correctness.
    with torch.no_grad():
        for batch in dataloader:
            src_tensor, tgt_tensor = prepare_batch(batch,src_vocab,tgt_vocab)  # same helper as in training
            output = model(src_tensor, tgt_tensor[:, :-1])
            # Compute loss and any additional metrics here.
    # Return or print evaluation metrics.

if __name__ == "__main__":
    # Load your trained model and evaluation dataset.
    pass


In [15]:
import torch

def generate_code(model, pseudocode, src_vocab, tgt_vocab, max_length=100, device='cpu'):
    """
    Generates C++ code from input pseudocode using a trained Transformer model.

    Args:
        model: The trained Transformer model.
        pseudocode (str): The input pseudocode string.
        src_vocab (dict): Source vocabulary mapping tokens to indices.
        tgt_vocab (dict): Target vocabulary mapping tokens to indices.
        max_length (int): Maximum number of tokens to generate.
        device (str): Device to run the model on ('cpu' or 'cuda').

    Returns:
        generated_code (str): The generated C++ code.
    """
    # Ensure the model is on the correct device.
    model.to(device)
    model.eval()

    with torch.no_grad():
        # Tokenize the pseudocode and convert tokens to indices using the source vocabulary.
        src_tokens = simple_tokenizer(str(pseudocode))
        src_indices = [src_vocab.get(token, src_vocab.get('<unk>')) for token in src_tokens]

        # Convert to tensor with shape [1, src_seq_len] and send to device.
        src_tensor = torch.tensor(src_indices, dtype=torch.long).unsqueeze(0).to(device)
        src_mask = model.make_src_mask(src_tensor)

        # Pass through the encoder.
        encoder_output = model.encode(src_tensor, src_mask)

        # Initialize target sequence with the start-of-sequence token.
        sos_token = tgt_vocab.get('<sos>')
        eos_token = tgt_vocab.get('<eos>')
        generated_tokens = [sos_token]

        # Greedy decoding loop.
        for _ in range(max_length):
            # Create target tensor from generated tokens so far.
            tgt_tensor = torch.tensor(generated_tokens, dtype=torch.long).unsqueeze(0).to(device)
            tgt_mask = model.make_tgt_mask(tgt_tensor)

            # Decode using encoder output and current target sequence.
            decoder_output = model.decode(tgt_tensor, encoder_output, src_mask, tgt_mask)

            # Pass through the final linear layer to get logits over the target vocabulary.
            logits = model.fc_out(decoder_output)  # shape: [1, seq_len, tgt_vocab_size]
            # Focus on the logits of the last token.
            next_token_logits = logits[:, -1, :]  # shape: [1, tgt_vocab_size]
            # Greedy decoding: select the token with the highest logit.
            next_token = torch.argmax(next_token_logits, dim=-1).item()

            generated_tokens.append(next_token)

            # Stop if the end-of-sequence token is generated.
            if next_token == eos_token:
                break

        # Build a reverse mapping from indices to tokens for the target vocabulary.
        rev_tgt_vocab = {idx: token for token, idx in tgt_vocab.items()}
        generated_token_list = [rev_tgt_vocab.get(idx, '<unk>') for idx in generated_tokens]

        # Remove the start token and tokens after the end token.
        if generated_token_list[0] == '<sos>':
            generated_token_list = generated_token_list[1:]
        if '<eos>' in generated_token_list:
            eos_index = generated_token_list.index('<eos>')
            generated_token_list = generated_token_list[:eos_index]

        # Join tokens into a string (adjust spacing/formatting as needed).
        generated_code = ' '.join(generated_token_list)

    return generated_code


code = "int a=2;int b=3;"
code_output = generate_code(model, code, src_vocab, tgt_vocab, max_length=100, device='cuda')
print(code_output)


= integer <unk> a = 2 , b = 3 as integer = 3 ; integer = 3 to 3 <unk> integer <unk> integer a = 2 to 3 as integer <unk> integer <unk> integer = 2 = 2 to 3 = 3 as integer <unk> integer <unk> integer a = 3 = 3 = 3 = 3 = 3 = 3 = 3 as integer <unk> integer <unk> integer <unk> integer <unk> integer array b = 2 to 3 to 3 = 2 to 3 as integer <unk> integer <unk> integer = 2 to 3 = 2 to 3


In [None]:
def load_model(model_path, src_vocab_size, tgt_vocab_size, embed_size=128, num_layers=2,
               num_heads=2, forward_expansion=4, dropout=0.1, max_len=100, device='cpu'):
    """
    Loads the Transformer model from a saved state dictionary.
    """
    model = Transformer(src_vocab_size, tgt_vocab_size, embed_size, num_layers,
                        num_heads, forward_expansion, dropout, max_len)
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    return model

def load_model_and_generate(model_path, pseudocode, src_vocab, tgt_vocab, device='cuda'):
    """
    Loads the model from the specified path and generates C++ code from pseudocode.
    """
    # Determine vocabulary sizes.
    src_vocab_size = len(src_vocab)
    tgt_vocab_size = len(tgt_vocab)
    # Load the model.
    model = load_model(model_path, src_vocab_size, tgt_vocab_size, device=device)
    # Generate code.
    generated_code = generate_code(model, pseudocode, src_vocab, tgt_vocab, max_length=100, device=device)
    return generated_code

In [None]:
model_path = '/content/drive/MyDrive/Pesudo_to_C++/model'
pseudocode = "n , nn, ans = integers with ans = 0"

# Generate code using the loaded model.
code_output = load_model_and_generate(model_path, pseudocode, src_vocab, tgt_vocab, device='cuda')
print(code_output)

  state_dict = torch.load(model_path, map_location=device)


= 0 , nn ; ans = 0 ; ans = 0 ; + + ) { ans = 0 ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; } ; }
