In [None]:
# Installs tensorflow into the notebook environment.
# NOTE(review): the visible cells import torch, transformers and nltk but never
# tensorflow — confirm this install is actually needed.
!pip install tensorflow

In [None]:
import zipfile
import os
import re
import random
import numpy as np
import pandas as pd
from nltk.stem import WordNetLemmatizer
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from torch.utils.data import DataLoader, Dataset
import shutil

# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): none of the paths used in the visible cells live under
# /content/drive — confirm the mount is still required.
from google.colab import drive
drive.mount('/content/drive')

# Extract the dataset archive into /content/data.
zip_path = '/content/humor-detection.zip'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall('/content/data')

# List every extracted file so the expected pickles can be checked by eye.
extracted_files = [
    os.path.join(dirname, filename)
    for dirname, _, filenames in os.walk('/content/data')
    for filename in filenames
]
for extracted_file in extracted_files:
    print(extracted_file)

# Load the train/test features and labels.
# NOTE: unpickling executes code embedded in the file; only load pickles from a
# trusted source.
train_x, train_y, test_x, test_y = (
    pd.read_pickle(pickle_path)
    for pickle_path in (
        "/content/data/X_train.pickle",
        "/content/data/y_train.pickle",
        "/content/data/X_test.pickle",
        "/content/data/y_test.pickle",
    )
)

# Split data into training and validation sets
def train_val_split(train_x, train_y, train_fraction=0.8, seed=None):
    """Randomly partition paired samples and labels into train and validation lists.

    Every position ``i`` is assigned independently: it goes to the training
    output when a uniform draw from ``[0, 1)`` is below ``train_fraction`` and
    to the validation output otherwise. The resulting sizes are therefore only
    approximately ``train_fraction`` / ``1 - train_fraction`` of the input.

    Args:
        train_x: Indexable collection of samples (accessed as ``train_x[i]``).
        train_y: Indexable collection of labels with the same length as
            ``train_x`` (accessed as ``train_y[i]``).
        train_fraction: Probability in ``[0, 1]`` that a sample is placed in
            the training output. Defaults to 0.8.
        seed: Optional seed. When given, a private ``random.Random(seed)`` is
            used, so the split is reproducible and the module-level random
            state is left untouched. When ``None``, draws come from the
            module-level ``random.random``.

    Returns:
        Tuple ``(out_train_x, out_train_y, out_val_x, out_val_y)`` of lists;
        samples and labels stay aligned and keep their input order.

    Raises:
        ValueError: If the inputs differ in length or ``train_fraction`` is
            outside ``[0, 1]``.
    """
    if len(train_x) != len(train_y):
        raise ValueError(
            f"train_x and train_y must have the same length, "
            f"got {len(train_x)} and {len(train_y)}"
        )
    if not 0.0 <= train_fraction <= 1.0:
        raise ValueError(f"train_fraction must be in [0, 1], got {train_fraction}")

    # A private generator keeps a seeded split from disturbing global RNG state.
    draw = random.random if seed is None else random.Random(seed).random

    out_train_x, out_train_y, out_val_x, out_val_y = [], [], [], []
    for i in range(len(train_x)):
        if draw() < train_fraction:
            out_train_x.append(train_x[i])
            out_train_y.append(train_y[i])
        else:
            out_val_x.append(train_x[i])
            out_val_y.append(train_y[i])
    return out_train_x, out_train_y, out_val_x, out_val_y

# Rebinds train_x / train_y to the training portion of the split; the unsplit
# data loaded from the pickles is no longer reachable under these names.
# NOTE: the call passes no seed and the `random` module is not seeded in the
# visible cells, so each run produces a different train/validation partition.
train_x, train_y, val_x, val_y = train_val_split(train_x, train_y)

In [None]:
# Load the tokenizer and a 2-label sequence-classification model from the same
# pretrained checkpoint.
checkpoint = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Download the WordNet corpus; presumably required by the WordNetLemmatizer
# used in lemmatize().
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /root/nltk_data...


True

In [None]:
def lemmatize(s):
    """Lemmatize every space-separated token of ``s`` as a verb.

    The string is split on single spaces, each piece is passed to
    ``WordNetLemmatizer.lemmatize`` with the POS tag ``'v'``, and the results
    are joined back together with single spaces.
    """
    lemmatizer = WordNetLemmatizer()
    tokens = s.split(" ")
    lemmas = (lemmatizer.lemmatize(token, 'v') for token in tokens)
    return " ".join(lemmas)

