In [1]:
%pip install transformers
%pip install tqdm
%pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cu121

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Looking in indexes: https://download.pytorch.org/whl/cu121
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertTokenizerFast, BertForTokenClassification, Trainer, TrainingArguments
import json
from tqdm import tqdm

with open('datasets\\ner\\spotify_dataset.json') as f:
    ds = json.load(f)

def get_labels_to_index():
    unique_labels = []
    for item in ds:
        labels = item['labels']
        for label in labels:
            if label not in unique_labels:
                unique_labels.append(label)
    return {label: idx for idx, label in enumerate(unique_labels)}
labels_to_index = get_labels_to_index()
texts = []
labels = []
encoded_labels = []
# create a list of labels represented as numbers
# because the computer doesnt care about the text
assert len(texts) == len(labels), "Mismatch between number of sentences and labels"
for item in ds:
    texts.append(item["sentence"])
    labels.append(item["labels"])
    assert len(item["sentence"].split()) == len(item["labels"]), f"Mismatch in length for sentence: {item}"
    encoded_labels.append([labels_to_index[label] for label in item["labels"]])

In [6]:
# now we need to split the data into training and 
# validation sets for both the text and prompts

# set a ratio to split the data
split_ratio = 0.8
total_samples = len(ds)

# create a random set of indices from 0 to the 
# total amount of samples in out data
indices = torch.randperm(total_samples)

# using the indices split the indices into indices for
# training and validation
training_indices = indices[:int(total_samples * split_ratio)]
validation_indices = indices[int(total_samples * split_ratio):]

# use the indices to select pieces of data to create
# the individual text and label sets for training
# and validation
training_texts = [texts[idx].split() for idx in training_indices]
encoded_training_labels = [encoded_labels[idx] for idx in training_indices]
validation_texts = [texts[idx].split() for idx in validation_indices]
encoded_validation_labels = [encoded_labels[idx] for idx in validation_indices]
print(training_texts)
print(encoded_training_labels)

