In [None]:
#!/usr/bin/env python
# coding=utf-8
"""An optimized and streamlined implementation for sentiment analysis on the SST-5 dataset using DeBERTa variants.

This code uses Hugging Face transformers and TensorFlow to train and evaluate DeBERTa models for sentiment classification.
It includes data loading, preprocessing, model definitions, training, and evaluation with improved error handling.

Author: Jiacheng Zheng
Contact: karcenzheng@yeah.net
Date: 2025/03/03
Version: 1.2
"""

import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import matplotlib.pyplot as plt
import json
import os
from collections import defaultdict
import random
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, classification_report
)
from transformers import (
    AutoTokenizer,
    TFAutoModelForSequenceClassification,
    TFDebertaV2Model
)

# Reproducibility: feed one shared seed to the TensorFlow, NumPy and stdlib RNGs.
SEED = 42
for _seed_rng in (tf.random.set_seed, np.random.seed, random.seed):
    _seed_rng(SEED)
del _seed_rng  # helper name only; keep the module namespace unchanged

# Plot styling: select the Garamond font family for matplotlib text.
plt.rcParams.update({'font.family': 'Garamond'})

# =====================
# Data Loading Module
# =====================
class DataLoader:
    """Reads the SST-5 train/dev/test splits from JSON-lines files."""

    @staticmethod
    def load_sst5_data(data_dir="./SST5/"):
        """Load the three SST-5 splits from ``data_dir``.

        Each split is read from ``<data_dir>/<split>.jsonl`` (``train``, ``dev``,
        ``test``). Every non-blank line must be a JSON object with a ``text``
        and a ``label`` field. Blank lines (for example a trailing empty line)
        are skipped instead of aborting the load.

        Args:
            data_dir: Directory holding the ``.jsonl`` files. It is created
                when missing, although the call then still fails because the
                files are not there.

        Returns:
            A 6-tuple ``(train_texts, train_labels, val_texts, val_labels,
            test_texts, test_labels)`` of lists, where "val" is the ``dev`` split.

        Raises:
            FileNotFoundError: If any of the three split files is missing.
        """
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
            print(f"Data directory created: {data_dir}")

        file_paths = {
            split: os.path.join(data_dir, f'{split}.jsonl')
            for split in ('train', 'dev', 'test')
        }

        # Check all files up front so the caller sees a missing split before any parsing starts.
        for path in file_paths.values():
            if not os.path.exists(path):
                raise FileNotFoundError(f"File {path} not found. Please ensure SST-5 dataset files are in {data_dir}")

        datasets = {}
        for split, path in file_paths.items():
            texts, labels = [], []
            with open(path, 'r', encoding='utf-8') as f:
                for line in f:
                    if not line.strip():
                        # json.loads() rejects empty input; tolerate blank lines.
                        continue
                    record = json.loads(line)
                    texts.append(record['text'])
                    labels.append(record['label'])
            datasets[split] = (texts, labels)

        train_texts, train_labels = datasets['train']
        val_texts, val_labels = datasets['dev']
        test_texts, test_labels = datasets['test']

        print(f"Dataset sizes - Train: {len(train_texts)}, Val: {len(val_texts)}, Test: {len(test_texts)}")
        return train_texts, train_labels, val_texts, val_labels, test_texts, test_labels

# =====================
# Data Preprocessing Module
# =====================
class DataProcessor:
    """Tokenizes texts and wraps them, with their labels, in a batched ``tf.data.Dataset``."""

    def __init__(self, tokenizer, max_length=128):
        """Store the tokenizer and the fixed sequence length used for padding/truncation."""
        self.max_length = max_length
        self.tokenizer = tokenizer

    def create_tf_dataset(self, texts, labels, batch_size=16, shuffle=True):
        """Build a batched, prefetched dataset of ``(features, label)`` pairs.

        Args:
            texts: Input texts handed to the tokenizer in a single call.
            labels: Labels, converted to an ``int32`` tensor.
            batch_size: Number of examples per batch.
            shuffle: When true, shuffle with a buffer covering ``len(texts)``
                elements, seeded with the module-level ``SEED``.

        Returns:
            A ``tf.data.Dataset`` whose features are a dict with the keys
            ``input_ids`` and ``attention_mask``.
        """
        tokenized = self.tokenizer(
            texts,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='tf'
        )

        # Only these two tokenizer outputs are forwarded to the models.
        features = {key: tokenized[key] for key in ('input_ids', 'attention_mask')}
        targets = tf.convert_to_tensor(labels, dtype=tf.int32)
        pipeline = tf.data.Dataset.from_tensor_slices((features, targets))

        if shuffle:
            pipeline = pipeline.shuffle(buffer_size=len(texts), seed=SEED)
        return pipeline.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# =====================
