<a href="https://colab.research.google.com/github/Rayyan-Portfolio/Gen_Ai/blob/main/testing_transformer_text_to_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:


import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import math
import time
from sklearn.model_selection import train_test_split
import os

# Set the environment variable to avoid fragmentation
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# Load the dataset (replace 'spoc-train.tsv' with your actual file path)
data = pd.read_csv('/content/spoc-train.tsv', sep='\t')

# Keep only 'text' and 'code' columns
data = data[['text', 'code']]

# Check for missing values and drop them if any
data = data.dropna()

# Split into train (80%) and temp (20%)
train_data, temp_data = train_test_split(data, test_size=0.2, random_state=42)

# Split temp into validation (10%) and test (10%)
val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

print(f"Training samples: {len(train_data)}")
print(f"Validation samples: {len(val_data)}")
print(f"Test samples: {len(test_data)}")

# Simple tokenizer (splits on spaces)
def tokenize(text):
    return text.split()

# Build vocabularies from training data
def build_vocab(data, tokenizer):
    tokens = set()
    for item in data:
        tokens.update(tokenizer(item))
    # Reserve 0 for padding, 1 for <sos>, 2 for <eos>
    vocab = {token: idx for idx, token in enumerate(tokens, start=3)}
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    return vocab

# Create vocabularies
pseudocode_vocab = build_vocab(train_data['text'], tokenize)
cpp_vocab = build_vocab(train_data['code'], tokenize)

# Inverse vocabularies for decoding (optional)
inv_cpp_vocab = {idx: token for token, idx in cpp_vocab.items()}

# Create Dataset class
class CodeDataset(Dataset):
    def __init__(self, data, pseudocode_vocab, cpp_vocab, tokenizer):
        self.data = data
        self.pseudocode_vocab = pseudocode_vocab
        self.cpp_vocab = cpp_vocab
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        pseudocode = self.data.iloc[idx]['text']
        cpp = self.data.iloc[idx]['code']
        # Add special tokens
        pseudo_tokens = ['<sos>'] + self.tokenizer(pseudocode) + ['<eos>']
        cpp_tokens = ['<sos>'] + self.tokenizer(cpp) + ['<eos>']
        # Convert to indices
        pseudo_indices = [self.pseudocode_vocab.get(token, 0) for token in pseudo_tokens]
        cpp_indices = [self.cpp_vocab.get(token, 0) for token in cpp_tokens]
        return torch.tensor(pseudo_indices), torch.tensor(cpp_indices)

# Padding function for batches
def collate_fn(batch):
    pseudocode, cpp = zip(*batch)
    pseudocode = torch.nn.utils.rnn.pad_sequence(pseudocode, padding_value=0, batch_first=True)
    cpp = torch.nn.utils.rnn.pad_sequence(cpp, padding_value=0, batch_first=True)
    return pseudocode, cpp

# Create DataLoaders
batch_size = 8  # Reduced batch size due to memory constraints
train_dataset = CodeDataset(train_data, pseudocode_vocab, cpp_vocab, tokenize)
val_dataset = CodeDataset(val_data, pseudocode_vocab, cpp_vocab, tokenize)
test_dataset = CodeDataset(test_data, pseudocode_vocab, cpp_vocab, tokenize)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)


# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: [1, max_len, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]


# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        d_k = Q.size(-1)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        return torch.matmul(attn, V), attn

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        output, attn = self.scaled_dot_product_attention(Q, K, V, mask)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.W_o(output)


# Feed-Forward Network
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.linear2(self.dropout(self.relu(self.linear1(x))))


# Encoder Layer
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        x = self.norm1(x + self.dropout(self.mha(x, x, x, mask)))
        x = self.norm2(x + self.dropout(self.ff(x)))
        return x


# Decoder Layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        x = self.norm1(x + self.dropout(self.mha1(x, x, x, tgt_mask)))
        x = self.norm2(x + self.dropout(self.mha2(x, enc_output, enc_output, src_mask)))
        x = self.norm3(x + self.dropout(self.ff(x)))
        return x


# Transformer Model
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=64, num_heads=2, num_layers=2, d_ff=256, dropout=0.1, max_len=512):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

    def create_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # Padding mask
        tgt_seq_len = tgt.size(1)
        nopeak_mask = torch.tril(torch.ones(tgt_seq_len, tgt_seq_len)).bool().to(tgt.device)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(2) & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.create_mask(src, tgt)
        src_embedded = self.dropout(self.pos_encoding(self.src_embedding(src) * math.sqrt(self.d_model)))
        tgt_embedded = self.dropout(self.pos_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        return self.fc_out(dec_output)

# Initialize the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(
    src_vocab_size=len(pseudocode_vocab),
    tgt_vocab_size=len(cpp_vocab),
    d_model=64,  # Further reduced hidden units for memory efficiency
    num_heads=2,  # Reduced number of attention heads
    num_layers=2,  # Reduced number of layers
    d_ff=256,  # Reduced feed-forward dimension
    dropout=0.1
).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding
optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# Check if the file exists before attempting to load
if os.path.exists("/content/transformer_seq2seq_testing0.1.pth"):
    model.load_state_dict(torch.load("/content/transformer_seq2seq_testing0.1.pth"))
    model.eval()
    print("Model loaded successfully.")
