In [1]:
# Reload edited project modules automatically before each cell executes
# (mode 2 = reload all imported modules, not just explicitly marked ones).
%reload_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
from transformers import (
    RobertaTokenizer, 
    RobertaForSequenceClassification, 
    RobertaConfig,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    AutoConfig,
    TrainingArguments, 
    Trainer, 
    EarlyStoppingCallback
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import evaluate
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

# Set random seed for reproducibility.
# The same SEED seeds PyTorch's global generator and NumPy's legacy global
# generator here, and is reused as `random_state` for the data splits in main().
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Notebook-wide configuration.
# MAX_LEN: length (in tokens) every joke is padded/truncated to by the tokenizer.
MAX_LEN = 512

# Rating dimensions the model regresses, in output-column order.
TARGET_COLUMNS = [
    'humor',
    'offensiveness',
    'clarity',
    'surprise_factor',
    'relatability',
    'novelty',
    'conciseness',
    'sentiment',
]
NUM_TARGETS = len(TARGET_COLUMNS)

# Hugging Face Hub checkpoint used for both the tokenizer and the model.
MODEL_CHECKPOINT = "distilbert/distilroberta-base"

In [4]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler

def load_data(file_path, nrows=None):
    """Load the labelled jokes and standardise every rating column.

    Each rating column is first min-max rescaled (0..100 for the seven
    non-sentiment ratings, -100..100 for ``sentiment``) and then standardised
    to zero mean / unit variance.

    Args:
        file_path: Path of the Parquet file to read.
        nrows: If truthy, keep only the first ``nrows`` rows. ``None`` (or 0)
            keeps the whole file.

    Returns:
        tuple: ``(df, standard_scaler)``. ``df`` contains the standardised
        rating columns; ``standard_scaler`` is the fitted ``StandardScaler``.
        Its ``inverse_transform`` maps values back to the min-max-rescaled
        ranges (column order: the seven non-sentiment ratings, then
        ``sentiment``), not to the raw values stored in the file.

    Raises:
        KeyError: If any expected rating column is missing from the file.
    """
    # Load dataset from a Parquet file
    df = pd.read_parquet(file_path)
    if nrows:
        # Copy the slice so the column assignments below write to an
        # independent frame rather than to a view of the loaded data.
        df = df.head(nrows).copy()

    # Non-sentiment ratings share the 0..100 range; sentiment is signed.
    non_sentiment_columns = [
        'humor', 'offensiveness', 'clarity',
        'surprise_factor', 'relatability', 'novelty',
        'conciseness'
    ]
    sentiment_column = ['sentiment']
    numerical_columns = non_sentiment_columns + sentiment_column

    # Fail early with a readable message instead of deep inside pandas.
    missing = [col for col in numerical_columns if col not in df.columns]
    if missing:
        raise KeyError(f"{file_path} is missing rating column(s): {missing}")

    # NOTE(review): every scaler below is fit on all loaded rows, and main()
    # splits into train/validation/test afterwards, so held-out rows influence
    # the scaling statistics. Confirm this is acceptable for the evaluation.

    # First, apply the MinMaxScaler to the non-sentiment columns with a range of 0 to 100.
    minmax_scaler_non_sent = MinMaxScaler(feature_range=(0, 100))
    df[non_sentiment_columns] = minmax_scaler_non_sent.fit_transform(df[non_sentiment_columns])

    # Then, apply the MinMaxScaler to the sentiment column with a range of -100 to 100.
    minmax_scaler_sent = MinMaxScaler(feature_range=(-100, 100))
    df[sentiment_column] = minmax_scaler_sent.fit_transform(df[sentiment_column])

    # Finally standardise (zero mean, unit variance) the min-max scaled columns.
    standard_scaler = StandardScaler()
    df[numerical_columns] = standard_scaler.fit_transform(df[numerical_columns])

    return df, standard_scaler

# Custom dataset class
class JokeDataset(Dataset):
    """Map-style dataset pairing each joke text with its rating vector.

    Args:
        jokes: Indexable sequence of joke texts; each item is passed through
            ``str`` before tokenisation.
        targets: Indexable collection whose items are NumPy arrays of ratings
            (one value per target).
        tokenizer: Callable Hugging Face-style tokenizer that returns a mapping
            with ``input_ids`` and ``attention_mask`` tensors.
        max_len: Length every example is padded or truncated to.
    """

    def __init__(self, jokes, targets, tokenizer, max_len):
        self.jokes = jokes
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of jokes."""
        return len(self.jokes)

    def __getitem__(self, idx):
        """Tokenise joke ``idx`` and return the tensors the Trainer consumes.

        Returns:
            dict: ``input_ids`` and ``attention_mask`` (flattened to 1-D) and
            ``labels`` (float32 tensor of the ratings for this joke).
        """
        joke = str(self.jokes[idx])
        targets = self.targets[idx].astype(np.float32)

        # Call the tokenizer directly (encode_plus is the deprecated spelling).
        # token_type_ids are not requested: nothing downstream uses them, and
        # predict_joke_ratings tokenises the same way.
        encoding = self.tokenizer(
            joke,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        # return_tensors='pt' yields a leading batch axis of size 1; flatten it away.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(targets, dtype=torch.float32)
        }

# Define a custom model class for multiple regression
class RobertaForMultipleRegression(RobertaForSequenceClassification):
    """RoBERTa sequence-classification model used as a multi-output regressor.

    The only difference from the parent class is that ``forward`` accepts
    ``token_type_ids`` for call compatibility but never passes it on.
    """

    def __init__(self, config):
        super().__init__(config)

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None,
                position_ids=None, head_mask=None, inputs_embeds=None, labels=None,
                output_attentions=None, output_hidden_states=None, return_dict=None):
        """Delegate to the parent forward pass, dropping ``token_type_ids``."""
        # Everything except token_type_ids is handed through unchanged.
        forwarded = dict(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            labels=labels,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        return super().forward(**forwarded)
    

# Define the compute_metrics function for Trainer
def compute_metrics(eval_pred):
    """Compute per-target and average mean squared error for the Trainer.

    Args:
        eval_pred: Pair ``(predictions, labels)``. Both must be 2-D arrays with
            one column per entry of ``TARGET_COLUMNS``. If ``predictions`` is a
            tuple (a model returning extra outputs), its first element is used.

    Returns:
        dict: ``"mse"`` (mean of the per-target MSEs) plus one
        ``"mse_<target>"`` entry per target column, all as Python floats.

    Raises:
        ValueError: If predictions and labels differ in shape, or do not have
            exactly one column per target.
    """
    predictions, labels = eval_pred
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # float64 keeps the squared-error accumulation accurate for float32 logits.
    predictions = np.asarray(predictions, dtype=np.float64)
    labels = np.asarray(labels, dtype=np.float64)

    # Guard against silent broadcasting between mismatched arrays.
    if (
        predictions.shape != labels.shape
        or predictions.ndim != 2
        or predictions.shape[1] != NUM_TARGETS
    ):
        raise ValueError(
            f"Expected predictions and labels of shape (n, {NUM_TARGETS}); "
            f"got {predictions.shape} and {labels.shape}"
        )

    # One vectorised pass: column-wise mean of squared errors.
    mse_per_target = np.mean((labels - predictions) ** 2, axis=0)

    # Create result dictionary with individual and average MSE
    results = {"mse": float(np.mean(mse_per_target))}
    for i, target in enumerate(TARGET_COLUMNS):
        results[f"mse_{target}"] = float(mse_per_target[i])

    return results

# Main function
def main(data_path, nrows):
    """Train and evaluate the multi-target joke-rating regressor end to end.

    Loads and scales the data via ``load_data``, splits it 80/10/10, fine-tunes
    ``MODEL_CHECKPOINT`` with early stopping on validation MSE, evaluates on the
    test split and saves the model to ``./joke_regression_model``.

    Args:
        data_path: Parquet file handed to ``load_data``.
        nrows: Row cap handed to ``load_data``.

    Returns:
        dict: ``"model"`` (the trained model), ``"test_results"`` (metrics from
        ``trainer.evaluate`` on the test split), ``"predictions"`` and
        ``"actual_values"`` (test-set outputs and labels, both in the
        standardised space produced by ``load_data``) and ``"scaler"`` (the
        fitted ``StandardScaler`` returned by ``load_data``, usable to
        inverse-transform predictions).
    """
    # Load data
    print(f"Loading data from {data_path}")
    df, scaler = load_data(data_path, nrows)

    # Split data into train, validation, and test sets (80%, 10%, 10%)
    train_df, temp_df = train_test_split(df, test_size=0.2, random_state=SEED)
    val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=SEED)

    print(f"Train set: {len(train_df)} samples")
    print(f"Validation set: {len(val_df)} samples")
    print(f"Test set: {len(test_df)} samples")

    # Initialize tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT)

    def build_dataset(frame):
        """Wrap one split in a JokeDataset with the shared tokenizer settings."""
        return JokeDataset(
            jokes=frame['joke'].values,
            targets=frame[TARGET_COLUMNS].values,
            tokenizer=tokenizer,
            max_len=MAX_LEN
        )

    # Create datasets
    train_dataset = build_dataset(train_df)
    val_dataset = build_dataset(val_df)
    test_dataset = build_dataset(test_df)

    # Configure the model for regression task
    config = AutoConfig.from_pretrained(MODEL_CHECKPOINT)
    config.num_labels = NUM_TARGETS
    config.problem_type = "regression"

    # Initialize model
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_CHECKPOINT,
        config=config
    )

    # Set up training arguments
    # NOTE(review): newer transformers releases name `evaluation_strategy`
    # `eval_strategy` - confirm against the installed version before upgrading.
    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=10,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=64,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="eval_mse",
        greater_is_better=False,
        save_total_limit=2,
        learning_rate=5.0e-5,
        seed=SEED  # keep the Trainer's own seeding tied to the notebook seed
        # fp16=True,  # Use mixed precision training if available
    )

    # Set up trainer with early stopping
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=5)]
    )

    # Train the model
    print("Starting training...")
    trainer.train()

    # Evaluate on test set
    print("Evaluating on test set...")
    test_results = trainer.evaluate(test_dataset)
    print("Test results:", test_results)

    # Save model
    print("Saving model...")
    trainer.save_model("./joke_regression_model")

    # Perform predictions on test set (for further analysis if needed)
    test_predictions = trainer.predict(test_dataset)
    predictions = test_predictions.predictions
    actual_values = test_predictions.label_ids

    # Return results; the scaler is included so callers can undo the
    # standardisation applied by load_data.
    return {
        "model": model,
        "test_results": test_results,
        "predictions": predictions,
        "actual_values": actual_values,
        "scaler": scaler
    }

def predict_joke_ratings(joke_text, model_path="./joke_regression_model", scaler=None):
    """
    Use the trained model to predict ratings for a new joke.

    The model is trained on the standardised targets produced by ``load_data``,
    so without ``scaler`` the returned numbers are in that standardised space,
    not on the rating scale.

    Args:
        joke_text (str): The text of the joke to rate
        model_path (str): Path to the saved model
        scaler: Optional fitted scaler with an ``inverse_transform`` method
            (e.g. the ``StandardScaler`` returned by ``load_data``). When given,
            predictions are mapped back through it before being returned.

    Returns:
        dict: Predicted value for each entry of ``TARGET_COLUMNS``, as floats
    """
    # The tokenizer comes from the base checkpoint; the fine-tuned weights and
    # config come from model_path.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT)
    config = AutoConfig.from_pretrained(model_path)
    model = RobertaForMultipleRegression.from_pretrained(model_path, config=config)
    model.eval()

    # Tokenize input (calling the tokenizer replaces the deprecated encode_plus)
    encoding = tokenizer(
        joke_text,
        add_special_tokens=True,
        max_length=MAX_LEN,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )

    # Perform prediction
    with torch.no_grad():
        outputs = model(
            input_ids=encoding['input_ids'],
            attention_mask=encoding['attention_mask'],
        )
        # Batch of one: take the single row of regression outputs.
        predictions = outputs.logits.cpu().numpy()[0]

    # Optionally undo the target scaling; the scaler expects a 2-D array.
    if scaler is not None:
        predictions = scaler.inverse_transform(predictions.reshape(1, -1))[0]

    # Format results
    return {target: float(predictions[i]) for i, target in enumerate(TARGET_COLUMNS)}


# Function to analyze model performance
def analyze_results(predictions, actual_values):
    """
    Summarise prediction quality per target dimension and overall.

    Args:
        predictions: 2-D array of model outputs, one column per target
        actual_values: 2-D array of ground-truth values, same layout

    Returns:
        dict: One ``{'mse', 'correlation'}`` entry per name in
        ``TARGET_COLUMNS`` plus an ``'overall'`` entry computed on the
        flattened arrays
    """
    def _score(pred, truth):
        # MSE first, then Pearson correlation from the 2x2 correlation matrix.
        return {
            'mse': mean_squared_error(truth, pred),
            'correlation': np.corrcoef(pred, truth)[0, 1]
        }

    # Column-by-column metrics, keyed by target name.
    report = {
        name: _score(predictions[:, col], actual_values[:, col])
        for col, name in enumerate(TARGET_COLUMNS)
    }

    # Pooled metrics across every target at once.
    report['overall'] = _score(predictions.flatten(), actual_values.flatten())

    return report

In [None]:
# Train on the first 10,000 rows; point data_path at your own Parquet file.
data_path = "../data/labeled_jokes.parquet"
results = main(data_path, nrows=10000)

# Report per-dimension metrics first, then the pooled "overall" line.
print("\nAnalyzing model performance:")
performance = analyze_results(results["predictions"], results["actual_values"])
for dimension, metrics in performance.items():
    if dimension == 'overall':
        continue
    print(f"{dimension}: MSE = {metrics['mse']:.4f}, Correlation = {metrics['correlation']:.4f}")
print(f"Overall: MSE = {performance['overall']['mse']:.4f}, Correlation = {performance['overall']['correlation']:.4f}")

# Score one unseen joke with the model that was just saved.
print("\nExample prediction:")
sample_joke = "Why don't scientists trust atoms? Because they make up everything!"
predictions = predict_joke_ratings(sample_joke)
for dimension, score in predictions.items():
    print(f"{dimension}: {score:.2f}")

Loading data from ../data/labeled_jokes.parquet
Train set: 8000 samples
Validation set: 1000 samples
Test set: 1000 samples


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at distilbert/distilroberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...


Epoch,Training Loss,Validation Loss


In [None]:
# Score another input with the saved model and print one line per dimension.
sample_joke = "Negga"
print("\nExample prediction:")
predictions = predict_joke_ratings(sample_joke)
for dimension, score in predictions.items():
    print(f"{dimension}: {score:.2f}")