In [25]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the training split of the tweet emotion-intensity dataset.
# The CSV can be downloaded from
# https://huggingface.co/datasets/stepp1/tweet_emotion_intensity/tree/main
TRAIN_CSV_PATH = 'data/tweet_emotion_intensity/train.csv'
data = pd.read_csv(TRAIN_CSV_PATH)

#### Synonym replacement code

In [26]:
# Import necessary modules
import random # Random module for generating random numbers and selections
from nltk.corpus import wordnet # NLTK's WordNet corpus for finding synonyms

def synonym_replacement(word):
    """Return a WordNet synonym for ``word``, or ``word`` itself if none exists.

    A synset is picked at random from ``wordnet.synsets(word)`` and the name
    of its first lemma is used as the replacement.

    Args:
        word: The token to look up in WordNet.

    Returns:
        The replacement text. Multi-word lemmas are returned with spaces
        between their words. ``word`` is returned unchanged when WordNet has
        no synsets for it.
    """
    synsets = wordnet.synsets(word)
    if not synsets:
        return word

    lemma_name = random.choice(synsets).lemmas()[0].name()
    # WordNet joins the words of a multi-word lemma with underscores
    # (e.g. "ice_cream"); restore the spaces so no "_" leaks into the text.
    return lemma_name.replace('_', ' ')

def augment_text(text):
    """Randomly swap words of ``text`` for synonyms.

    The text is split on whitespace. Each word is replaced through
    ``synonym_replacement`` when its own ``random.random()`` draw exceeds 0.8
    (roughly a 20% chance); otherwise it is kept. The resulting words are
    joined back together with single spaces.
    """
    result = []
    for token in text.split():
        # One draw per word decides whether that word is swapped.
        if random.random() > 0.8:
            token = synonym_replacement(token)
        result.append(token)
    return ' '.join(result)

In [27]:
import re # Import the `re` module for working with regular expressions

def clean_text(text):
    """Normalize a raw tweet before tokenization.

    Steps, in order: lowercase, drop URLs, drop HTML tags, decode HTML
    entities, then strip every character that is neither a word character
    nor whitespace.

    Args:
        text: The raw tweet. Non-string values (such as ``None`` or the NaN
            pandas uses for a missing cell) are returned unchanged so the
            caller can handle them, e.g. with ``fillna``.

    Returns:
        The cleaned string, or the original value when ``text`` is not a
        string.
    """
    import html  # local import: only used here, to decode entities like "&amp;"

    if not isinstance(text, str):
        return text

    text = text.lower()  # Lowercase for uniformity
    text = re.sub(r'http\S+', '', text)  # Remove URLs
    text = re.sub(r'<.*?>', '', text)  # Remove HTML tags
    # Decode entities only after tag stripping, so an escaped "&lt;...&gt;"
    # is not turned into a tag and deleted together with the text inside it.
    # Without this step "&amp;" would survive as the junk token "amp".
    text = html.unescape(text)
    text = re.sub(r'[^\w\s]', '', text)  # Keep only word characters and whitespace
    return text

In [28]:
# The raw tweets live in the 'tweet' column of `data`.
# Missing tweets are replaced with a placeholder *before* cleaning, so that
# every value handed to `clean_text` is a string.
raw_tweets = data['tweet'].fillna('unknown')
data['cleaned_text'] = raw_tweets.apply(clean_text)

# Print the first 5 rows to verify the cleaning process
print(data.head())

# Print the count of missing values for each column
print(data.isnull().sum())

# Option 1: Remove rows with missing data in the 'cleaned_text' column
#data = data.dropna(subset=['cleaned_text']) # Drop rows where 'cleaned_text' is NaN (missing)

# Option 2: Fill missing values in 'cleaned_text' with a placeholder
# (kept as a safety net; the raw column was already filled above)
data['cleaned_text'] = data['cleaned_text'].fillna('unknown')

      id                                              tweet    class  \
0  40815  Loved @Bethenny independence msg on @WendyWill...     fear   
1  10128  @mark_slifer actually maybe we were supposed t...  sadness   
2  40476  I thought the nausea and headaches had passed ...     fear   
3  20813  Anger, resentment, and hatred are the destroye...    anger   
4  40796  new tires &amp; an alarm system on my car. fwm...     fear   

  sentiment_intensity class_intensity  labels  \
0                 low        fear_low       4   
1                high    sadness_high       9   
2              medium     fear_medium       5   
3                high      anger_high       0   
4                 low        fear_low       4   

                                        cleaned_text  
