<a href="https://colab.research.google.com/github/HadarMiriamIsaacson/BS-SE-24-207/blob/main/Unit_testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install datasets



In [2]:
import os
import numpy as np
import pandas as pd
import unittest
from datasets import Dataset
from sklearn.metrics import f1_score, accuracy_score
import nltk
from nltk.corpus import stopwords
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification, PreTrainedModel, Trainer, TrainingArguments, DataCollatorForTokenClassification
from sklearn.model_selection import train_test_split

In [3]:
!pip install scikit-learn



In [4]:
from sklearn.metrics import cohen_kappa_score

In [5]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
# Define the file path as a global constant
# Excel workbook read by the data tests; it lives under the /content/drive mount point.
FILE_PATH = '/content/drive/MyDrive/פרויקט גמר ניסן והדר/new/data_holy.xlsx'

# Other constants for the model saving/loading
# Directory that test_model_loading_from_gdrive loads the saved model and tokenizer from.
DRIVE_SAVE_DIRECTORY = '/content/drive/MyDrive/fine-tuned-keyword-extraction-230824-test2'
# Checkpoint name passed to from_pretrained for both the tokenizer and the model.
MODEL_CHECKPOINT = "distilroberta-base"

# Initialize tokenizer and model
# `tokenizer` is read as a module-level global by test_tokenization and
# tokenize_and_align_labels, so this cell must run before either of them is called.
tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT, add_prefix_space=True)
# Two output labels, matching the NUM_LABELS constant defined further down the notebook.
model = AutoModelForTokenClassification.from_pretrained(MODEL_CHECKPOINT, num_labels=2)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of RobertaForTokenClassification were not initialized from the model checkpoint at distilroberta-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Test: Data Loading

In [7]:
def test_data_loading():
    """Check that the Excel file at FILE_PATH loads into a usable DataFrame.

    The test passes when the file can be read, the resulting frame is not
    empty, and it contains a 'job_description' column. The outcome is printed
    (PASSED / FAILED) rather than raised, like the other tests in this notebook.
    """
    try:
        data = pd.read_excel(FILE_PATH)
        assert not data.empty, "Data should not be empty"
        # Update the column name to match your dataset
        assert 'job_description' in data.columns, "Expected 'job_description' in the data"
        print("test_data_loading: PASSED")
    except AssertionError as e:
        print(f"test_data_loading: FAILED ({e})")
    except Exception as e:
        # A missing or unreadable file is exactly what a loading test should
        # detect, so report it as a failure instead of letting it escape.
        print(f"test_data_loading: FAILED ({type(e).__name__}: {e})")

# Run the test
test_data_loading()


test_data_loading: PASSED


In [8]:
data = pd.read_excel(FILE_PATH)

In [9]:
data.head()

Unnamed: 0,job_num,job_description,word,hadar,nisan,judge,final,chatgpt,chatgpt2,KeyBert
0,0,The chosen Sr. Software Developer will be part...,chosen,0,0,,0,0,,0
1,0,The chosen Sr. Software Developer will be part...,sr,0,0,,0,0,,0
2,0,The chosen Sr. Software Developer will be part...,part,0,0,,0,0,,0
3,0,The chosen Sr. Software Developer will be part...,larger,0,0,,0,0,,0
4,0,The chosen Sr. Software Developer will be part...,team,0,0,,0,0,,0


Test: Column Lengths and Nulls

In [10]:
def test_column_lengths_and_nulls():
    """
    Test to ensure that specific columns have the same length as the data and that there are no null/None values.
    """
    try:
        data = pd.read_excel(FILE_PATH)

        # Check the total length of the data
        data_length = len(data)

        # Define the columns to check
        columns_to_check = ['hadar', 'nisan', 'KeyBert', 'chatgpt', 'final']

        # Check if all specified columns are in the data
        for column in columns_to_check:
            assert column in data.columns, f"Expected '{column}' in the data"
            # Check if the column has the same length as the dataset
            assert len(data[column]) == data_length, f"Column '{column}' length does not match data length"
            # Check for null or None values specifically in the specified columns
            assert data[column].isnull().sum() == 0, f"Column '{column}' contains null/None values"

        print("test_column_lengths_and_nulls: PASSED")
    except AssertionError as e:
        print(f"test_column_lengths_and_nulls: FAILED ({e})")