def lower(s):
    """Return ``s`` converted to lower case."""
    lowered = s.lower()
    return lowered

def clean(data):
    """Normalise every text in ``data`` and return the cleaned texts as a new list.

    Each item is lemmatized with ``lemmatize``, lower-cased with ``lower`` and
    stripped of digit runs, in that order. Strings are immutable, so the result
    of every step must be captured; calling the helpers without using their
    return values leaves the text unchanged.

    Args:
        data: Iterable of strings.

    Returns:
        list: The cleaned strings, in input order. ``data`` itself is not
        modified.
    """
    # NOTE(review): lemmatization runs before lower-casing, matching the
    # original statement order — confirm whether the lemmatizer should see
    # lower-cased text instead.
    cleaned = []
    for item in data:
        item = lemmatize(item)
        item = lower(item)
        item = re.sub(r'\d+', '', item)  # Remove numbers
        cleaned.append(item)
    return cleaned

def tokenize(text):
    """Encode ``text`` with the notebook-level ``tokenizer``.

    The tokenizer is called with padding and truncation enabled and
    ``return_tensors="pt"``; its return value is passed straight through.
    """
    tokenizer_options = dict(padding=True, truncation=True, return_tensors="pt")
    return tokenizer(text, **tokenizer_options)

def process(data):
    """Run ``clean`` over ``data`` and return the tokenized result."""
    return tokenize(clean(data))

# Clean and tokenize each split, processed in train, test, val order.
train_batch, test_batch, val_batch = (
    process(split) for split in (train_x, test_x, val_x)
)