0  loved bethenny independence msg on wendywillia...  
1  mark_slifer actually maybe we were supposed to...  
2  i thought the nausea and headaches had passed ...  
3  anger resentment and hatred are the destroyer ...

In [29]:
# Seed Python's random module so the random synonym replacement produces the
# same augmented text every time this cell is run.
random.seed(42)

# Create a new column 'augmented_text' holding the augmented version of each
# entry of 'cleaned_text'.
data['augmented_text'] = data['cleaned_text'].apply(augment_text)

In [31]:
from datasets import Dataset
from transformers import AutoTokenizer

# BERT tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Build a Hugging Face Dataset from the augmented text and the labels.
# The text column is renamed to 'text' for clarity; the label column is
# already called 'labels', so it needs no rename.
model_input_frame = data[['augmented_text', 'labels']]
hf_dataset = Dataset.from_pandas(model_input_frame).rename_column("augmented_text", "text")

In [33]:
def tokenize_function(examples):
    """Tokenize the 'text' field of ``examples``, padded/truncated to 128 tokens."""
    encoding_options = {
        'padding': 'max_length',
        'truncation': True,
        'max_length': 128,
    }
    return tokenizer(examples['text'], **encoding_options)

# Tokenize the whole dataset in batches
tokenized_dataset = hf_dataset.map(tokenize_function, batched=True)


Map: 100%|██████████| 3960/3960 [00:00<00:00, 12912.28 examples/s]


In [36]:
from datasets import ClassLabel

# The 'labels' column is cast to a ClassLabel feature, which must be told how
# many classes exist; that number is the count of distinct label values.
distinct_labels = data['labels'].unique()
num_classes = len(distinct_labels)
class_label_feature = ClassLabel(num_classes=num_classes)

# Cast the labels column to the ClassLabel feature
tokenized_dataset = tokenized_dataset.cast_column("labels", class_label_feature)


Casting the dataset: 100%|██████████| 3960/3960 [00:00<00:00, 532353.97 examples/s]


In [37]:
# Show the dataset schema to confirm the label cast and the tokenizer columns
dataset_schema = tokenized_dataset.features
print(dataset_schema)

{'text': Value('string'), 'labels': ClassLabel(names=['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11']), 'input_ids': List(Value('int32')), 'token_type_ids': List(Value('int8')), 'attention_mask': List(Value('int8'))}


In [38]:
# Split into 80% training, 10% validation and 10% test, stratified by label:
# first hold out 20%, then cut that hold-out in half.
splits = tokenized_dataset.train_test_split(
    test_size=0.2, seed=42, stratify_by_column='labels'
)
val_test_split = splits['test'].train_test_split(
    test_size=0.5, seed=42, stratify_by_column='labels'
)

train_dataset, val_dataset, test_dataset = (
    splits['train'],
    val_test_split['train'],
    val_test_split['test'],
)

# Set the PyTorch format on each split, exposing only the needed columns
for split_dataset in (train_dataset, val_dataset, test_dataset):
    split_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

## PEFT (Parameter-Efficient Fine-Tuning)

In [39]:
# Load pre-trained BERT model
from transformers import BertForSequenceClassification

def _set_requires_grad(module, flag):
    """Set ``requires_grad`` to ``flag`` on every parameter of ``module``."""
    for weight in module.parameters():
        weight.requires_grad = flag


model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_classes)

# Step 1: freeze the whole BERT backbone. Parameters outside `model.bert`
# are not touched by this step.
_set_requires_grad(model.bert, False)

# Step 2: unfreeze the last two encoder layers so they are fine-tuned as well.
_set_requires_grad(model.bert.encoder.layer[-2:], True)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
from transformers import TrainingArguments, Trainer
import numpy as np
import evaluate

# Load the four evaluation metrics used by compute_metrics
accuracy, precision, recall, f1 = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
)