# Run the test
test_column_lengths_and_nulls()


test_column_lengths_and_nulls: PASSED


Test: Cohen's Kappa Agreement >= 95%

In [11]:
def test_cohen_kappa_agreement():
    """
    Test to calculate the Cohen's Kappa agreement between 'nisan' and 'hadar' columns.
    Checks if the agreement is below 95%.
    """
    try:
        # Load the data
        data = pd.read_excel(FILE_PATH)

        # Ensure the required columns are present
        assert 'nisan' in data.columns, "Expected 'nisan' in the data"
        assert 'hadar' in data.columns, "Expected 'hadar' in the data"

        # Calculate Cohen's Kappa agreement
        kappa_score = cohen_kappa_score(data['nisan'], data['hadar'])

        # Check if the kappa score is below 95%
        assert kappa_score >= 0.95, f"Cohen's Kappa agreement is below 95%: {kappa_score:.4f}"

        print(f"test_cohen_kappa_agreement: PASSED (Kappa Score: {kappa_score:.4f})")
    except AssertionError as e:
        print(f"test_cohen_kappa_agreement: FAILED ({e})")

# Run the test
test_cohen_kappa_agreement()

test_cohen_kappa_agreement: PASSED (Kappa Score: 0.9521)


Test: Tokenization

In [12]:
def test_tokenization():
    try:
        sample_text = "Hello, world!"
        tokens = tokenizer(sample_text)
        assert 'input_ids' in tokens, "Tokenization output must include input_ids"
        # Handle special tokens that might be added by the tokenizer
        decoded_text = tokenizer.decode(tokens['input_ids'], skip_special_tokens=True,padding=True, truncation=True)
        assert decoded_text.strip() == sample_text, "Decoded tokens should match original text"
        print("test_tokenization: PASSED")
    except AssertionError as e:
        print(f"test_tokenization: FAILED ({e})")

# Run the test
test_tokenization()


test_tokenization: PASSED


Test: Training Pipeline (including a compute_metrics check)

In [13]:
# Define constants
# Same checkpoint name as the one assigned in the configuration cell near the
# top of the notebook; re-assigning it here does not change its value.
MODEL_CHECKPOINT = "distilroberta-base"
# Number of output labels requested when the token-classification model is created.
NUM_LABELS = 2

In [14]:
# Test 1: Downloading stopwords
def test_stopwords_download():
    """Download the NLTK 'stopwords' corpus and check the English list is non-empty.

    Any exception raised along the way is reported as a failure; the outcome
    is printed.
    """
    try:
        nltk.download('stopwords')
        english_stopwords = set(stopwords.words('english'))
        assert len(english_stopwords) > 0, "Stopwords should be downloaded and contain words."
    except Exception as err:
        print(f"test_stopwords_download: FAILED ({err})")
    else:
        print("test_stopwords_download: PASSED")

# Test 2: Data Preparation
def prepare_data(data):
    """Group the word-level rows of `data` into one example per job description.

    Args:
        data: DataFrame with 'job_description', 'word' and 'final' columns.

    Returns:
        A dict with two parallel lists, "tokens" and "labels". Each entry holds
        the 'word' values (respectively the 'final' values) of one
        'job_description' group, in the order pandas' groupby yields the groups.
    """
    token_lists, label_lists = [], []
    for _, rows in data.groupby('job_description'):
        token_lists.append(rows['word'].tolist())
        label_lists.append(rows['final'].tolist())
    return {"tokens": token_lists, "labels": label_lists}

In [15]:
def test_prepare_data(data):
    try:
        examples = prepare_data(data)
        assert "tokens" in examples and "labels" in examples, "Expected 'tokens' and 'labels' in examples."
        assert len(examples["tokens"]) == len(examples["labels"]), "Tokens and labels should have the same length."
        print("test_prepare_data: PASSED")
    except Exception as e:
        print(f"test_prepare_data: FAILED ({e})")

