In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import T5ForConditionalGeneration, T5Tokenizer
from transformers import AdamW
from tqdm.notebook import tqdm
import warnings

warnings.filterwarnings("ignore")

# Check if CUDA is available and set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Using device: cuda


In [2]:
# Read the CSV file
df = pd.read_csv("normalization_assesment_dataset_10k.csv")
print("Dataset shape: ", df.shape)
df.head()

Dataset shape:  (10000, 2)


Unnamed: 0,raw_comp_writers_text,CLEAN_TEXT
0,Jordan Riley/Adam Argyle/Martin Brammer,Jordan Riley/Adam Argyle/Martin Brammer
1,Martin Hygård,Martin Hygård
2,Jesse Robinson/Greg Phillips/Kishaun Bailey/Ka...,Jesse Robinson/Greg Phillips/Kishaun Bailey/Ka...
3,Mendel Brikman,
4,Alvin Lee,Alvin Lee


In [3]:
def handle_na(df):
    # Calculate null values for each column
    null_counts = df.isnull().sum()
    # Calculate percentage of null values
    null_percentages = (null_counts / len(df)) * 100
    print("null_percentages", null_percentages)
    if all(null_percentages[column] for column in df.columns) and df.shape[0] >= 10000:
        new_df = df.dropna()
        print(new_df.shape)
        return new_df
    return df


df = handle_na(df)
df.head()

null_percentages raw_comp_writers_text     0.01
CLEAN_TEXT               13.41
dtype: float64
(8659, 2)


Unnamed: 0,raw_comp_writers_text,CLEAN_TEXT
0,Jordan Riley/Adam Argyle/Martin Brammer,Jordan Riley/Adam Argyle/Martin Brammer
1,Martin Hygård,Martin Hygård
2,Jesse Robinson/Greg Phillips/Kishaun Bailey/Ka...,Jesse Robinson/Greg Phillips/Kishaun Bailey/Ka...
4,Alvin Lee,Alvin Lee
5,Haddag Samir/MusicAlligator,Haddag Samir


In [4]:
# Split the data
X_train, X_test, Y_train, Y_test = train_test_split(
    df["raw_comp_writers_text"].values,
    df["CLEAN_TEXT"].values,
    test_size=0.2,
    random_state=42,
)

print(f"\nTraining samples: {len(X_train)}")
print(f"Validation samples: {len(X_test)}")


Training samples: 6927
Validation samples: 1732


In [5]:
class TextNormalizationDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        """
        Initialize the dataset for text normalization

        Args:
            texts (list): List of raw input texts
            labels (list): List of normalized (clean) texts
            tokenizer: T5 tokenizer instance
            max_length (int): Maximum sequence length
        """
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # Prepare input text with a task-specific prefix
        input_text = f"normalize: {self.texts[idx]}"

        # Tokenize input and target texts
        input_encoding = self.tokenizer(
            input_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        target_encoding = self.tokenizer(
            self.labels[idx],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        return {
            "input_ids": input_encoding["input_ids"].flatten(),
            "attention_mask": input_encoding["attention_mask"].flatten(),
            "labels": target_encoding["input_ids"].flatten(),
        }

In [6]:
# Initialize tokenizer
print("\nInitializing tokenizer...")
tokenizer = T5Tokenizer.from_pretrained("t5-small")  # legacy= False

# Create datasets
train_dataset = TextNormalizationDataset(X_train, Y_train, tokenizer,max_length=128)
val_dataset = TextNormalizationDataset(X_test, Y_test, tokenizer,max_length=128)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


Initializing tokenizer...


You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [7]:
max_tokens = max(len(tokenizer.encode(text)) for text in df['raw_comp_writers_text'])
print(f"Maximum tokens in dataset: {max_tokens}")

Maximum tokens in dataset: 171


In [8]:
class TextNormalizer:
    def __init__(
        self,
        model_name="t5-small",
        device=device,
    ):
        self.device = device
        print(f"Loading {model_name} model...")
        self.model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)
        print("Model and tokenizer loaded successfully")

    def train(self, train_loader, val_loader, epochs=3, lr=3e-5, patience=3):
        """
        Train the model with early stopping
        
        Args:
            train_loader: Training data loader
            val_loader: Validation data loader
            epochs (int): Maximum number of training epochs
            lr (float): Learning rate
            patience (int): Number of epochs to wait for improvement before stopping
        """
        optimizer = AdamW(self.model.parameters(), lr=lr)
        best_val_loss = float("inf")
        patience_counter = 0
        best_model_state = None

        for epoch in range(epochs):
            print(f"\nEpoch {epoch + 1}/{epochs}")

            # Training loop
            self.model.train()
            train_loss = 0
            progress_bar = tqdm(train_loader, desc="Training")

            for batch in progress_bar:
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)
                labels = batch["labels"].to(self.device)

                outputs = self.model(
                    input_ids=input_ids, 
                    attention_mask=attention_mask, 
                    labels=labels
                )

                loss = outputs.loss
                train_loss += loss.item()

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                progress_bar.set_postfix({"loss": loss.item()})

            # Validation loop
            val_loss = self.evaluate(val_loader)

            # Print epoch statistics
            avg_train_loss = train_loss / len(train_loader)
            print(f"Average training loss: {avg_train_loss:.4f}")
            print(f"Average validation loss: {val_loss:.4f}")

            # Early stopping logic
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                best_model_state = self.model.state_dict().copy()
                torch.save(best_model_state, "best_model2.pt")
                print("Saved best model checkpoint")
            else:
                patience_counter += 1
                print(f"Validation loss didn't improve. Patience: {patience_counter}/{patience}")

            # Check if we should stop training
            if patience_counter >= patience:
                print(f"\nEarly stopping triggered after epoch {epoch + 1}")
                print(f"Best validation loss: {best_val_loss:.4f}")
                # Restore best model
                self.model.load_state_dict(best_model_state)
                break

    def evaluate(self, val_loader):
        """
        Evaluate the model on validation data
        """
        self.model.eval()
        val_loss = 0

        with torch.no_grad():
            for batch in tqdm(val_loader, desc="Validation"):
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)
                labels = batch["labels"].to(self.device)

                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)

                val_loss += outputs.loss.item()

        return val_loss / len(val_loader)

    def normalize_text(self, text):
        """
        Normalize a single text input
        """
        self.model.eval()

        # Prepare input
        input_text = f"normalize: {text}"
        inputs = self.tokenizer(
            input_text,
            return_tensors="pt",
            max_length=128,
            padding=True,
            truncation=True,
        ).to(self.device)

        # Generate output
        with torch.no_grad():
            outputs = self.model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_length=128,
                num_beams=4,
                early_stopping=True,
            )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

