In [2]:
import torch
from transformers import  AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling, BertModel, BertTokenizer
import csv
import itertools
import random
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm.notebook import tqdm
from datasets import Dataset as FineTuneDataset

# use GPU if available  

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# extracted the real data samples from the csv file.
def read_csv(file):
    real_texts = []
    # Open the input file for reading
    with open(file, mode='r', newline='', encoding="utf8") as infile:
        # Create a CSV reader object to read from the input file
        csv_reader = csv.reader(infile)
        # Get the header row
        header = next(csv_reader)
        # Find the index of the "text" column
        text_column_index = header.index('text')
        # Use islice to read only the first 10000 rows
        for row in itertools.islice(csv_reader, 10000):
            # append the text 
            real_texts.append(row[text_column_index])

    return real_texts

# call function to extract texts
input_file = 'data/filtered_texts.csv'
real_texts = read_csv(input_file)

In [4]:
# specify model name
model_name = "gpt2" 
# Load the gpt-2 tokenizer
gen_tokenizer = AutoTokenizer.from_pretrained(model_name)
# Load the gpt-2 model
gen_model = AutoModelForCausalLM.from_pretrained(model_name)
# set pad token as eos_token
gen_tokenizer.pad_token = gen_tokenizer.eos_token

gen_model.to(device)

GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(50257, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D()
          (c_proj): Conv1D()
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D()
          (c_proj): Conv1D()
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=768, out_features=50257, bias=False)
)

1. Perspectives: 1st person, 3rd person
2. Context: social Media, article, news, scientifc paper, story
3. topic: politics, sport, research
4. Time (present, past, future)
5. Length

In [5]:
# preprocess the models generated output (it struggled a lot with spaces and special characters) 
def preprocess_string(sequence):
    sequence = sequence.replace("\n", " ")
    sequence = sequence.replace("The text on the internet said the following:", "")
    sequence = sequence.replace("\xa0", "")
    sequence = sequence.replace("'.'", "")
    sequence = sequence.replace("\\", "")
    sequence = sequence.replace("'", "")
    sequence = sequence.replace('"', "")
    sequence = sequence.lstrip(" ")
    sequence = sequence.lstrip(".")
    sequence = sequence.replace("  ", " ")
    sequence = sequence.replace("   ", " ")
    
    # Cut out everything after the last dot to avoid 
    sequence = sequence.rsplit('.',1)[0] +'.'

    return sequence

def generator_generate(model):
    # https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
    # According to OpenAi's estimation: 1 token corresponds to ~4 chars in english. The length of our dataset ranges from 118-299. A range from 80-200 tokens seems sufficient
    random_length = random.randint(60, 180)

    prompt = "The text on the internet said the following:"
    input_ids = gen_tokenizer(prompt, return_tensors="pt").input_ids
    # Generate tokens 
    gen_tokens = model.generate(
        input_ids,
        do_sample=True,
        # set temperature high for variability in generations
        temperature=0.8,
        max_length=random_length,
        pad_token_id=gen_tokenizer.eos_token_id
    )
    # decode tokens
    gen_text = gen_tokenizer.batch_decode(gen_tokens)[0]
    # preprocess text
    preprocessed_gen_text = preprocess_string(gen_text)
    return preprocessed_gen_text



In [6]:
# sample text generation
text_test = generator_generate(gen_model)
print(text_test)

The government wants you to remember the facts. The people who fought for freedom, for justice and for equality will not accept any of this. There is no question about it. This is a matter for the people of Kerala. These things should not be tolerated. You cannot expect the Government of Tamil Nadu, which has the right to decide the issue, to accept and uphold any of these things in India. The Government of Kerala is responsible for all things related to the freedom of thought and expression of individuals in the State without any right to be held responsible or to be punished for violating them. The text has been released as a press release provided by the Union Ministry of Information and Broadcasting. The original version of the text can be found here.


In [7]:
# define Dataset including real and fake samples
class FakeRealDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'input_ids': encoding['input_ids'].squeeze(),  
            'attention_mask': encoding['attention_mask'].squeeze(),  
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Example usage
max_length = 160  # Maximum length of input sequence