In [16]:
def tokenize_and_align_labels(examples):
    """Tokenize pre-split words and build a label sequence aligned to the tokens.

    Uses the module-level `tokenizer` with truncation, padding and
    max_length=128.

    Args:
        examples: Mapping with "tokens" (lists of words) and "labels"
            (one label per word, parallel to "tokens").

    Returns:
        The tokenizer output with an added "labels" entry. In each sequence the
        first token of every word carries that word's label; every other
        position is set to -100.
    """
    encoded = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
        padding=True,
        max_length=128,
    )

    aligned_labels = []
    for batch_idx, word_labels in enumerate(examples["labels"]):
        sequence_labels = []
        last_word = None
        for current_word in encoded.word_ids(batch_index=batch_idx):
            # Only the first piece of a word is labelled; tokens without a word
            # id and later pieces of the same word get the -100 placeholder.
            starts_new_word = current_word is not None and current_word != last_word
            sequence_labels.append(word_labels[current_word] if starts_new_word else -100)
            last_word = current_word
        aligned_labels.append(sequence_labels)

    encoded["labels"] = aligned_labels
    return encoded

In [17]:
def test_tokenization_and_alignment(examples):
    try:
        tokenized_inputs = tokenize_and_align_labels(examples)
        assert "input_ids" in tokenized_inputs and "labels" in tokenized_inputs, "Expected 'input_ids' and 'labels' in tokenized_inputs."
        assert len(tokenized_inputs["input_ids"]) == len(tokenized_inputs["labels"]), "Input_ids and labels should have the same length."
        print("test_tokenization_and_alignment: PASSED")
    except Exception as e:
        print(f"test_tokenization_and_alignment: FAILED ({e})")

In [18]:
def test_dataset_conversion(train_data, val_data):
    try:
        train_dataset = Dataset.from_dict(tokenize_and_align_labels(train_data))
        val_dataset = Dataset.from_dict(tokenize_and_align_labels(val_data))
        assert len(train_dataset) > 0 and len(val_dataset) > 0, "Training and validation datasets should not be empty."
        print("test_dataset_conversion: PASSED")
    except Exception as e:
        print(f"test_dataset_conversion: FAILED ({e})")

In [19]:
def test_model_initialization(model_checkpoint, num_labels):
    try:
        model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=num_labels)
        print(f"Loaded model type: {type(model)}")  # Debugging: Print the type of the model

        # Check if the model is a subclass of PreTrainedModel
        assert issubclass(type(model), PreTrainedModel), f"Model should be a subclass of PreTrainedModel, but got {type(model)}."

        # Ensure the model has the correct number of labels
        assert model.config.num_labels == num_labels, f"Model should have {num_labels} labels, but got {model.config.num_labels}."

        print("test_model_initialization: PASSED")
    except Exception as e:
        print(f"test_model_initialization: FAILED ({e})")

In [20]:
def test_training_pipeline():
    """Run the fine-tuning pipeline end to end for one epoch.

    Steps: read the Excel file at FILE_PATH, group it with prepare_data, split
    80/20 with a fixed seed, tokenize both splits, create a fresh model and
    tokenizer from MODEL_CHECKPOINT, train with `Trainer`, then evaluate.
    The test passes when the evaluation results contain 'eval_accuracy', which
    comes from a mock metric that always reports 1.0. Any exception counts as
    a failure; the outcome is printed.
    """
    try:
        # Load the annotated rows and group them into per-description examples
        frame = pd.read_excel(FILE_PATH)
        grouped_examples = prepare_data(frame)

        # Hold out 20% of the (tokens, labels) pairs for validation
        paired_examples = list(zip(grouped_examples["tokens"], grouped_examples["labels"]))
        train_pairs, val_pairs = train_test_split(paired_examples, test_size=0.2, random_state=42)
        train_split = {
            "tokens": [tokens for tokens, _ in train_pairs],
            "labels": [labels for _, labels in train_pairs],
        }
        val_split = {
            "tokens": [tokens for tokens, _ in val_pairs],
            "labels": [labels for _, labels in val_pairs],
        }

        # Convert both splits to Hugging Face Dataset objects
        hf_train = Dataset.from_dict(tokenize_and_align_labels(train_split))
        hf_val = Dataset.from_dict(tokenize_and_align_labels(val_split))

        # Fresh model and tokenizer so the run does not depend on notebook globals
        fresh_model = AutoModelForTokenClassification.from_pretrained(MODEL_CHECKPOINT, num_labels=NUM_LABELS)
        fresh_tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT, add_prefix_space=True)

        # One short epoch; evaluation and checkpointing both happen per epoch
        run_config = TrainingArguments(
            output_dir="./results",
            evaluation_strategy="epoch",
            learning_rate=2e-5,
            per_device_train_batch_size=2,
            per_device_eval_batch_size=2,
            num_train_epochs=1,
            weight_decay=0.01,
            logging_dir="./logs",
            logging_steps=10,
            save_strategy="epoch",
            load_best_model_at_end=True,
        )

        # Collator that batches the token-classification features
        collator = DataCollatorForTokenClassification(fresh_tokenizer)

        def _mock_metrics(_eval_prediction):
            # Mock metric for testing: always reports perfect accuracy
            return {'accuracy': 1.0}

        runner = Trainer(
            model=fresh_model,
            args=run_config,
            train_dataset=hf_train,
            eval_dataset=hf_val,
            tokenizer=fresh_tokenizer,
            data_collator=collator,
            compute_metrics=_mock_metrics,
        )

        runner.train()
        metrics = runner.evaluate()
        assert 'eval_accuracy' in metrics, "Evaluation results should contain 'eval_accuracy'"
        print("test_training_pipeline: PASSED")
    except Exception as err:
        print(f"test_training_pipeline: FAILED ({err})")

