<h1 align="center">Deep Learning Project</h1>
<h4 align="center">Dr. Fatemizadeh</h4>
<h4 align="center">Sharif University of Technology, Fall 2023</h4>
<h4 align="center">Amir Hossein Yari - Mohammad Taslimi - Mahdi Heidari</h4>
<h4 align="center">99102507 - 99101321 - 99100369</h4>

In [None]:
# Import Required Packages
import torch.nn as nn
import torch
import random
import numpy as np
from transformers import AutoModel, AutoTokenizer
import json
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from transformers import AutoConfig
import time
import torch.optim as optim
import torch.nn.functional as F
from google.colab import drive

### Question 3

In [None]:
# Mount Google Drive at the path the rest of the notebook reads from.
DRIVE_MOUNT_POINT = '/content/drive/'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
#------------------------------
#   Generator 1
#------------------------------
class Generator1(nn.Module):
    """Feed-forward generator that turns a noise vector into a fake representation.

    The network is, in order: ``Linear(noise_size, hidden_size)``,
    ``LeakyReLU(0.2)``, ``Dropout(dropout_rate)`` and
    ``Linear(hidden_size, output_size)``.

    Args:
        noise_size: Width of the input noise vector.
        output_size: Width of the generated representation.
        hidden_size: Width of the single hidden layer.
        dropout_rate: Dropout probability applied after the activation.
    """

    def __init__(self, noise_size=100, output_size=768, hidden_size=768, dropout_rate=0.1):
        super().__init__()

        # Hidden block followed by the output projection, stored under
        # ``self.layers`` at indices 0-3.
        self.layers = nn.Sequential(
            nn.Linear(noise_size, hidden_size),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, noise):
        """Map ``noise`` through the stacked layers and return the result."""
        return self.layers(noise)

In [None]:
#------------------------------
#   Generator 2
#------------------------------
class Generator2(nn.Module):
    """Generator that feeds a bag-of-words vector plus noise through an encoder.

    ``forward`` concatenates the input with freshly sampled Gaussian noise,
    passes the result to ``bert_model``, reads ``pooler_output`` from what the
    encoder returns and projects it to ``output_size`` features.

    Args:
        bert_model: Callable encoder whose return value exposes
            ``pooler_output``.
        output_size: Width of the generated representation.
        noise_size: Number of noise features appended to each input row.
    """

    def __init__(self, bert_model, output_size=768, noise_size=100):
        super(Generator2, self).__init__()
        self.bert_model = bert_model
        self.output_size = output_size
        self.noise_size = noise_size

        # The projection is created once and registered on the module so that
        # it is trained, moved by ``.to(device)`` and saved in ``state_dict``.
        # Building it inside ``forward`` would give a new, randomly initialised
        # and untrained layer on every call.
        hidden_size = getattr(getattr(bert_model, "config", None), "hidden_size", None)
        if isinstance(hidden_size, int):
            self.projection = nn.Linear(hidden_size, output_size)
        else:
            # Encoder does not advertise its width: infer the input size from
            # the first pooled output seen. Run one forward pass before
            # building an optimizer in that case.
            self.projection = nn.LazyLinear(output_size)

    def forward(self, bag_of_words):
        """Return a ``(batch, output_size)`` tensor generated from ``bag_of_words``."""
        # Sample the noise on the same device as the input so the
        # concatenation also works when the input lives on a GPU.
        if bag_of_words.is_floating_point():
            noise_dtype = bag_of_words.dtype
        else:
            noise_dtype = torch.get_default_dtype()
        noise = torch.randn(
            (bag_of_words.size(0), self.noise_size),
            device=bag_of_words.device,
            dtype=noise_dtype,
        )

        # Concatenate Bag of Words and Noise along the feature axis
        input_noise = torch.cat((bag_of_words, noise), dim=1)

        # NOTE(review): the encoder receives a float tensor as its first
        # positional argument. Hugging Face BERT models presumably expect
        # integer token ids there (or ``inputs_embeds``) -- confirm the
        # intended input before using this class with a real BERT model.
        bert_output = self.bert_model(input_noise)

        # Pooled representation returned by the encoder
        pooled_output = bert_output.pooler_output

        # Project to the desired output size with the registered layer
        return self.projection(pooled_output)