else:
    print("Error: Model file not found. Please check the file path.")


# Inverse vocabularies for decoding (for C++ code)
inv_code_vocab = {idx: token for token, idx in cpp_vocab.items()}

def generate_code(model, input_text, text_vocab, code_vocab, max_len=100):
    """
    Generate C++ code from pseudocode using the trained Transformer model.

    :param model: The trained Transformer model.
    :param input_text: The input pseudocode as a string.
    :param text_vocab: Vocabulary for pseudocode (text).
    :param code_vocab: Vocabulary for C++ code.
    :param max_len: Maximum length of the generated code.
    :return: The generated C++ code as a string.
    """
    model.eval()  # Set the model to evaluation mode

    # Tokenize the input text (pseudocode)
    input_tokens = ['<sos>'] + input_text.split() + ['<eos>']

    # Convert tokens to indices using the text vocabulary
    input_indices = [text_vocab.get(token, 0) for token in input_tokens]
    input_tensor = torch.tensor(input_indices).unsqueeze(0).to(device)  # Add batch dimension

    # Start decoding the output C++ code
    generated_code = []
    tgt_input = torch.tensor([text_vocab['<sos>']]).unsqueeze(0).to(device)  # Initial target token

    for _ in range(max_len):
        with torch.no_grad():
            # Get the model's prediction for the next token
            output = model(input_tensor, tgt_input)

            # Get the predicted token (with highest probability) for each position
            predicted_token_idx = output.argmax(dim=-1)[:, -1].item()  # Get the last token prediction

            # If we predict the <eos> token, stop generating
            if predicted_token_idx == code_vocab['<eos>']:
                break

            # Add the predicted token to the output sequence
            generated_code.append(inv_code_vocab.get(predicted_token_idx, '<unk>'))

            # Update the target input (append the predicted token for the next step)
            tgt_input = torch.cat([tgt_input, torch.tensor([[predicted_token_idx]]).to(device)], dim=-1)

    # Join the generated tokens into a single string
    generated_code_str = ' '.join(generated_code)
    return generated_code_str

# Example usage:
input_text = "for i from 1 to n do"
generated_code = generate_code(model, input_text, pseudocode_vocab, cpp_vocab, max_len=100)
print("Generated C++ code:", generated_code)


Training samples: 172980
Validation samples: 21622
Test samples: 21623


  model.load_state_dict(torch.load("/content/transformer_seq2seq_testing0.1.pth"))


Model loaded successfully.
Generated C++ code: ((a[1] chrisCounter a[A[i]]++; (is_p[i]) w(n); a[A[i]]++; (is_p[i]) list[j a[A[i]]++; inicio)


In [9]:
input_text = "read s"
generated_code = generate_code(model, input_text, pseudocode_vocab, cpp_vocab, max_len=100)
print("Generated C++ code:", generated_code)

Generated C++ code: ((a[1] resp++; (minmum res)


In [12]:
import gradio as gr

# Define the function that uses the generate_code logic
def generate_cpp_code_from_pseudocode(input_text):
    generated_code = generate_code(model, input_text, pseudocode_vocab, cpp_vocab, max_len=100)
    return generated_code

# Create a Gradio interface
interface = gr.Interface(
    fn=generate_cpp_code_from_pseudocode,          # Function to run
    inputs=gr.Textbox(lines=2, placeholder="Enter pseudocode here..."),  # Input text box for pseudocode
    outputs=gr.Textbox(label="Generated C++ Code"),  # Output text box for generated C++ code
    title="Pseudocode to C++ Code Generator",  # Title of the interface
    description="Enter pseudocode and get the corresponding C++ code generated using the Transformer model.",
    theme="compact"
)

# Launch the interface
interface.launch()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.

Sorry, we can't find the page you are looking for.


Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://405e6fcdd54ee9973a.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [11]:
!pip install gradio

Collecting gradio
  Downloading gradio-5.19.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.8-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.7.2 (from gradio)
  Downloading gradio_client-1.7.2-py3-none-any.whl.metadata (7.1 kB)
Collecting markupsafe~=2.0 (from gradio)
  Downloading MarkupSafe-2.1.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting ruff>=0.9.3 (from gradio)
  Downloading ruff-0.9.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.meta