def compute_metrics(eval_pred):
    """Compute accuracy and weighted precision/recall/F1 for an evaluation pass.

    Args:
        eval_pred: A ``(logits, labels)`` pair; the predicted class is the
            argmax of the logits over the last axis.

    Returns:
        A dict with the keys "accuracy", "precision", "recall" and "f1".
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    shared = {"predictions": predictions, "references": labels}

    return {
        "accuracy": accuracy.compute(**shared)["accuracy"],
        "precision": precision.compute(average='weighted', **shared)["precision"],
        "recall": recall.compute(average='weighted', **shared)["recall"],
        "f1": f1.compute(average='weighted', **shared)["f1"],
    }

# Training configuration: evaluate and checkpoint once per epoch, and reload
# the checkpoint with the best accuracy when training ends.
training_config = dict(
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=50,
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    learning_rate=2e-5,
    weight_decay=0.01,
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    seed=42,
)
training_args = TrainingArguments(**training_config)

# Train on the training split and evaluate on the validation split
trainer = Trainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

trainer.train()

Downloading builder script: 4.20kB [00:00, 2.47MB/s]


Epoch,Training Loss,Validation Loss,Accuracy
1,2.4067,2.371119,0.169192




In [None]:
# Metrics on the evaluation dataset the trainer was built with
val_results = trainer.evaluate()
print(val_results)

# Metrics on the held-out test split
test_results = trainer.evaluate(eval_dataset=test_dataset)
print(test_results)

## Optimize PEFT for your task
PEFT can be further optimized for specific tasks by experimenting with different sets of parameters or layers to fine-tune. You can also try adjusting the learning rate or batch size to see how they impact the model’s performance.

### Optimization ideas
- Fine-tune additional layers (e.g., the last two to three layers instead of just the final classification head).

- Adjust hyperparameters such as learning rate and number of epochs to find the best configuration for your task.

In [None]:
# Example: experiment with a different learning rate and epoch count.
# Rebinding `training_args` does not change a Trainer that was already
# constructed; pass these arguments to a new Trainer to train with them.
tuned_hyperparameters = {
    'learning_rate': 5e-5,  # Experiment with different learning rates
    'num_train_epochs': 5,
    'per_device_train_batch_size': 16,
}
training_args = TrainingArguments(output_dir='./results', **tuned_hyperparameters)

In [None]:
# Use hyperparameter search to optimize fine-tuning.
# `Trainer.hyperparameter_search` needs a Trainer that was built with
# `model_init` (so a new model can be created for every trial) rather than
# with a fixed `model`; calling it on a `model=`-based Trainer raises an error.
def model_init():
    """Return a fresh BERT classifier with the backbone frozen except its last two encoder layers."""
    trial_model = BertForSequenceClassification.from_pretrained(
        'bert-base-uncased', num_labels=num_classes
    )
    for param in trial_model.bert.parameters():
        param.requires_grad = False
    for param in trial_model.bert.encoder.layer[-2:].parameters():
        param.requires_grad = True
    return trial_model

search_trainer = Trainer(
    model_init=model_init,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
)

# The search returns a summary of the best trial, not a trained model.
best_run = search_trainer.hyperparameter_search(
    direction="maximize",
    n_trials=10
)
print(best_run)

# Kept so code that refers to the original variable name still works.
best_model = best_run

## LoRA (Low-Rank Adaptation)

In [None]:
from peft import get_peft_model, LoraConfig, TaskType
from transformers import BertForSequenceClassification, AutoTokenizer

# Start from a fresh pre-trained BERT classifier
base_model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_classes)

# LoRA settings for sequence classification
lora_settings = dict(
    task_type=TaskType.SEQ_CLS,
    r=8,
    lora_alpha=32,
    lora_dropout=0.1,
    bias="none",
)
lora_config = LoraConfig(**lora_settings)

# Wrap the base model so the LoRA adapters are injected
model = get_peft_model(base_model, lora_config)


  from .autonotebook import tqdm as notebook_tqdm


ImportError: DLL load failed while importing lib: No se encontró el proceso especificado.

In [None]:
from transformers import TrainingArguments, Trainer
import numpy as np
import evaluate

# Load the evaluation metrics (same four as in the previous experiment)
accuracy, precision, recall, f1 = [
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
]

def compute_metrics(eval_pred):
    """Return accuracy plus weighted precision, recall and F1 for ``eval_pred``.

    ``eval_pred`` is unpacked as ``(logits, labels)``; predictions are the
    argmax of the logits over the last axis.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    scores = {
        "accuracy": accuracy.compute(predictions=predictions, references=labels)["accuracy"],
    }
    # The remaining metrics share the same call shape and weighted averaging.
    for key, metric in (("precision", precision), ("recall", recall), ("f1", f1)):
        outcome = metric.compute(predictions=predictions, references=labels, average='weighted')
        scores[key] = outcome[key]
    return scores

training_args = TrainingArguments(
    # Where checkpoints and logs are written
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=50,
    # Evaluate and save once per epoch, keep at most two checkpoints, and
    # reload the one with the best accuracy at the end
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    # Optimization settings
    learning_rate=2e-5,
    weight_decay=0.01,
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    seed=42,
)

# Train the LoRA-wrapped model on the training split, validating on val_dataset
trainer_options = {
    "model": model,
    "args": training_args,
    "train_dataset": train_dataset,
    "eval_dataset": val_dataset,
    "compute_metrics": compute_metrics,
}
trainer = Trainer(**trainer_options)

