In [None]:
import torch
import torch.nn as nn
import random

def generate_random_network():
    layers = []
    input_channels = random.randint(1, 3)
    input_size = random.randint(32, 224)
    output_size = random.randint(1, 10)
    num_layers = random.randint(2, 5)

    current_channels = input_channels
    current_size = input_size

    for i in range(num_layers):
        # Randomly choose layer type
        layer_type = random.choice(['linear', 'conv1d', 'conv2d'] if i < num_layers - 1 else ['linear'])

        if layer_type == 'linear':
            if i == 0:
                in_features = current_channels * current_size * current_size
            else:
                in_features = get_output_features(layers[-1], current_channels, current_size)
            out_features = random.randint(10, 200) if i < num_layers - 1 else output_size
            layers.append(nn.Linear(in_features, out_features))
            current_channels = 1
            current_size = out_features
        elif layer_type == 'conv1d':
            out_channels = random.randint(16, 64)
            layers.append(nn.Conv1d(current_channels, out_channels, kernel_size=3, padding=1))
            current_channels = out_channels
        elif layer_type == 'conv2d':
            out_channels = random.randint(16, 64)
            layers.append(nn.Conv2d(current_channels, out_channels, kernel_size=3, padding=1))
            current_channels = out_channels

        # Randomly add normalization
        if random.choice([True, False]):
            if layer_type == 'linear':
                layers.append(nn.BatchNorm1d(out_features))
            elif layer_type == 'conv1d':
                layers.append(nn.BatchNorm1d(out_channels))
            elif layer_type == 'conv2d':
                layers.append(nn.BatchNorm2d(out_channels))

        # Randomly add activation functions
        if random.choice([True, False]):
            layers.append(random.choice([nn.ReLU(), nn.LeakyReLU(), nn.Tanh(), nn.Sigmoid()]))

        # Randomly add dropout
        if random.choice([True, False]):
            layers.append(nn.Dropout(p=random.uniform(0.1, 0.5)))

    # Ensure the last layer is Linear and outputs the correct size
    if not isinstance(layers[-1], nn.Linear) or layers[-1].out_features != output_size:
        in_features = get_output_features(layers[-1], current_channels, current_size)
        layers.append(nn.Linear(in_features, output_size))

    return nn.Sequential(*layers)

def get_output_features(layer, current_channels, current_size):
    if isinstance(layer, nn.Linear):
        return layer.out_features
    elif isinstance(layer, (nn.Conv1d, nn.Conv2d)):
        return layer.out_channels
    elif isinstance(layer, (nn.BatchNorm1d, nn.BatchNorm2d)):
        return layer.num_features
    elif isinstance(layer, (nn.ReLU, nn.LeakyReLU, nn.Tanh, nn.Sigmoid, nn.Dropout)):
        return current_channels * current_size * current_size
    else:
        raise ValueError(f"Unsupported layer type: {type(layer)}")

# Test the function
try:
    model = generate_random_network()
    print(model)
except Exception as e:
    print(f"An error occurred: {e}")