# Model Definition Module
# =====================
class SentimentModel(Model):
    """Keras base class for the custom heads: stores ``num_labels`` and unpacks the feature dict."""

    def __init__(self, num_labels=5):
        super().__init__()
        self.num_labels = num_labels

    def prepare_input(self, inputs):
        """Return ``(input_ids, attention_mask)`` taken from the ``inputs`` mapping."""
        token_ids = inputs['input_ids']
        mask = inputs['attention_mask']
        return token_ids, mask

class TFDeBERTaForSentiment(Model):
    """Wrapper around the Hugging Face sequence-classification model that returns raw logits."""

    def __init__(self, model_name="microsoft/deberta-v3-base", num_labels=5):
        """Load ``model_name`` with a ``num_labels``-way classification head.

        ``from_pt=True`` is passed to ``from_pretrained``, i.e. the weights are
        requested from the PyTorch checkpoint.
        """
        super().__init__()
        self.deberta = TFAutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
            from_pt=True
        )
        self.num_labels = num_labels

    def call(self, inputs, training=False):
        """Return float32 logits for a batch.

        Args:
            inputs: Mapping with ``input_ids`` and ``attention_mask``.
            training: Forwarded to the wrapped model.

        Returns:
            The classification logits (no softmax applied; the loss is expected
            to be configured with ``from_logits=True``).
        """
        outputs = self.deberta(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            training=training
        )
        # Under a mixed_float16 policy the head emits float16; hand the loss float32
        # logits so the softmax/cross-entropy is computed at full precision.
        # This is a no-op when the model already runs in float32.
        return tf.cast(outputs.logits, tf.float32)

class TFDeBERTaLSTMModel(SentimentModel):
    """DeBERTa encoder followed by an LSTM over the token states and a linear classifier."""

    def __init__(self, model_name="microsoft/deberta-v3-base", num_labels=5, dropout_rate=0.1, lstm_units=768):
        """Build the encoder and the LSTM head.

        Args:
            model_name: Checkpoint passed to ``TFDebertaV2Model.from_pretrained``.
            num_labels: Size of the output layer.
            dropout_rate: Dropout applied to the LSTM output.
            lstm_units: Width of the LSTM (defaults to the previously hard-coded 768).
        """
        super().__init__(num_labels)
        self.deberta = TFDebertaV2Model.from_pretrained(model_name)
        self.lstm = layers.LSTM(lstm_units, return_sequences=False)
        self.dropout = layers.Dropout(dropout_rate)
        # Keep the logits in float32 even under a mixed_float16 policy so the
        # from_logits loss is numerically stable.
        self.classifier = layers.Dense(num_labels, dtype='float32')

    def call(self, inputs, training=False):
        """Return logits for a batch given as a dict with ``input_ids`` and ``attention_mask``."""
        input_ids, attention_mask = self.prepare_input(inputs)
        outputs = self.deberta(input_ids=input_ids, attention_mask=attention_mask, training=training)
        sequence_output = outputs[0]
        # Mask padded positions: with return_sequences=False the LSTM would otherwise
        # return the state reached after consuming all the padding tokens.
        padding_mask = tf.cast(attention_mask, tf.bool)
        lstm_output = self.lstm(sequence_output, mask=padding_mask, training=training)
        dropout_output = self.dropout(lstm_output, training=training)
        return self.classifier(dropout_output)

class TFDeBERTaFFNModel(SentimentModel):
    """DeBERTa encoder with a two-layer feed-forward head on the first-token state."""

    def __init__(self, model_name="microsoft/deberta-v3-base", num_labels=5, dropout_rate=0.1):
        """Build the encoder and the 512 -> 256 -> ``num_labels`` head.

        Args:
            model_name: Checkpoint passed to ``TFDebertaV2Model.from_pretrained``.
            num_labels: Size of the output layer.
            dropout_rate: Rate of the single dropout layer reused between the dense layers.
        """
        super().__init__(num_labels)
        self.deberta = TFDebertaV2Model.from_pretrained(model_name)
        self.dropout = layers.Dropout(dropout_rate)
        self.dense1 = layers.Dense(512, activation='relu')
        self.dense2 = layers.Dense(256, activation='relu')
        # Keep the logits in float32 even under a mixed_float16 policy so the
        # from_logits loss is numerically stable.
        self.classifier = layers.Dense(num_labels, dtype='float32')

    def call(self, inputs, training=False):
        """Return logits for a batch given as a dict with ``input_ids`` and ``attention_mask``."""
        input_ids, attention_mask = self.prepare_input(inputs)
        outputs = self.deberta(input_ids=input_ids, attention_mask=attention_mask, training=training)
        pooled_output = outputs[0][:, 0, :]  # CLS token
        x = self.dropout(pooled_output, training=training)
        x = self.dense1(x)
        x = self.dropout(x, training=training)
        x = self.dense2(x)
        x = self.dropout(x, training=training)
        return self.classifier(x)