[['Unpause', 'the', 'song'], ['Pause', 'the', 'music', 'now'], ['Skip', 'this', 'current', 'track'], ['Pause', 'the', 'song', 'please'], ['Play', 'the', 'music', 'now'], ['Unpause', 'the', 'current', 'song'], ['Pause', 'the', 'current', 'track'], ['Skip', 'track'], ['Play', 'the', 'next', 'song'], ['Unpause', 'the', 'track'], ['Skip', 'the', 'previous', 'song'], ['Can', 'you', 'play', 'the', 'next', 'track'], ['Skip', 'the', 'music', 'now'], ['Pause', 'this', 'song', 'now'], ['Skip', 'a', 'song'], ['Skip', 'this', 'track'], ['Play', 'this', 'music'], ['Skip', 'to', 'the', 'next', 'track'], ['Pause', 'this', 'track'], ['Please', 'play', 'that'], ['Pause', 'this', 'music'], ['Can', 'you', 'skip', 'this'], ['Can', 'you', 'play', 'another', 'track'], ['Skip', 'it'], ['Play', 'this', 'song', 'now'], ['Can', 'you', 'play', 'a', 'song'], ['Play', 'a', 'different', 'track'], ['Pause', 'a', 'different', 'song'], ['Play', 'the', 'current', 'track'], ['Pause', 'the', 'previous', 'track'], ['Skip'

In [7]:
# load the tokenizer
tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")

# function to encode the text inputs
def encode_texts(texts):
    # padding going to add extra bits to the token 
    # ensuring consistant length
    # truncation will remove bits from tokens 
    # that are too long 
    # return_tensors="pt" will return a PyTorch tensor 
    # which we like because tensors are efficient
    # to work with
    return tokenizer(texts, truncation=True, padding='max_length', is_split_into_words=True, return_tensors="pt")

# encode the training and validation texts
encoded_training_texts = encode_texts(training_texts)
encoded_validation_texts = encode_texts(validation_texts)



In [8]:
def align_labels(tokenized_inputs, labels):
    aligned_labels = []
    obj_count = 0
    print(f"Total batches: {len(tokenized_inputs)}")
    print(labels)
    for i, label in enumerate(labels):
        print(f"Tokenized Text {i+1}: {tokenizer.convert_ids_to_tokens(tokenized_inputs['input_ids'][i])}")
        print(f"Word IDs {i+1}: {tokenized_inputs.word_ids(batch_index=i)}")
        print(f'Grabbing Label in position: {i}')
        label = labels[i]
        print(f'Label: {label}')
        obj_count += 1
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                print(f'Word idx: {word_idx}')
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        aligned_labels.append(label_ids)
    tokenized_inputs["labels"] = torch.tensor(aligned_labels)
    return tokenized_inputs

# Check the lengths of sentences and labels

for idx, (sentence, label) in enumerate(zip(texts, labels)):
    assert len(sentence.split()) == len(label), f"Mismatch in length for sentence {idx}"
        
label_aligned_training_texts = align_labels(encoded_training_texts, encoded_training_labels)
label_aligned_validation_texts = align_labels(encoded_validation_texts, encoded_validation_labels)

Total batches: 3
[[1, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0], [1, 0, 0, 0], [1, 0, 0], [1, 0, 0, 0], [0, 0, 1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0, 0, 0], [1, 0, 0], [0, 1, 0], [1, 0, 0], [0, 0, 1, 0], [0, 0, 1, 0, 0], [1, 0], [1, 0, 0, 0], [0, 0, 1, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [1, 0, 0, 0], [1, 0, 0, 0], [1, 0], [1, 0, 0, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0, 0], [1, 0, 0]]
Tokenized Text 1: ['[CLS]', 'un', '##pa', '##use', 'the', 'song', '[SEP]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]'

In [9]:
# we neeed to create Tensor datasets for 
# our training and validation sets
training_dataset = TensorDataset(label_aligned_training_texts["input_ids"], label_aligned_training_texts["attention_mask"], torch.tensor(label_aligned_training_texts["labels"]))
validation_dataset = TensorDataset(label_aligned_validation_texts["input_ids"], label_aligned_validation_texts["attention_mask"], torch.tensor(label_aligned_validation_texts["labels"]))

sampling_size = 10
training_loader = DataLoader(training_dataset, batch_size=sampling_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=sampling_size)

  training_dataset = TensorDataset(label_aligned_training_texts["input_ids"], label_aligned_training_texts["attention_mask"], torch.tensor(label_aligned_training_texts["labels"]))
  validation_dataset = TensorDataset(label_aligned_validation_texts["input_ids"], label_aligned_validation_texts["attention_mask"], torch.tensor(label_aligned_validation_texts["labels"]))


In [10]:
def calculate_accuracy(logits, labels, ignore_index=-100):
    predictions = torch.argmax(logits, dim=2)
    mask = labels != ignore_index
    correct_predictions = (predictions == labels) & mask
    correct_predictions = correct_predictions.float()
    total_correct = correct_predictions.sum()
    total = mask.sum()
    accuracy = (total_correct / total) * 100
    return accuracy.item()

def validation(avg_training_loss, avg_training_accuracy):
    model.eval()
    total_validation_loss = 0
    total_validation_accuracy = 0
    with torch.no_grad():
        for batch in validation_loader:
            
            input_ids, attention_mask, labels = [b.to(device) for b in batch]
            
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            
            total_validation_loss += loss.item()
            
            total_validation_accuracy += calculate_accuracy(outputs.logits, labels)
            
        avg_validation_loss = total_validation_loss / len(validation_loader)
        avg_validation_accuracy = total_validation_accuracy / len(validation_loader)
        
        print(f'Training Loss: {avg_training_loss:.4f}, Training Accuracy: {avg_training_accuracy:.2f}%')
        print(f'Validation Loss: {avg_validation_loss:.4f}, Validation Accuracy: {avg_validation_accuracy:.2f}%')

        

num_unique_labels = len(labels_to_index)
model = BertForTokenClassification.from_pretrained('bert-base-uncased', num_labels=num_unique_labels)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print(f'Device: {device}')
# Define optimizer and training arguments
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)

# Training loop
num_epochs = 12

model.train()
for epoch in range(num_epochs):
    total_loss = 0
    total_accuracy = 0
    for batch in tqdm(training_loader):
        # we need to move our training batch to the same
        # hardware as the model
        input_ids, attention_mask, labels = [b.to(device) for b in batch]
        
        # We need to clear gradients from previous epochs
        optimizer.zero_grad()
        
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        total_accuracy += calculate_accuracy(outputs.logits, labels)

    print(f"Epoch {epoch+1}, Loss: {total_loss / len(training_loader)}")
    avg_training_loss = total_loss / len(training_loader)
    avg_training_accuracy = total_accuracy / len(training_loader)
    validation(avg_training_loss, avg_training_accuracy)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Device: cuda


100%|██████████| 5/5 [00:12<00:00,  2.42s/it]


Epoch 1, Loss: 0.310942542552948
Training Loss: 0.3109, Training Accuracy: 86.78%
Validation Loss: 0.0312, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:07<00:00,  1.47s/it]


Epoch 2, Loss: 0.004557976452633739
Training Loss: 0.0046, Training Accuracy: 100.00%
Validation Loss: 0.0011, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:06<00:00,  1.23s/it]


Epoch 3, Loss: 0.0003757031896384433
Training Loss: 0.0004, Training Accuracy: 100.00%
Validation Loss: 0.0002, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:05<00:00,  1.18s/it]