In [None]:
def generate_description(model):
    layers = list(model.children())

    # Determine input shape
    first_layer = layers[0]
    if isinstance(first_layer, nn.Linear):
        input_shape = f"b,{first_layer.in_features}"
    elif isinstance(first_layer, nn.Conv1d):
        input_shape = f"b,{first_layer.in_channels},w"
    elif isinstance(first_layer, nn.Conv2d):
        input_shape = f"b,{first_layer.in_channels},h,w"
    else:
        input_shape = "unknown"

    # Determine output shape
    last_layer = next((layer for layer in reversed(layers) if isinstance(layer, nn.Linear)), None)
    if last_layer:
        output_shape = f"b,{last_layer.out_features}"
    else:
        output_shape = "unknown"

    description = f"This neural network takes an input of shape ({input_shape}) "
    description += f"and produces an output of shape ({output_shape}). "
    description += f"It consists of {len(layers)} layers, including "

    layer_descriptions = []
    for layer in layers:
        if isinstance(layer, nn.Linear):
            layer_descriptions.append(f"a Linear layer ({layer.in_features} -> {layer.out_features})")
        elif isinstance(layer, nn.Conv1d):
            layer_descriptions.append(f"a 1D Convolutional layer ({layer.in_channels} -> {layer.out_channels}, kernel_size={layer.kernel_size[0]})")
        elif isinstance(layer, nn.Conv2d):
            layer_descriptions.append(f"a 2D Convolutional layer ({layer.in_channels} -> {layer.out_channels}, kernel_size={layer.kernel_size})")
        elif isinstance(layer, nn.BatchNorm1d):
            layer_descriptions.append(f"a 1D Batch Normalization (num_features={layer.num_features})")
        elif isinstance(layer, nn.BatchNorm2d):
            layer_descriptions.append(f"a 2D Batch Normalization (num_features={layer.num_features})")
        elif isinstance(layer, nn.ReLU):
            layer_descriptions.append("a ReLU activation")
        elif isinstance(layer, nn.LeakyReLU):
            layer_descriptions.append(f"a Leaky ReLU activation (negative_slope={layer.negative_slope:.2f})")
        elif isinstance(layer, nn.Tanh):
            layer_descriptions.append("a Tanh activation")
        elif isinstance(layer, nn.Sigmoid):
            layer_descriptions.append("a Sigmoid activation")
        elif isinstance(layer, nn.Dropout):
            layer_descriptions.append(f"a Dropout layer (p={layer.p:.2f})")
        else:
            layer_descriptions.append(f"an unknown layer type: {type(layer).__name__}")

    description += ", ".join(layer_descriptions) + "."
    return description

# Example usage:
model = generate_random_network()
description = generate_description(model)
print(description)

In [None]:
list(model.children())

In [None]:
import gc
def generate_dataset(num_samples):
    dataset = []
    for _ in range(num_samples):
        model = generate_random_network()
        description = generate_description(model)
        dataset.append((str(model), description))
        # Explicitly delete variables and run garbage collection
        del model
        del description
        gc.collect()
    return dataset

# Generate samples dataset
# sample_df = generate_dataset(1000)

In [None]:
sample_df[0]

In [None]:
type(sample_df[0])

In [None]:
import pickle

def save_dataset(dataset, filename):
    with open(filename, 'wb') as f:
        pickle.dump(dataset, f)

save_dataset(sample_df, 'neural_network_dataset.pkl')

In [None]:
!ls -al

In [None]:
!pip install transformers

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

class TransformerSeq2Seq(nn.Module):
    def __init__(self, input_dim, output_dim, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerSeq2Seq, self).__init__()
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.src_tok_emb = nn.Embedding(input_dim, d_model)
        self.tgt_tok_emb = nn.Embedding(output_dim, d_model)
        self.positional_encoding = nn.Embedding(5000, d_model)

    def forward(self, src, tgt):
        src_seq_len, N = src.shape
        tgt_seq_len, N = tgt.shape

        src_positions = (
            torch.arange(0, src_seq_len).unsqueeze(1).expand(src_seq_len, N).to(src.device)
        )
        tgt_positions = (
            torch.arange(0, tgt_seq_len).unsqueeze(1).expand(tgt_seq_len, N).to(tgt.device)
        )

        embed_src = self.src_tok_emb(src) + self.positional_encoding(src_positions)
        embed_tgt = self.tgt_tok_emb(tgt) + self.positional_encoding(tgt_positions)

        transformer_out = self.transformer(embed_src, embed_tgt)
        out = self.fc_out(transformer_out)

        return out



In [None]:
!pip install transformers

In [None]:
from transformers import BertTokenizer