trainer.train()

ImportError: DLL load failed while importing lib: No se encontró el proceso especificado.

In [None]:
# Validation metrics for the LoRA run (uses the trainer's eval dataset)
val_results = trainer.evaluate()
print(val_results)

# Test metrics for the LoRA run
test_results = trainer.evaluate(eval_dataset=test_dataset)
print(test_results)

## QLoRA (Quantized Low-Rank Adaptation)

In [None]:
from transformers import BertForSequenceClassification, AutoTokenizer, BitsAndBytesConfig
from peft import get_peft_model, LoraConfig, TaskType

# 4-bit quantization settings: NF4 quantization type, double quantization,
# float16 compute dtype.
quantization_settings = dict(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype="float16",
)
bnb_config = BitsAndBytesConfig(**quantization_settings)

# Load the base model with the quantization config applied
quantized_model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=num_classes,
    quantization_config=bnb_config,
    device_map="auto",
)
# NOTE(review): PEFT's quantization guide recommends calling
# prepare_model_for_kbit_training on a quantized model before adding
# adapters; consider whether that step is wanted here.

# Adapters are attached to the modules whose names match "query" and "value".
# To list candidate module names, iterate over model.named_modules() and
# print the names containing "query", "key" or "value".
target_modules = ["query", "value"]

# LoRA settings for sequence classification
lora_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    target_modules=target_modules,
    r=8,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
)

model = get_peft_model(quantized_model, lora_config)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


bert.encoder.layer.0.attention.self.query
bert.encoder.layer.0.attention.self.key
bert.encoder.layer.0.attention.self.value
bert.encoder.layer.1.attention.self.query
bert.encoder.layer.1.attention.self.key
bert.encoder.layer.1.attention.self.value
bert.encoder.layer.2.attention.self.query
bert.encoder.layer.2.attention.self.key
bert.encoder.layer.2.attention.self.value
bert.encoder.layer.3.attention.self.query
bert.encoder.layer.3.attention.self.key
bert.encoder.layer.3.attention.self.value
bert.encoder.layer.4.attention.self.query
bert.encoder.layer.4.attention.self.key
bert.encoder.layer.4.attention.self.value
bert.encoder.layer.5.attention.self.query
bert.encoder.layer.5.attention.self.key
bert.encoder.layer.5.attention.self.value
bert.encoder.layer.6.attention.self.query
bert.encoder.layer.6.attention.self.key
bert.encoder.layer.6.attention.self.value
bert.encoder.layer.7.attention.self.query
bert.encoder.layer.7.attention.self.key
bert.encoder.layer.7.attention.self.value
bert.enc

In [None]:
from transformers import TrainingArguments, Trainer
import numpy as np
import evaluate

# Load the evaluation metrics used by compute_metrics
METRIC_NAMES = ("accuracy", "precision", "recall", "f1")
accuracy, precision, recall, f1 = map(evaluate.load, METRIC_NAMES)

def compute_metrics(eval_pred):
    """Score one evaluation pass.

    ``eval_pred`` is unpacked as ``(logits, labels)``. The predicted class is
    the argmax of the logits over the last axis. Accuracy is reported as is;
    precision, recall and F1 use weighted averaging.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    def _weighted(metric, key):
        # Run one weighted-average metric and pull out its scalar result.
        return metric.compute(predictions=predictions, references=labels, average='weighted')[key]

    acc_value = accuracy.compute(predictions=predictions, references=labels)["accuracy"]
    prec_value = _weighted(precision, "precision")
    rec_value = _weighted(recall, "recall")
    f1_value = _weighted(f1, "f1")

    return {
        "accuracy": acc_value,
        "precision": prec_value,
        "recall": rec_value,
        "f1": f1_value,
    }

# Per-epoch evaluation and checkpointing, with the best-accuracy checkpoint
# reloaded when training ends.
schedule_options = dict(
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
)
# Optimizer, batch-size and seed settings
optimization_options = dict(
    learning_rate=2e-5,
    weight_decay=0.01,
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    seed=42,
)
training_args = TrainingArguments(
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=50,
    **schedule_options,
    **optimization_options,
)

# Train the quantized LoRA model, validating on val_dataset
trainer = Trainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

trainer.train()

In [None]:
# Validation metrics for the QLoRA run (uses the trainer's eval dataset)
val_results = trainer.evaluate()
print(val_results)

# Test metrics for the QLoRA run
test_results = trainer.evaluate(eval_dataset=test_dataset)
print(test_results)