# Initialize tokenizer
disc_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    
# Add classification head
class BERTClassifier(torch.nn.Module):
    def __init__(self, bert_model, num_classes):
        super(BERTClassifier, self).__init__()
        self.bert = bert_model
        self.dropout = torch.nn.Dropout(0.2)
        # https://huggingface.co/transformers/v3.3.1/pretrained_models.html, hidden size of BERT base model is 768
        # add prediction head of size 768x2 on model
        self.fc = torch.nn.Linear(768, num_classes)  
        self.relu = torch.nn.ReLU()

        # Output layer for classification
        self.fc = torch.nn.Linear(768, num_classes)  

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # apply pooling for better regulization 
        pooled_output = outputs.pooler_output
        # add dropout for better regularization
        pooled_output = self.dropout(pooled_output)
        logits = self.fc(pooled_output)
        return logits
    
    def predict(text, bert_model, tokenizer):
        bert_model.eval()
        
        # Tokenize the input text
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
        input_ids = inputs['input_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)

        with torch.no_grad():
            logits = bert_model(input_ids=input_ids, attention_mask=attention_mask)
        
        predicted_class = torch.argmax(logits, dim=1).item()
        return predicted_class
    

def falsely_classified_as_real(predicted, labels, input_ids):
    # real text = 0 
    # fake text = 1
    misclassified_texts = []
    for i in range(len(predicted)):
        if predicted[i] == 0 and labels[i] == 1:
            current_text = disc_tokenizer.decode(input_ids[i], skip_special_tokens=True)
            misclassified_texts.append(current_text)
    return misclassified_texts
    
bert_model = BertModel.from_pretrained('bert-base-uncased')
# Binary classification 
num_classes = 2  
# initialise Discriminator
disc_model = BERTClassifier(bert_model, num_classes).to(device)

# Define loss function and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(disc_model.parameters(), lr=1e-3)

# # Freeze pre-trained weights to reduce inference time and avoid catastrophic forgetting
for param in bert_model.parameters():
    param.requires_grad = False


In [8]:
# label and combine fake and real texts
def generate_texts(gen_model, num_generated_samples):
    fake_texts = []
    # generate fake texts using generator
    for _ in tqdm(range(0, num_generated_samples)):
        fake_texts.append(generator_generate(gen_model))

    real_texts_random = random.sample(real_texts, num_generated_samples)

    # Assigning labels
    real_labels = [0] * len(real_texts_random)
    fake_labels = [1] * len(fake_texts)


    # Combine texts and labels
    combined_texts = real_texts_random + fake_texts
    combined_labels = real_labels + fake_labels

    combined_data = list(zip(combined_texts, combined_labels))
    # Shuffle the combined data
    random.shuffle(combined_data)

    gen_texts, gen_labels = zip(*combined_data) 
    gen_texts, gen_labels = list(gen_texts), list(gen_labels)

    return gen_texts, gen_labels

In [9]:
# train discriminator to classify texts
def classify_texts(gen_texts, gen_labels, num_epochs, index):
    # Create dataset instance
    dataset = FakeRealDataset(gen_texts, gen_labels, disc_tokenizer, max_length)

    # Define batch size
    batch_size = int(len(gen_texts)/4)

    # Split dataset into training and validation sets
    train_size = int(0.6 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    # Create DataLoaders for training and validation sets
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    with open('training_results.txt', 'a') as epoch_file:
        # iterate over epochs
        for epoch in tqdm(range(num_epochs)):
            total_correct = 0
            total_samples = 0
            disc_model.train()
            # iterate over batch
            for batch in train_dataloader:
                optimizer.zero_grad()
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                outputs = disc_model(input_ids, attention_mask)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # Compute predictions and accuracy
                _, predicted = torch.max(outputs, 1)
                total_correct += (predicted == labels).sum().item()
                total_samples += labels.size(0)
            
            print(f'Epoch {epoch + 1}, Loss: {loss.item()}')
            # Calculate accuracy for the epoch
            accuracy = total_correct / total_samples
            print(f'Epoch {epoch + 1}, Accuracy: {accuracy}')
            # save model performance in file
            epoch_file.write(f"Iteration {index + 1}, Epoch {epoch + 1}, Loss: {loss.item()}, Accuracy: {accuracy}\n")


    misclassified_examples = []
    total_correct = 0
    total_samples = 0

    # evaluate Discriminator  
    disc_model.eval()  # Set the model to evaluation mode
    # no weight update required
    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = disc_model(input_ids, attention_mask)

            # Compute predictions and accuracy
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            # save fake data that was classified as real for the fine tuning process
            misclassified_examples.append(falsely_classified_as_real(predicted, labels, input_ids))

    # Calculate accuracy
    test_accuracy = total_correct / total_samples
    print(f'Test Accuracy: {test_accuracy}')

    # save model performance in file
    with open('test_results.txt', 'a') as test_file:
        test_file.write(f"Iteration {index + 1}, Test Accuracy: {test_accuracy}\n")

    # Combined missclassified examples throughout epochs in list
    new_misclassified_examples = list(itertools.chain(*misclassified_examples))
    print(new_misclassified_examples)

    # save discriminator
    torch.save(disc_model.state_dict(), "./discriminators/bert_discriminator_" +  str(index+1) + ".pth")



    # filter out sequences that contain unnatural characters to prevent bad fine-tuning data
    def filter_text_elements(text_elements):
        # Define the characters to be filtered out
        filter_chars = {'/', '\\', '_', '[', ']'}

        # Filter the list
        filtered_elements = [element for element in text_elements if not any(char in element for char in filter_chars)]
        return filtered_elements

    # call filter function 
    filtered_misclassified_examples = filter_text_elements(new_misclassified_examples)

    # document missclassified examples
    with open('missclassified_examples.txt', 'a') as file:
        for iteration, sublist in enumerate(filtered_misclassified_examples):
            # Convert sublist to a string
            line = ' '.join(map(str, sublist))
            # Create the line with the iteration number
            line_to_write = f"{str(iteration+1)}: {line}"
            # Write the string to the file, followed by a newline character
            file.write(line_to_write + '\n')

   

    return filtered_misclassified_examples

In [10]:
# Function to predict class
def predict(text, model, tokenizer, device):
    model.eval()
    # Tokenize the input text
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=160).to(device)
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    with torch.no_grad():
        logits = model(input_ids=input_ids, attention_mask=attention_mask)

    predicted_class = torch.argmax(logits, dim=1).item()
    return predicted_class

In [11]:
# Tokenize function for fine tune data
def tokenize_function(examples):
    return gen_tokenizer(examples["text"], truncation=True, padding='longest')

# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",
    overwrite_output_dir=True,
    num_train_epochs=3,
    per_device_train_batch_size=4,
    save_steps=10_000,
    save_total_limit=2,
    prediction_loss_only=True,
    weight_decay=0.1
)