In [None]:
# Create a custom dataset class
class HumorDataset(Dataset):
    """Dataset pairing tokenized inputs with their labels.

    Args:
        inputs: Mapping (anything exposing ``.items()``) from feature name to
            an indexable collection of per-sample values.
        labels: Indexable collection of labels; its length defines the length
            of the dataset.
    """

    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels

    def __len__(self):
        """Return the number of labels."""
        return len(self.labels)

    @staticmethod
    def _as_tensor(value):
        """Return ``value`` as a new tensor.

        Existing tensors are copied with ``clone().detach()``; passing a
        tensor to ``torch.tensor`` makes the same copy but emits a UserWarning
        for every item. Any other value is converted with ``torch.tensor``.
        """
        if isinstance(value, torch.Tensor):
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``.

        ``features`` is a dict with the same keys as ``inputs``, each holding
        that feature's value at ``idx`` as a tensor; ``label`` is the label at
        ``idx`` as a tensor.
        """
        features = {key: self._as_tensor(val[idx]) for key, val in self.inputs.items()}
        return features, self._as_tensor(self.labels[idx])

# Create Dataset and DataLoader objects (each built exactly once).
BATCH_SIZE = 32

train_dataset = HumorDataset(train_batch, train_y)
val_dataset = HumorDataset(val_batch, val_y)

# Only the training loader shuffles; the validation loader keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Define optimizer and loss function
# NOTE(review): the training cell assigns `optimizer` and `criterion` again
# (Adam, lr=1e-5) before the loop runs, so the objects created here are
# replaced before they are used — confirm which configuration is intended.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
criterion = torch.nn.CrossEntropyLoss()

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score
import torch

# Initialize lists to store metrics
batch_losses = []     # loss of every training batch, across all epochs
epoch_losses = []     # mean training loss per epoch
val_losses = []       # mean validation loss per epoch
val_precisions = []   # weighted validation precision per epoch


def _to_device(inputs, device):
    """Move a tensor, or every value of a dict of tensors, onto ``device``."""
    if isinstance(inputs, dict):  # Handle tokenized input format
        return {key: val.to(device) for key, val in inputs.items()}
    return inputs.to(device)


def _forward_logits(model, inputs):
    """Run ``model`` on ``inputs`` and return its logits.

    Dict inputs are expanded into keyword arguments; anything else is passed
    positionally. If the model output has a ``logits`` attribute that is
    returned, otherwise the raw output is returned unchanged.
    """
    outputs = model(**inputs) if isinstance(inputs, dict) else model(inputs)
    return outputs.logits if hasattr(outputs, "logits") else outputs


# Training loop with validation
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)  # Example optimizer
criterion = torch.nn.CrossEntropyLoss()  # Example loss function
epochs = 10

for epoch in range(epochs):
    model.train()
    total_loss = 0
    print(f"Epoch {epoch + 1}/{epochs}")

    # Training loop
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs = _to_device(inputs, device)
        labels = labels.to(device)

        # Forward pass, backward pass, and optimization step
        optimizer.zero_grad()
        loss = criterion(_forward_logits(model, inputs), labels)
        loss.backward()
        optimizer.step()

        # Store and log batch loss
        batch_loss = loss.item()
        batch_losses.append(batch_loss)
        total_loss += batch_loss
        if (batch_idx + 1) % 25 == 0:
            print(f"\tBatch {batch_idx + 1}/{len(train_loader)}, Loss: {batch_loss:.4f}")

    # Calculate and store epoch loss
    avg_epoch_loss = total_loss / len(train_loader)
    epoch_losses.append(avg_epoch_loss)

    # Validation loop
    model.eval()
    all_preds = []
    all_labels = []
    val_loss = 0
    with torch.no_grad():
        # The loop unpacks straight into val_inputs / val_labels so that the
        # tokenized `val_batch` created earlier in the notebook is not
        # overwritten by a loader batch.
        for val_inputs, val_labels in val_loader:
            val_inputs = _to_device(val_inputs, device)
            val_labels = val_labels.to(device)

            val_logits = _forward_logits(model, val_inputs)
            val_loss += criterion(val_logits, val_labels).item()

            preds = torch.argmax(val_logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(val_labels.cpu().numpy())

    # Calculate and store validation loss and precision
    avg_val_loss = val_loss / len(val_loader)
    val_losses.append(avg_val_loss)
    precision = precision_score(all_labels, all_preds, average='weighted')
    val_precisions.append(precision)

    print(
        f"Epoch {epoch + 1} Loss: {avg_epoch_loss:.4f}, "
        f"Validation Loss: {avg_val_loss:.4f}, "
        f"Validation Precision: {precision:.4f}"
    )

In [None]:
# Batch-loss curve: one point per training batch, saved to disk and displayed.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
loss_ax.plot(batch_losses, label='Batch Loss')
loss_ax.set_xlabel('Batch Number')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Batch Loss During Training')
loss_ax.legend()
loss_fig.savefig('batch_loss_plot.png')
plt.show()

# Validation-precision curve: one point per epoch, saved to disk and displayed.
precision_fig, precision_ax = plt.subplots(figsize=(10, 5))
precision_ax.plot(val_precisions, label='Validation Precision', color='orange')
precision_ax.set_xlabel('Epoch')
precision_ax.set_ylabel('Precision')
precision_ax.set_title('Validation Precision Over Epochs')
precision_ax.legend()
precision_fig.savefig('validation_precision_plot.png')
plt.show()

In [None]:
# Save the model and tokenizer side by side in one directory.
# NOTE(review): this absolute path is outside the /content tree used by the
# data-loading cell — confirm it exists in the environment the notebook runs in.
output_dir = "/teamspace/studios/this_studio/humor_model"
os.makedirs(output_dir, exist_ok=True)  # no error if the directory already exists
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

('/teamspace/studios/this_studio/humor_model/tokenizer_config.json',
 '/teamspace/studios/this_studio/humor_model/special_tokens_map.json',
 '/teamspace/studios/this_studio/humor_model/vocab.txt',
 '/teamspace/studios/this_studio/humor_model/added_tokens.json',
 '/teamspace/studios/this_studio/humor_model/tokenizer.json')

In [None]:
# Zip the saved model (no download step is performed in this cell).
# Archive the directory the model was written to (`output_dir`) rather than a
# "humor_model" path resolved against the current working directory, which
# only exists when the notebook happens to run from the parent of `output_dir`.
directory_to_zip = output_dir
output_zip_file = "humor_model.zip"
# make_archive appends ".zip" itself, so pass the name without its extension.
archive_base = os.path.splitext(output_zip_file)[0]
shutil.make_archive(archive_base, 'zip', directory_to_zip)
print(f"Zipped {directory_to_zip} into {output_zip_file}")


Zipped humor_model into humor_model.zip