In [None]:
#------------------------------
#   The Discriminator
#------------------------------
class Discriminator(nn.Module):
    """Classifier over ``num_labels`` classes plus one extra fake/real class.

    Args:
        input_size: Width of each input representation.
        hidden_size: Width of the hidden feature layer.
        num_labels: Number of real classes; the output has ``num_labels + 1``
            entries, the extra one scoring whether the sample is fake/real.
        dropout_rate: Dropout probability used on the input and on the
            hidden features.
    """

    def __init__(self, input_size=768, hidden_size=768, num_labels=6, dropout_rate=0.1):
        super().__init__()

        # Dropout applied directly to the incoming representation
        self.input_dropout = nn.Dropout(p=dropout_rate)

        # Hidden block: Linear -> LeakyReLU -> Dropout
        hidden_block = [
            nn.Linear(input_size, hidden_size),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(dropout_rate),
        ]
        self.feature_layer = nn.Sequential(*hidden_block)

        # One logit per real class and one more for the fake/real decision
        num_outputs = num_labels + 1
        self.logit = nn.Linear(hidden_size, num_outputs)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, input):
        """Return ``(feature, logits, probs)`` for ``input``.

        ``feature`` is the hidden-block output, ``logits`` the raw class
        scores and ``probs`` the softmax of ``logits`` over the last axis.
        """
        feature = self.feature_layer(self.input_dropout(input))
        logits = self.logit(feature)
        return feature, logits, self.softmax(logits)

In [None]:
# Seed every random number generator the notebook uses
seed_val = 42

# Python, NumPy and PyTorch (CPU) generators, in that order
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(seed_val)

# CUDA generators are seeded only when a GPU is present
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed_val)

In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Name of the pre-trained checkpoint shared by the tokenizer and the model
model_name = "bert-base-cased"

# Tokenizer that belongs to the checkpoint above
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Pre-trained model weights for the same checkpoint
transformer = AutoModel.from_pretrained(model_name)