# Data collator for language modeling
data_collator = DataCollatorForLanguageModeling(
    tokenizer=gen_tokenizer,
    # For causal language modeling mlm should be set to False
    mlm=False,  
)

# define fine tuning function 
def gen_fine_tune(new_misclassified_examples, index):
    data_dict = {"text": new_misclassified_examples}
    dataset = FineTuneDataset.from_dict(data_dict)
    print(dataset)

    # tokenize dataset
    tokenized_dataset = dataset.map(tokenize_function, batched=True)
    
    # Initialize the Huggingface Trainer
    trainer = Trainer(
        model=gen_model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=tokenized_dataset,
    )   

    # train generator on fine-tuning data 
    trainer.train()

    # save new generator 
    trainer.save_model("./generators/generator_" + str(index+1))

    # load new_generator
    fine_tuned_model = AutoModelForCausalLM.from_pretrained("./generators/generator_" + str(index+1))
    fine_tuned_model.to(device)

    return fine_tuned_model

In [12]:
new_gen_model = gen_model
# iterate through the priorly defined steps
for index in tqdm(range(0, 10)): 
    # generate texts and labels with 800 samples 
    print("Step 1: GENERATOR: Text generation")
    generated_texts, generated_labels = generate_texts(new_gen_model, 50)
    print("Step 2: DISCRIMINATOR: Text classification")
    missclassified_examples = classify_texts(generated_texts, generated_labels, 3, index)
    print("Step 3: Fine Tuning generator")
    new_gen_model = gen_fine_tune(missclassified_examples, index)

  0%|          | 0/10 [00:00<?, ?it/s]

Step 1: GENERATOR: Text generation


  0%|          | 0/50 [00:00<?, ?it/s]

Step 2: DISCRIMINATOR: Text classification


  0%|          | 0/3 [00:00<?, ?it/s]

Epoch 1, Loss: 0.7755780220031738
Epoch 1, Accuracy: 0.36666666666666664
Epoch 2, Loss: 0.8118131756782532
Epoch 2, Accuracy: 0.55
Epoch 3, Loss: 0.9539410471916199
Epoch 3, Accuracy: 0.5666666666666667
Test Accuracy: 0.4
[]


RuntimeError: Parent directory ./discriminators_complex_new does not exist.

In [44]:
model = BertModel.from_pretrained('bert-base-uncased')
# Example usage
num_classes = 2  # fake or real
model = BERTClassifier(bert_model, num_classes).to(device)

model.load_state_dict(torch.load(f"./discriminators/bert_discriminator_{1}.pth"))

<All keys matched successfully>

In [None]:
# Function to predict class
def predict(text, model, tokenizer, device):
    model.eval()
    # Tokenize the input text
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=160).to(device)
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    with torch.no_grad():
        logits = model(input_ids=input_ids, attention_mask=attention_mask)

    predicted_class = torch.argmax(logits, dim=1).item()
    return predicted_class