class TFDeBERTaAdapterModel(SentimentModel):
    """DeBERTa encoder with a residual bottleneck block on the first-token state."""

    def __init__(self, model_name="microsoft/deberta-v3-base", num_labels=5, dropout_rate=0.1, bottleneck_dim=64):
        """Build the encoder, the down/up projections and the classifier.

        Args:
            model_name: Checkpoint passed to ``TFDebertaV2Model.from_pretrained``.
            num_labels: Size of the output layer.
            dropout_rate: Dropout applied before the classifier.
            bottleneck_dim: Width of the down-projection.
        """
        super().__init__(num_labels)
        self.deberta = TFDebertaV2Model.from_pretrained(model_name)
        self.hidden_size = self.deberta.config.hidden_size
        self.adapter_down = layers.Dense(bottleneck_dim, activation='relu')
        # Project back to the encoder width so the residual addition below is shape-compatible.
        self.adapter_up = layers.Dense(self.hidden_size)
        self.dropout = layers.Dropout(dropout_rate)
        # Keep the logits in float32 even under a mixed_float16 policy so the
        # from_logits loss is numerically stable.
        self.classifier = layers.Dense(num_labels, dtype='float32')

    def call(self, inputs, training=False):
        """Return logits for a batch given as a dict with ``input_ids`` and ``attention_mask``."""
        input_ids, attention_mask = self.prepare_input(inputs)
        outputs = self.deberta(input_ids=input_ids, attention_mask=attention_mask, training=training)
        pooled_output = outputs[0][:, 0, :]  # state at sequence position 0
        adapter_down = self.adapter_down(pooled_output)
        adapter_up = self.adapter_up(adapter_down)
        adapter_output = adapter_up + pooled_output  # residual connection
        adapter_output = self.dropout(adapter_output, training=training)
        return self.classifier(adapter_output)

# =====================
# Training and Evaluation Module
# =====================
class ModelTrainer:
    """Compiles, fits and evaluates one Keras classification model that outputs logits."""

    def __init__(self, model, learning_rate=2e-5, weight_decay=0.01):
        """Create the AdamW optimizer, the from-logits loss and the accuracy metric for ``model``."""
        self.model = model
        self.optimizer = tf.keras.optimizers.AdamW(learning_rate=learning_rate, weight_decay=weight_decay)
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        self.metrics = [tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy')]

    def compile_model(self):
        """Compile the wrapped model with this trainer's optimizer, loss and metrics."""
        self.model.compile(optimizer=self.optimizer, loss=self.loss_fn, metrics=self.metrics)

    def _ensure_built(self, dataset):
        """Run one batch through the model if it has not been built yet."""
        if getattr(self.model, 'built', False):
            return
        for batch_inputs, _ in dataset.take(1):
            self.model(batch_inputs, training=False)

    def train(self, train_dataset, val_dataset, epochs=3):
        """Fit the model and return the Keras ``History`` object.

        Training stops early when ``val_loss`` has not improved for 2 epochs
        (restoring the best weights), and the learning rate is halved after
        1 epoch without improvement.

        Args:
            train_dataset: Batched dataset of ``(features, labels)``.
            val_dataset: Batched validation dataset.
            epochs: Maximum number of epochs.
        """
        callbacks = [
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True),
            tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=1)
        ]
        self.compile_model()
        # summary() on a subclassed model that has never been called fails because
        # the model is not built; a single forward pass creates its variables first.
        self._ensure_built(train_dataset)
        self.model.summary()
        return self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=epochs,
            callbacks=callbacks,
            verbose=1
        )

    def evaluate(self, test_dataset):
        """Evaluate on ``test_dataset`` and return a dict of metrics.

        Returns:
            Dict with ``accuracy``, ``precision_macro``, ``recall_macro``,
            ``f1_macro``, ``classification_report`` (scikit-learn dict form) and
            ``loss`` (the value reported by ``model.evaluate``). Macro averages
            and the report cover every class id in ``range(num_labels)``, where
            ``num_labels`` is read from the model and defaults to 5.
        """
        self.compile_model()
        test_results = self.model.evaluate(test_dataset, verbose=1, return_dict=True)

        y_true, y_pred = [], []
        for batch_inputs, batch_labels in test_dataset:
            logits = self.model(batch_inputs, training=False)
            y_true.extend(batch_labels.numpy().tolist())
            y_pred.extend(np.argmax(logits, axis=1).tolist())

        # Fix the label set explicitly: without `labels`, classification_report raises
        # when a class is absent from both y_true and y_pred, because target_names
        # would no longer match the number of classes it discovers.
        num_labels = getattr(self.model, 'num_labels', 5)
        label_ids = list(range(num_labels))
        report = classification_report(
            y_true,
            y_pred,
            labels=label_ids,
            target_names=[f"Class {i}" for i in label_ids],
            output_dict=True,
            zero_division=0
        )
        return {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision_macro': precision_score(y_true, y_pred, labels=label_ids, average='macro', zero_division=0),
            'recall_macro': recall_score(y_true, y_pred, labels=label_ids, average='macro', zero_division=0),
            'f1_macro': f1_score(y_true, y_pred, labels=label_ids, average='macro', zero_division=0),
            'classification_report': report,
            'loss': test_results.get('loss')
        }