In [None]:
# Initialize the model
normalizer = TextNormalizer()

In [None]:
normalizer.train(train_loader, val_loader, epochs=30,patience=2,lr=3e-5)

In [None]:
# Test the model with some examples
test_examples = [
    "Mike Hoyer/JERRY CHESNUT/SONY/ATV MUSIC PUBLISHING (UK) LIMITED",
    "<Unknown>/Wright, Justyce Kaseem",
    "Pixouu/Abdou Gambetta/Copyright Control",
]

print("\nTesting the model with examples:")
for text in test_examples:
    normalized = normalizer.normalize_text(text)
    print(f"\nInput: {text}")
    print(f"Output: {normalized}")

In [None]:
def calculate_accuracy(normalizer, val_loader):
    normalizer.model.eval()
    exact_matches = 0
    total = 0

    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Calculating metrics"):
            input_ids = batch["input_ids"].to(normalizer.device)
            attention_mask = batch["attention_mask"].to(normalizer.device)
            labels = batch["labels"]

            # Generate predictions
            outputs = normalizer.model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=128,
                num_beams=4,
                early_stopping=True,
                temperature=1.0,        
                top_k=20,            
                top_p=8.0,
                repetition_penalty=1.0  
            )

            # Decode predictions and labels
            predictions = [normalizer.tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
            true_labels = [normalizer.tokenizer.decode(label, skip_special_tokens=True) for label in labels]

            # Calculate metrics
            exact_matches += sum(1 for pred, true in zip(predictions, true_labels) if pred == true)
            total += len(predictions)

    accuracy = exact_matches / total
    print(f"\nExact match accuracy: {accuracy:.4f}")
    return accuracy


accuracy = calculate_accuracy(normalizer, val_loader)

In [9]:
def calculate_exact_matches(predictions, true_labels):
    """
    Calculate the percentage of predictions that exactly match their true labels
    """
    matches = sum(1 for pred, true in zip(predictions, true_labels) if pred.strip() == true.strip())
    return matches / len(predictions)

In [10]:
from sklearn.metrics import f1_score
from collections import Counter

def calculate_token_f1(predictions, true_labels):
    """
    Calculate F1 score based on shared tokens between prediction and true label
    """
    def tokenize(text):
        # Split on common delimiters and create a set of tokens
        return set(token.strip() for token in text.replace('/', ' ').split())
    
    all_true_tokens = []
    all_pred_tokens = []
    
    for pred, true in zip(predictions, true_labels):
        true_tokens = tokenize(true)
        pred_tokens = tokenize(pred)
        
        # Convert to binary presence/absence for each token
        all_tokens = true_tokens.union(pred_tokens)
        all_true_tokens.extend(1 if token in true_tokens else 0 for token in all_tokens)
        all_pred_tokens.extend(1 if token in pred_tokens else 0 for token in all_tokens)
    
    return f1_score(all_true_tokens, all_pred_tokens)

In [12]:
def calculate_order_score(predictions, true_labels):
    """
    Calculate how well the model preserves the correct order of names
    """
    def get_ordered_names(text):
        return [name.strip() for name in text.split('/')]
    
    correct_order = 0
    total_pairs = 0
    
    for pred, true in zip(predictions, true_labels):
        pred_names = get_ordered_names(pred)
        true_names = get_ordered_names(true)
        
        # Check relative ordering of each pair of names
        for i in range(len(true_names)):
            for j in range(i + 1, len(true_names)):
                if i < len(pred_names) and j < len(pred_names):
                    if (pred_names[i] in true_names and 
                        pred_names[j] in true_names):
                        if (true_names.index(pred_names[i]) < 
                            true_names.index(pred_names[j])):
                            correct_order += 1
                    total_pairs += 1
    
    return correct_order / total_pairs if total_pairs > 0 else 0

In [13]:
def evaluate_model(normalizer, val_loader):
    """
    Comprehensive evaluation of the text normalization model
    """
    normalizer.model.eval()
    all_predictions = []
    all_true_labels = []
    
    print("Generating predictions...")
    with torch.no_grad():
        for batch in tqdm(val_loader):
            # Generate predictions
            outputs = normalizer.model.generate(
                input_ids=batch['input_ids'].to(normalizer.device),
                attention_mask=batch['attention_mask'].to(normalizer.device),
                max_length=32,
                num_beams=4,
                early_stopping=True,
                temperature=1.0,        
                top_k=20,            
                top_p=8.0,
                repetition_penalty=1.0  
            )
            
            # Decode predictions and true labels
            predictions = [normalizer.tokenizer.decode(output, skip_special_tokens=True) 
                         for output in outputs]
            true_labels = [normalizer.tokenizer.decode(label, skip_special_tokens=True) 
                         for label in batch['labels']]
            
            all_predictions.extend(predictions)
            all_true_labels.extend(true_labels)
    
    # Calculate all metrics
    exact_accuracy = calculate_exact_matches(all_predictions, all_true_labels)
    token_f1 = calculate_token_f1(all_predictions, all_true_labels)
    order_score = calculate_order_score(all_predictions, all_true_labels)
    
    print("\nEvaluation Results:")
    print(f"Exact Match Accuracy: {exact_accuracy:.4f}")
    print(f"Token F1 Score: {token_f1:.4f}")
    print(f"Order Preservation Score: {order_score:.4f}")
    

* best_model.pt   lr 1e-5 max_length 32,  batch 64,   accuracy  0.7679 0.9339 0.7889 
* best_model1.pt  lr 1e-6 max_length 32,  batch 64,   accuracy  0.6697 0.8814 0.8814 
* best_model2.pt  lr 3e-5 max_length 128, batch 32,   accuracy

In [16]:
#normalizer = load_trained_model(model_path="best_model2.pt")
evaluate_model(normalizer, val_loader)

Loading t5-small model...
Model and tokenizer loaded successfully
Model loaded successfully!
Generating predictions...


  0%|          | 0/55 [00:00<?, ?it/s]


Evaluation Results:
Exact Match Accuracy: 0.7939
Token F1 Score: 0.9249
Order Preservation Score: 0.7639


In [15]:
def load_trained_model(model_path="best_model.pt", device=device):

    # Initialize a new model instance
    normalizer = TextNormalizer(device=device)
    
    # Load the saved state dictionary
    state_dict = torch.load(model_path, map_location=device)
    normalizer.model.load_state_dict(state_dict)
    
    # Set model to evaluation mode
    normalizer.model.eval()
    print("Model loaded successfully!")
    
    return normalizer

In [None]:
# Load the trained model
normalizer = load_trained_model(model_path="best_model2.pt")

# Test single examples
test_texts = [
    "Mike Hoyer/JERRY CHESNUT/SONY/ATV MUSIC PUBLISHING",
    "<Unknown>/Wright, Justyce Kaseem",
    "Pixouu/Abdou Gambetta/Copyright Control"
]

print("Testing individual examples:")
for text in test_texts:
    normalized = normalizer.normalize_text(text)
    print(f"\nInput: {text}")
    print(f"Normalized: {normalized}")

Loading t5-small model...
Model and tokenizer loaded successfully
Model loaded successfully!
Testing individual examples:

Input: Mike Hoyer/JERRY CHESNUT/SONY/ATV MUSIC PUBLISHING
Normalized: Mike Hoyer/JERRY CHESNUT/SONY

Input: <Unknown>/Wright, Justyce Kaseem
Normalized: Wright/Justyce Kaseem

Input: Pixouu/Abdou Gambetta/Copyright Control
Normalized: Pixouu/Abdou Gambetta