In [None]:
def parse_question_classification_file(input_file):
    """Read a JSON Lines file into a list of ``(text, label, model)`` tuples.

    Args:
        input_file: Path of a file holding one JSON object per line.

    Returns:
        list: One ``(text, label, model)`` tuple per non-blank line, in file
        order. A key missing from a record is returned as ``''``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    examples = []

    # Decode explicitly as UTF-8 instead of relying on the platform default,
    # which is not UTF-8 everywhere.
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            # Blank lines (for example a trailing empty line) carry no record
            # and would make json.loads fail.
            if not line.strip():
                continue

            data = json.loads(line)
            examples.append((
                data.get('text', ''),
                data.get('label', ''),
                data.get('model', ''),
            ))

    return examples

In [None]:
def generate_data_loader(examples, label_map, max_seq_length=64, batch_size=64, do_shuffle=False):
    """Tokenize ``examples`` and wrap them in a ``DataLoader``.

    Relies on the module-level ``tokenizer``.

    Args:
        examples: Iterable of ``(text, label, model)`` tuples. Only ``text``
            and ``model`` are used; the class id is looked up from ``model``.
        label_map: Mapping from model name to class id.
        max_seq_length: Every text is truncated or padded to this many ids.
        batch_size: Number of examples per batch.
        do_shuffle: Draw examples in random order when True, in the given
            order otherwise.

    Returns:
        DataLoader: Yields ``(input_ids, attention_mask, label_id)`` batches.

    Raises:
        KeyError: If an example's model name is not a key of ``label_map``.
    """
    # Padding positions are recognised by the tokenizer's own pad id; 0 is
    # used only when the tokenizer does not define one.
    pad_token_id = getattr(tokenizer, "pad_token_id", None)
    if pad_token_id is None:
        pad_token_id = 0

    input_ids = []
    label_id_array = []

    # Tokenization
    for (text, _label, model) in examples:
        # Fail here with a clear message rather than later inside
        # torch.tensor when a None label slips into the list.
        if model not in label_map:
            raise KeyError(
                f"Model name {model!r} is not in label_map (known: {sorted(label_map, key=str)})"
            )

        encoded_sent = tokenizer.encode(
            text,
            add_special_tokens=True,
            max_length=max_seq_length,
            padding="max_length",
            truncation=True,
        )
        input_ids.append(encoded_sent)
        label_id_array.append(label_map[model])

    # Convert to tensors
    input_ids = torch.tensor(input_ids, dtype=torch.long)
    label_id_array = torch.tensor(label_id_array)

    # Attention mask: 1 for real tokens, 0 for padding
    input_mask_array = (input_ids != pad_token_id).long()

    # Building the TensorDataset
    dataset = TensorDataset(input_ids, input_mask_array, label_id_array)

    # Choose sampler based on shuffle option
    sampler = RandomSampler(dataset) if do_shuffle else SequentialSampler(dataset)

    # Building the DataLoader
    return DataLoader(dataset, sampler=sampler, batch_size=batch_size)

# Class id assigned to each text source (the "model" field of a record)
label_map = {
    "human": 0,
    "chatGPT": 1,
    "cohere": 2,
    "davinci": 3,
    "bloomz": 4,
    "dolly": 5,
}

# Read the training and test records from Drive
train_examples = parse_question_classification_file(
    "/content/drive/MyDrive/Dataset/subtaskB_train.jsonl"
)
test_examples = parse_question_classification_file(
    "/content/drive/MyDrive/Dataset/subtaskB_dev.jsonl"
)

# Training batches are shuffled; test batches keep the file order
train_dataloader = generate_data_loader(
    examples=train_examples, label_map=label_map, do_shuffle=True
)
test_dataloader = generate_data_loader(
    examples=test_examples, label_map=label_map, do_shuffle=False
)

In [None]:
# Build the generator and the discriminator directly on the target device
generator1 = Generator1().to(device)
discriminator = Discriminator().to(device)

# The pre-trained transformer goes to the same device
transformer.to(device)

# Set to True to wrap the generator and discriminator in DataParallel
multi_gpu = False

if torch.cuda.is_available() and multi_gpu:
    generator1, discriminator = (
        torch.nn.DataParallel(generator1),
        torch.nn.DataParallel(discriminator),
    )

In [None]:
# Hyperparameters
learning_rate_discriminator = learning_rate_generator = 5e-5
epsilon = 1e-8            # small constant for use inside logarithms
num_train_epochs = 10
print_each_n_step = 100   # progress message interval, in batches

# Parameter groups: the discriminator optimizer also updates the transformer
transformer_vars = list(transformer.parameters())
d_vars = transformer_vars + list(discriminator.parameters())
g_vars = list(generator1.parameters())

# One AdamW optimizer per parameter group
dis_optimizer = torch.optim.AdamW(params=d_vars, lr=learning_rate_discriminator)
gen_optimizer = torch.optim.AdamW(params=g_vars, lr=learning_rate_generator)

In [None]:
# Training loop
#
# Each step encodes a batch of real texts with the transformer, generates an
# equally sized batch of fake representations, scores both with the
# discriminator and then updates
#   * the generator from g_loss only, and
#   * the discriminator + transformer from d_loss only.

# Parameters owned by the generator optimizer (constant across steps)
gen_params = [p for group in gen_optimizer.param_groups for p in group["params"]]

for epoch_i in range(num_train_epochs):
    print(f"\n======== Epoch {epoch_i + 1} / {num_train_epochs} ========")

    # Running sums of the per-batch losses for this epoch
    tr_g_loss = 0
    tr_d_loss = 0

    # Set models to training mode
    transformer.train()
    generator1.train()
    discriminator.train()

    # Iterate through batches in the training dataloader
    for step, batch in enumerate(train_dataloader):
        # Display progress every print_each_n_step batches
        if step % print_each_n_step == 0 and not step == 0:
            print(f"  Batch {step:>5,}  of  {len(train_dataloader):>5,}.")

        # Move batch tensors to device
        b_input_ids, b_input_mask, b_labels = [tensor.to(device) for tensor in batch]
        real_batch_size = b_input_ids.shape[0]

        # Forward pass through the transformer
        model_outputs = transformer(b_input_ids, attention_mask=b_input_mask)
        # NOTE(review): index -1 takes the last element of the transformer
        # output; presumably the pooled sentence representation -- confirm.
        hidden_states = model_outputs[-1]

        # Generate fake data from uniform noise in [0, 1); the width 100
        # matches Generator1's default noise_size
        noise = torch.zeros(real_batch_size, 100, device=device).uniform_(0, 1)
        gen_rep = generator1(noise)

        # Score real and fake representations in a single discriminator pass
        discriminator_input = torch.cat([hidden_states, gen_rep], dim=0)
        features, logits, probs = discriminator(discriminator_input)

        # First chunk is the real half, second chunk the fake half
        features_list = torch.split(features, real_batch_size)
        D_real_features, D_fake_features = features_list[0], features_list[1]

        logits_list = torch.split(logits, real_batch_size)
        D_real_logits, D_fake_logits = logits_list[0], logits_list[1]

        probs_list = torch.split(probs, real_batch_size)
        D_real_probs, D_fake_probs = probs_list[0], probs_list[1]

        # Generator's loss: make fake samples score low on the last (fake)
        # class, plus a feature-matching term between the mean real and mean
        # fake discriminator features
        g_loss_d = -1 * torch.mean(torch.log(1 - D_fake_probs[:, -1] + epsilon))
        g_feat_reg = torch.mean(torch.pow(torch.mean(D_real_features, dim=0) - torch.mean(D_fake_features, dim=0), 2))
        g_loss = g_loss_d + g_feat_reg

        # Discriminator's supervised loss: cross-entropy over the real
        # classes only (the last logit, the fake class, is dropped)
        logits = D_real_logits[:, 0:-1]
        log_probs = F.log_softmax(logits, dim=-1)
        label2one_hot = torch.nn.functional.one_hot(b_labels, len(label_map))
        per_example_loss = -torch.sum(label2one_hot * log_probs, dim=-1)
        labeled_example_count = per_example_loss.type(torch.float32).numel()

        D_L_Supervised = 0 if labeled_example_count == 0 else torch.div(torch.sum(per_example_loss.to(device)), labeled_example_count)
        # Unsupervised terms: real samples should score low on the fake
        # class, generated samples should score high on it
        D_L_unsupervised1U = -1 * torch.mean(torch.log(1 - D_real_probs[:, -1] + epsilon))
        D_L_unsupervised2U = -1 * torch.mean(torch.log(D_fake_probs[:, -1] + epsilon))
        d_loss = D_L_Supervised + D_L_unsupervised1U + D_L_unsupervised2U

        # Backward pass and optimization.
        #
        # Calling g_loss.backward() and then d_loss.backward() would add both
        # gradients into every parameter, so each optimizer would step on the
        # gradient of g_loss + d_loss and the two opposing fake-class terms
        # would work against each other. Instead, the generator gradient is
        # computed from g_loss alone without touching any .grad field ...
        gen_grads = torch.autograd.grad(g_loss, gen_params, retain_graph=True, allow_unused=True)

        # ... the discriminator and transformer receive d_loss only ...
        gen_optimizer.zero_grad()
        dis_optimizer.zero_grad()
        d_loss.backward()

        # ... and whatever d_loss wrote into the generator parameters is
        # replaced by the g_loss gradient before stepping.
        for gen_param, gen_grad in zip(gen_params, gen_grads):
            gen_param.grad = gen_grad

        gen_optimizer.step()
        dis_optimizer.step()

        # Update loss accumulators
        tr_g_loss += g_loss.item()
        tr_d_loss += d_loss.item()

    # Calculate average training losses
    avg_train_loss_g = tr_g_loss / len(train_dataloader)
    avg_train_loss_d = tr_d_loss / len(train_dataloader)

    # Print average training losses for the epoch
    print("\n  Average training loss generator: {:.3f}".format(avg_train_loss_g))
    print("  Average training loss discriminator: {:.3f}".format(avg_train_loss_d))


  Batch   100  of  1,110.
  Batch   200  of  1,110.
  Batch   300  of  1,110.
  Batch   400  of  1,110.
  Batch   500  of  1,110.
  Batch   600  of  1,110.
  Batch   700  of  1,110.
  Batch   800  of  1,110.
  Batch   900  of  1,110.
  Batch 1,000  of  1,110.
  Batch 1,100  of  1,110.

  Average training loss generator: 0.702
  Average training loss discriminator: 1.625

  Batch   100  of  1,110.
  Batch   200  of  1,110.
  Batch   300  of  1,110.
  Batch   400  of  1,110.
  Batch   500  of  1,110.
  Batch   600  of  1,110.
  Batch   700  of  1,110.
  Batch   800  of  1,110.
  Batch   900  of  1,110.
  Batch 1,000  of  1,110.
  Batch 1,100  of  1,110.

  Average training loss generator: 0.700
  Average training loss discriminator: 1.201

  Batch   100  of  1,110.
  Batch   200  of  1,110.
  Batch   300  of  1,110.
  Batch   400  of  1,110.
  Batch   500  of  1,110.
  Batch   600  of  1,110.
  Batch   700  of  1,110.
  Batch   800  of  1,110.
  Batch   900  of  1,110.
  Batch 1,000  of

KeyboardInterrupt: 

In [None]:
# Put every network into evaluation mode
for _net in (transformer, discriminator, generator1):
    _net.eval()

# Accumulators for the test pass
# (total_test_accuracy and nb_test_steps are initialised here but nothing
# below updates them)
total_test_accuracy = 0
total_test_loss = 0
nb_test_steps = 0

# Per-batch predictions and gold labels, joined after the loop
pred_batches = []
label_batches = []

# Cross-entropy over the real classes; targets equal to -1 are ignored
nll_loss = torch.nn.CrossEntropyLoss(ignore_index=-1)

for batch in test_dataloader:
    # Move the batch to the device (GPU or CPU)
    b_input_ids, b_input_mask, b_labels = (tensor.to(device) for tensor in batch)

    # Forward pass without gradient tracking
    with torch.no_grad():
        model_outputs = transformer(b_input_ids, attention_mask=b_input_mask)
        hidden_states = model_outputs[-1]
        _, logits, probs = discriminator(hidden_states)

        # Drop the last logit (the fake class) before scoring
        filtered_logits = logits[:, 0:-1]
        total_test_loss += nll_loss(filtered_logits, b_labels)

    # Predicted class = index of the largest remaining logit
    preds = torch.max(filtered_logits, 1).indices
    pred_batches.append(preds.detach().cpu())
    label_batches.append(b_labels.detach().cpu())

# Join the per-batch results into flat NumPy arrays
all_preds = torch.cat(pred_batches).numpy()
all_labels_ids = torch.cat(label_batches).numpy()

# Accuracy over all examples; loss averaged over batches
test_accuracy = np.sum(all_preds == all_labels_ids) / len(all_preds)
avg_test_loss = (total_test_loss / len(test_dataloader)).item()

# Print test evaluation results
print("\n  Accuracy on Test Set: {:.3f}".format(test_accuracy))
print("  Average Test Loss: {:.3f}".format(avg_test_loss))


  Accuracy on Test Set: 0.527
  Average Test Loss: 2.043