Epoch 4, Loss: 0.00012253146996954456
Training Loss: 0.0001, Training Accuracy: 100.00%
Validation Loss: 0.0001, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:05<00:00,  1.19s/it]


Epoch 5, Loss: 6.82023397530429e-05
Training Loss: 0.0001, Training Accuracy: 100.00%
Validation Loss: 0.0001, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:06<00:00,  1.23s/it]


Epoch 6, Loss: 4.781231618835591e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:05<00:00,  1.19s/it]


Epoch 7, Loss: 3.824844752671197e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:06<00:00,  1.21s/it]


Epoch 8, Loss: 3.310008250991814e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:06<00:00,  1.26s/it]


Epoch 9, Loss: 2.9506887949537484e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:06<00:00,  1.23s/it]


Epoch 10, Loss: 2.7184216742170973e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:06<00:00,  1.26s/it]


Epoch 11, Loss: 2.5454485148657112e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


100%|██████████| 5/5 [00:05<00:00,  1.19s/it]


Epoch 12, Loss: 2.4176058650482447e-05
Training Loss: 0.0000, Training Accuracy: 100.00%
Validation Loss: 0.0000, Validation Accuracy: 100.00%


In [1]:
# import this torch functional library and use softmax to convert
# logits to probabilites to get a confidence in a precentage
import torch.nn.functional as F

# Function to take text input and pass to model
def predict(text):
    # Encode the input text
    encoded_input = tokenizer(text, padding='max_length', truncation=True, return_tensors="pt")

    # Move the tensors to the same device as the model
    input_ids = encoded_input["input_ids"].to(device)
    attention_mask = encoded_input["attention_mask"].to(device)

    # Model inference
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)

    # Get the logits and apply softmax to get probabilities
    logits = outputs.logits

    predicted_token_class_ids = logits.argmax(-1)

    # Decode the input ids back to tokens for display
    tokens = tokenizer.convert_ids_to_tokens(input_ids.squeeze().tolist())
    print(tokens)
    # Collect predictions with their corresponding probabilities
    predictions = []
    predicted_tokens_classes = [model.config.id2label[t.item()] for t in predicted_token_class_ids[0]]
    # Iterate over the tokens and their predicted token classes
    for token, token_class in zip(tokens, predicted_tokens_classes):
        label = [label for label, idx in labels_to_index.items() if idx == int(token_class[-1])]
        if label and label[0] != 'O':  # Assuming 'O' is the 'outside' label, typically ignored in display
            predictions.append((token, label[0]))

    # Calculate average confidence if predictions are made
    if predictions:
        formatted_predictions = ', '.join([f"{token}: {label}" for token, label in predictions])
        confidence = calculate_accuracy(logits, labels)
        return formatted_predictions, confidence
    else:
        return "No entities found.", 0.0

# Example of using the prediction function
prompt = input("Prompt: ")
predicted_label, confidence = predict(prompt)
print(f"Predicted Label: {predicted_label}\nConfidence: {confidence:.2f}%")


NameError: name 'tokenizer' is not defined

In [11]:
torch.save(model, 'spotify_command_model.pth')