# Initialize the tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Example input and output
example_input = "Sequential((0): Linear(in_features=38809, out_features=26, bias=True) (1): LeakyReLU(negative_slope=0.01) (2): Linear(in_features=676, out_features=183, bias=True) (3): BatchNorm1d(183, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) (4): LeakyReLU(negative_slope=0.01) (5): Conv2d(1, 25, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (6): Dropout(p=0.11890518865721189, inplace=False) (7): Linear(in_features=837225, out_features=8, bias=True))"
example_output = "This neural network takes an input of shape (b,38809) and produces an output of shape (b,8). It consists of 8 layers, including a Linear layer (38809 -> 26), a Leaky ReLU activation (negative_slope=0.01), a Linear layer (676 -> 183), a 1D Batch Normalization (num_features=183), a Leaky ReLU activation (negative_slope=0.01), a 2D Convolutional layer (1 -> 25, kernel_size=(3, 3)), a Dropout layer (p=0.12), a Linear layer (837225 -> 8)."

# Tokenize the input and output
input_tokens = tokenizer.tokenize(example_input)
output_tokens = tokenizer.tokenize(example_output)

print("Input Tokens:", input_tokens)
print("Output Tokens:", output_tokens)


In [None]:
# Convert tokens to input IDs
input_ids = tokenizer.convert_tokens_to_ids(input_tokens)
output_ids = tokenizer.convert_tokens_to_ids(output_tokens)

print("Input IDs:", input_ids)
print("Output IDs:", output_ids)


In [None]:
type(sample_df[0])

In [None]:
import json
with open("./data/data_10k.json", "r") as f:
    data = json.load(f)

len(data)

In [None]:
from transformers import BertTokenizer
import torch
from torch.utils.data import Dataset, DataLoader

class SyntheticDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_text, output_text = self.data[idx]

        # Tokenize input and output texts
        input_encoding = self.tokenizer(input_text, return_tensors='pt', max_length=self.max_length, truncation=True, padding='max_length')
        output_encoding = self.tokenizer(output_text, return_tensors='pt', max_length=self.max_length, truncation=True, padding='max_length')

        # Extract input and output IDs
        input_ids = input_encoding['input_ids'].squeeze(0)  # Remove batch dimension
        output_ids = output_encoding['input_ids'].squeeze(0)  # Remove batch dimension

        return input_ids, output_ids

# Initialize the tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Example data
example_input = "Sequential((0): Linear(in_features=38809, out_features=26, bias=True) (1): LeakyReLU(negative_slope=0.01) (2): Linear(in_features=676, out_features=183, bias=True) (3): BatchNorm1d(183, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) (4): LeakyReLU(negative_slope=0.01) (5): Conv2d(1, 25, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (6): Dropout(p=0.11890518865721189, inplace=False) (7): Linear(in_features=837225, out_features=8, bias=True))"
example_output = "This neural network takes an input of shape (b,38809) and produces an output of shape (b,8). It consists of 8 layers, including a Linear layer (38809 -> 26), a Leaky ReLU activation (negative_slope=0.01), a Linear layer (676 -> 183), a 1D Batch Normalization (num_features=183), a Leaky ReLU activation (negative_slope=0.01), a 2D Convolutional layer (1 -> 25, kernel_size=(3, 3)), a Dropout layer (p=0.12), a Linear layer (837225 -> 8)."

# Create dataset
# data = [(example_input, example_output)]
# data = [(str(sample_df[-1][0]), sample_df[-1][1])]
# data = [(str(sample_df[i][0]), sample_df[i][1]) for i in range(len(sample_df))]
dataset = SyntheticDataset(data, tokenizer)

# Create DataLoader
dataloader = DataLoader(dataset, batch_size=24, shuffle=True)

# Print the first batch to verify
for batch in dataloader:
    src, tgt = batch
    print("Source Shape:", src.shape)
    print("Target Shape:", tgt.shape)
    break


In [None]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")  #for mac

model = TransformerSeq2Seq(len(tokenizer.vocab), len(tokenizer.vocab)).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()

def train(model, dataloader, optimizer, criterion, num_epochs=10):
    model.train()

    for epoch in range(num_epochs):
        epoch_loss = 0

        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)

            optimizer.zero_grad()

            output = model(src, tgt[:-1, :])
            output = output.reshape(-1, output.shape[2])
            tgt = tgt[1:, :].reshape(-1)

            loss = criterion(output, tgt)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        print(f'Epoch {epoch+1}, Loss: {epoch_loss / len(dataloader)}')