In [21]:
test_stopwords_download()

test_stopwords_download: PASSED


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [22]:
data = pd.read_excel(FILE_PATH)
test_prepare_data(data)

test_prepare_data: PASSED


In [23]:
examples = prepare_data(data)

In [24]:
test_tokenization_and_alignment(examples)

test_tokenization_and_alignment: PASSED


In [25]:
# Pair each token list with its label list so both are shuffled and split
# together; 20% of the pairs are held out, with a fixed random_state.
train_examples, val_examples = train_test_split(
    list(zip(examples["tokens"], examples["labels"])), test_size=0.2, random_state=42
)
# Unzip the pairs back into the {"tokens": [...], "labels": [...]} layout that
# tokenize_and_align_labels reads.
train_data = {"tokens": [x[0] for x in train_examples], "labels": [x[1] for x in train_examples]}
val_data = {"tokens": [x[0] for x in val_examples], "labels": [x[1] for x in val_examples]}

In [26]:
test_dataset_conversion(train_data, val_data)

test_dataset_conversion: PASSED


In [27]:
test_model_initialization(MODEL_CHECKPOINT, NUM_LABELS)

Some weights of RobertaForTokenClassification were not initialized from the model checkpoint at distilroberta-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Loaded model type: <class 'transformers.models.roberta.modeling_roberta.RobertaForTokenClassification'>
test_model_initialization: PASSED


In [28]:
test_training_pipeline()

Some weights of RobertaForTokenClassification were not initialized from the model checkpoint at distilroberta-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Accuracy
1,0.0284,0.143993,1.0


test_training_pipeline: PASSED


Test: Model Loading from Google Drive

In [30]:
def test_model_loading_from_gdrive():
    """Check that a model and tokenizer can be loaded from DRIVE_SAVE_DIRECTORY.

    Both are loaded with from_pretrained. The test passes when both calls
    succeed and return an object; any error raised while loading is reported
    as a failure. The outcome is printed.
    """
    try:
        loaded_model = AutoModelForTokenClassification.from_pretrained(DRIVE_SAVE_DIRECTORY)
        loaded_tokenizer = AutoTokenizer.from_pretrained(DRIVE_SAVE_DIRECTORY)
        assert loaded_model is not None, "Model should be loaded successfully"
        assert loaded_tokenizer is not None, "Tokenizer should be loaded successfully"
        print("test_model_loading_from_gdrive: PASSED")
    except AssertionError as e:
        print(f"test_model_loading_from_gdrive: FAILED ({e})")
    except Exception as e:
        # A failed load raises instead of returning None, so without this
        # handler a broken or missing save directory would never print FAILED.
        print(f"test_model_loading_from_gdrive: FAILED ({type(e).__name__}: {e})")

# Run the test
test_model_loading_from_gdrive()


test_model_loading_from_gdrive: PASSED