# =====================
# Main Execution Module
# =====================
def main():
    """Train and evaluate every DeBERTa variant on SST-5 and print a summary.

    Steps: configure GPUs, load the dataset, tokenize it into TensorFlow
    datasets, then for each model variant build it, freeze the encoder (for all
    variants except ``TFDeBERTaForSentiment``), train, evaluate and save the
    weights under ``./models/``. A failure in one variant is reported and the
    run continues with the next one.
    """
    import gc  # used only here, to release each model before the next one is built

    os.makedirs("./models/", exist_ok=True)

    print("Starting SST-5 sentiment analysis with DeBERTa...")

    # GPU configuration. Done before any tensor is created: memory growth can no
    # longer be changed once TensorFlow has initialized the devices.
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            print(f"\n{len(gpus)} GPU(s) configured")
        except RuntimeError as e:
            print(f"\nGPU config failed: {e}")
    else:
        print("\nNo GPUs detected. Running on CPU.")

    # Load data
    print("\nLoading dataset...")
    train_texts, train_labels, val_texts, val_labels, test_texts, test_labels = DataLoader.load_sst5_data()

    # Initialize tokenizer and processor
    checkpoint = "microsoft/deberta-v3-base"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    data_processor = DataProcessor(tokenizer, max_length=128)
    batch_size = 16

    # Create datasets
    print("\nCreating TensorFlow datasets...")
    train_dataset = data_processor.create_tf_dataset(train_texts, train_labels, batch_size=batch_size)
    val_dataset = data_processor.create_tf_dataset(val_texts, val_labels, batch_size=batch_size, shuffle=False)
    test_dataset = data_processor.create_tf_dataset(test_texts, test_labels, batch_size=batch_size, shuffle=False)

    # Model variants, stored as classes so that only one encoder is in memory at a time.
    model_factories = {
        "DeBERTaForSentiment": TFDeBERTaForSentiment,
        "DeBERTaLSTM": TFDeBERTaLSTMModel,
        "DeBERTaFFN": TFDeBERTaFFNModel,
        "DeBERTaAdapter": TFDeBERTaAdapterModel
    }

    # Train and evaluate
    models_results = {}
    for name, build_model in model_factories.items():
        print(f"\n{'='*50}\nTraining {name} model...\n{'='*50}")

        model = trainer = None
        try:
            model = build_model(model_name=checkpoint)

            if hasattr(model, 'deberta') and not isinstance(model, TFDeBERTaForSentiment):
                model.deberta.trainable = False
                print(f"Base DeBERTa frozen for {name}")

            trainer = ModelTrainer(model)
            trainer.train(train_dataset, val_dataset, epochs=3)
            print(f"\nEvaluating {name} model...")
            models_results[name] = trainer.evaluate(test_dataset)
            model.save_weights(f"./models/{name}_weights.h5")
            print(f"{name} model weights saved")
        except Exception as e:
            # Deliberate best-effort boundary: one failing variant must not abort the others.
            print(f"Error training {name}: {str(e)}")
            print("Skipping to next model")
        finally:
            # Drop the references so the finished model can be reclaimed.
            model = trainer = None
            gc.collect()

    # Print results
    if models_results:
        print("\nModel Performance Summary:")
        for name, results in models_results.items():
            print(f"\n{name}:")
            print(f"  Accuracy: {results['accuracy']:.4f}")
            print(f"  Precision: {results['precision_macro']:.4f}")
            print(f"  Recall: {results['recall_macro']:.4f}")
            print(f"  F1 Score: {results['f1_macro']:.4f}")

    print("\nAll done!")

if __name__ == "__main__":
    # Enable the mixed_float16 policy globally before any model is built, then run the pipeline.
    mixed_precision = tf.keras.mixed_precision
    policy = mixed_precision.Policy('mixed_float16')
    mixed_precision.set_global_policy(policy)
    main()

NameError: name 'history' is not defined

In [22]:
# Environment check: show which transformers release is installed.
import transformers
print(transformers.__version__)  # should print 4.x.x

4.49.0