# Train the model
# train(model, dataloader, optimizer, criterion, num_epochs=20)


In [None]:
!pip install numpy

In [None]:
import os
# device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")  #for mac
def train_with_checkpointing(model, dataloader, optimizer, criterion, num_epochs=10, patience=3, checkpoint_path='checkpoint.pt'):
    model.train()
    best_loss = float('inf')
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        epoch_loss = 0

        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)

            optimizer.zero_grad()

            output = model(src, tgt[:-1, :])
            output = output.reshape(-1, output.shape[2])
            tgt = tgt[1:, :].reshape(-1)

            loss = criterion(output, tgt)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_epoch_loss = epoch_loss / len(dataloader)
        print(f'Epoch {epoch+1}, Loss: {avg_epoch_loss}')

        # Checkpointing
        if avg_epoch_loss < best_loss:
            best_loss = avg_epoch_loss
            torch.save(model.state_dict(), checkpoint_path)
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve == patience:
                print('Early stopping')
                break

# Train the model with checkpointing
train_with_checkpointing(model, dataloader, optimizer, criterion, num_epochs=20, patience=5)


In [None]:
!ls -al

In [None]:
!cp /content/checkpoint.pt /content/drive/MyDrive/ShodhAI/checkpoints

In [None]:
generate_dataset(1)

In [None]:
import numpy as np
def f1_score(y_true, y_pred):
    true_positives = np.sum((y_true == 1) & (y_pred == 1))
    false_positives = np.sum((y_true == 0) & (y_pred == 1))
    false_negatives = np.sum((y_true == 1) & (y_pred == 0))
    
    precision = true_positives / (true_positives + false_positives)
    recall = true_positives / (true_positives + false_negatives)
    
    f1 = 2 * (precision * recall) / (precision + recall)
    return f1

In [None]:
from transformers import BertTokenizer

import torch
from torch.utils.data import DataLoader
import numpy as np

def compute_f1_score(predictions, targets):
    # Tokenize predictions and targets
    pred_tokens = [pred.split() for pred in predictions]
    target_tokens = [tgt.split() for tgt in targets]
    
    # Flatten the lists
    pred_flat = [token for sent in pred_tokens for token in sent]
    target_flat = [token for sent in target_tokens for token in sent]
    
    # Get unique tokens
    unique_tokens = list(set(pred_flat + target_flat))
    
    # Create label encodings
    label_encoder = {token: i for i, token in enumerate(unique_tokens)}
    
    # Encode predictions and targets
    pred_encoded = [label_encoder[token] for token in pred_flat]
    target_encoded = [label_encoder[token] for token in target_flat]
    
    # Compute F1 score
    return f1_score(target_encoded, pred_encoded)

def test_model(model, tokenizer, test_data, batch_size=32):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Prepare DataLoader for testing
    test_dataset = SyntheticDataset(test_data, tokenizer)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size)
    
    # Set model to evaluation mode
    model.eval()  ## It will freeze the model weights
    srcs = []
    predictions = []
    targets = []
    
    with torch.no_grad():
        for src, tgt in test_dataloader:
            src, tgt = src.to(device), tgt.to(device)
            
            # Generate predictions
            output = model(src, tgt[:-1, :])  # Remove last token from target for prediction
            predicted_ids = torch.argmax(output, dim=-1)
            
            # Convert predicted IDs to tokens
            predicted_tokens = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)
            target_tokens = tokenizer.batch_decode(tgt, skip_special_tokens=True)
            
            # Store predictions and targets
            ##
            src_tokens = tokenizer.batch_decode(src, skip_special_tokens=True)
            srcs.append(src_tokens)
            predictions.extend(predicted_tokens)
            targets.extend(target_tokens)
    
    # Compute F1 score
    # f1 = compute_f1_score(predictions, targets)
    
    return srcs, predictions, targets #, f1

def main():
    # Initialize tokenizer
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    
    test_data = generate_dataset(10) # [(example source, example_traget)]
    
    # Initialize and load your TransformerSeq2Seq model
    model = TransformerSeq2Seq(input_dim=len(tokenizer.vocab), output_dim=len(tokenizer.vocab))
    model.load_state_dict(torch.load('/Users/sourav/Documents/myprojects/Shodh-AI-Assignment/model/checkpoint.pt'))  # Load your trained model checkpoint
    model.to(device)
    
    # Test the model
    predictions, f1_score = test_model(model, tokenizer, test_data)
    
    # Print predictions and F1 score
    for idx, prediction in enumerate(predictions):
        print(f"Example {idx + 1} Prediction:", prediction)
    
    print(f"F1 Score: {f1_score:.4f}")


In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    
test_data = generate_dataset(10) # [(example source, example_traget)]

# Initialize and load your TransformerSeq2Seq model
model = TransformerSeq2Seq(input_dim=len(tokenizer.vocab), output_dim=len(tokenizer.vocab))
model.load_state_dict(torch.load('/Users/sourav/Documents/myprojects/Shodh-AI-Assignment/model/checkpoint_10k.pt', map_location=torch.device('cpu')))  # Loading the trained model checkpoint

In [None]:
device

In [None]:
srcs, predictions, targets = test_model(model, tokenizer, test_data)

In [None]:
srcs

In [None]:
predictions

In [None]:
targets

In [None]:
# Print predictions and F1 score
for idx, prediction in enumerate(predictions):
    print(f"Example {idx + 1} Prediction:", prediction)

print(f"F1 Score: {f1_score:.4f}")

In [None]:
torch.backends.mps.is_available()

In [None]:
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")  #for mac

In [None]:
device

In [None]:
a = torch.randn(2, 3, 4, 5)
a.device

In [None]:
# Tokenize predictions and targets
pred_tokens = [pred.split() for pred in predictions]
target_tokens = [tgt.split() for tgt in targets]

In [None]:
len(pred_tokens)

In [None]:
# Flatten the lists
pred_flat = [token for sent in pred_tokens for token in sent]
target_flat = [token for sent in target_tokens for token in sent]

In [None]:
unique_tokens = list(set(pred_flat + target_flat))
unique_tokens

In [None]:
# Flatten the lists
pred_flat = [token for sent in pred_tokens for token in sent]
target_flat = [token for sent in target_tokens for token in sent]

# Get unique tokens
unique_tokens = list(set(pred_flat + target_flat))

# Create label encodings
label_encoder = {token: i for i, token in enumerate(unique_tokens)}

# Encode predictions and targets
pred_encoded = [label_encoder[token] for token in pred_flat]
target_encoded = [label_encoder[token] for token in target_flat]

# Compute F1 score
return f1_score(target_encoded, pred_encoded)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

def calculate_metrics(predictions, targets, tokenizer):
    # Flatten the list of lists
    pred_flat = [token for sent in predictions for token in sent]
    target_flat = [token for sent in targets for token in sent]
    
    # Convert tokens to IDs
    pred_ids = tokenizer.convert_tokens_to_ids(pred_flat)
    target_ids = tokenizer.convert_tokens_to_ids(target_flat)
    
    # Ensure we use the same set of unique tokens for both
    unique_tokens = list(set(pred_ids + target_ids))
    
    # Calculate precision, recall, and F1-score
    precision, recall, f1, _ = precision_recall_fscore_support(target_ids, pred_ids, labels=unique_tokens, average='weighted')
    accuracy = accuracy_score(target_ids, pred_ids)
    
    return precision, recall, f1, accuracy

# Example usage:
predictions = [["token1", "token2", "token3"], ["token4", "token5"]]
targets = [["token1", "token2", "token6"], ["token4", "token7"]]

# Initialize tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Calculate metrics
precision, recall, f1, accuracy = calculate_metrics(predictions, targets, tokenizer)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Accuracy: {accuracy:.4f}")
