# PII Masking with DeBERTa-v3

* In this project, I finetuned a DeBERTa-v3 named entity recognition model to detect, classify, and mask personal identifiable information(PII) in user input.

* The model achieved a high accuracy of ~99% in detecting the following categories:
  * Name
  * Email
  * Address
  * Phone Number
  * ID Number
  * Personal URL
  * Username

* Model capability expands for other categories (Money Amount, Age, Password, etc.) if provided with corresponding training data. Due to the scope of this project, I haven't done that.  

## Import Libraries

In [1]:
# data handling and manipulation
import json
import numpy as np
import pandas as pd

# machine learning with pytorch and transformers
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn import CrossEntropyLoss
from torch.cuda.amp import autocast, GradScaler
from transformers import AutoTokenizer, AutoModelForTokenClassification, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# progress bar
from tqdm.notebook import tqdm

# random number generation
import random

# count list
from collections import Counter

# convert data format
import ast

## Data Preprocessing Setup

In [2]:
# personal identifiable information setup
pii_labels = ['B-EMAIL', 'B-ID_NUM', 'B-NAME_STUDENT', 'B-PHONE_NUM',
              'B-STREET_ADDRESS', 'B-URL_PERSONAL', 'B-USERNAME',
              'I-ID_NUM', 'I-NAME_STUDENT', 'I-PHONE_NUM',
              'I-STREET_ADDRESS', 'I-URL_PERSONAL', 'O']

# integer label to BIO format label mapping
pii_id2label = dict(enumerate(pii_labels))

# BIO format label to integer label mapping
pii_label2id = {v:k for k,v in pii_id2label.items()}

# number of PII (NER) tags
pii_num_labels = len(pii_labels)

In [4]:
# load the pre-trained deberta-v3 tokenizer
tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-v3-base')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/579 [00:00<?, ?B/s]

spm.model:   0%|          | 0.00/2.46M [00:00<?, ?B/s]



In [5]:
def tokenize_and_align_labels(tokens, ner_tags):
    # tokenize and pad the input tokens
    tokenized_inputs = tokenizer(tokens, truncation=True,
                                 is_split_into_words=True,
                                 padding='max_length',
                                 max_length=1025)

    # mapping of each token to its corresponding word in the original input
    word_ids = tokenized_inputs.word_ids()

    # initialize label_ids to store the aligned labels
    label_ids = []

    for word_idx in word_ids:
        if word_idx is None:
            label_ids.append(-100)
        else:
            label_ids.append(ner_tags[word_idx])

    # add the aligned NER labels to the tokenized_inputs dictionary
    tokenized_inputs['labels'] = label_ids

    return tokenized_inputs

In [6]:
def data_process(data, aug, test_size=0.1, random_state=42):

    # initialize empty arrays
    words = np.empty(len(data), dtype=object)
    labels = np.empty(len(data), dtype=object)

    # process and fill each data point
    for i, x in tqdm(enumerate(data), total=len(data)):
        words[i] = np.array(x['tokens'])
        labels[i] = np.array([pii_label2id[label] for label in x['labels']])

    # change data structure to list
    words_list = [arr.tolist() for arr in list(words)]
    labels_list = [arr.tolist() for arr in list(labels)]

    # store list in dictionary
    df_data = {
        'tokens': words_list,
        'ner_tags': labels_list
    }

    # make pandas dataframe
    df = pd.DataFrame(df_data)

    # add augmentation data
    df = pd.concat([df, aug], axis=0)

    # split the dataframe into train and test sets
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=random_state)

    # apply tokenization and alignment to the train set with a progress bar
    train_tokenized_inputs = [
    tokenize_and_align_labels(row['tokens'], row['ner_tags'])
    for _, row in tqdm(train_df.iterrows(), total=len(train_df), desc="Processing Train Data")
    ]

    test_tokenized_inputs = [
    tokenize_and_align_labels(row['tokens'], row['ner_tags'])
    for _, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Processing Test Data")
    ]

    return train_df, test_df, train_tokenized_inputs, test_tokenized_inputs;

## Load and Process Data

In [7]:
# google drive path for training data
data_path = '/content/drive/MyDrive/personal_project/pii/pii_data'

# load the data
data = json.load(open(f'{data_path}/train.json'))

In [8]:
# add augmentation data
aug_train = pd.read_csv('/content/drive/MyDrive/personal_project/DEBERTA/AUG_NEW.csv')

# apply ast.literal_eval to convert the string representation of a list into an actual list
aug_train['tokens'] = aug_train['tokens'].apply(ast.literal_eval)
aug_train['ner_tags'] = aug_train['ner_tags'].apply(ast.literal_eval)

aug_train

Unnamed: 0,tokens,ner_tags
0,"[www.loganrodriguez.net, ., <ying.chen@website...","[5, 12, 0, 12, 3, 9, 9, 9, 12, 6, 12, 4, 10, 1..."
1,"[<lian.ma@webmail.com.cn>, ., Hua, Sun, ., www...","[0, 12, 2, 8, 12, 5, 12, 3, 9, 9, 12, 1, 12, 4..."
2,"[id987654, ., Olga, Ivanova, ., www.example.co...","[1, 12, 2, 8, 12, 5, 12, 6, 12, 4, 10, 12, 3, ..."
3,"[245, E, 24th, St, Apt, 12E,, New, York,, NY, ...","[4, 10, 10, 10, 10, 10, 10, 10, 10, 10, 12, 2,..."
4,"[tech_geek_101, ., Feng, Liu, ., <mia.white@si...","[6, 12, 2, 8, 12, 0, 12, 5, 12, 4, 10, 10, 10,..."
...,...,...
20995,"[We, just, opened, a, new, cafe, at, 4801, E, ...","[12, 12, 12, 12, 12, 12, 12, 4, 10, 10, 10, 10..."
20996,"[I, live, in, 45, Quincy, St, Cambridge, MA, 0...","[12, 12, 12, 4, 10, 10, 10, 10, 10, 12, 12, 12..."
20997,"[Our, new, shop, is, at, 1439, El, Prado, San,...","[12, 12, 12, 12, 12, 4, 10, 10, 10, 10, 10, 10..."
20998,"[Our, office, is, situated, at, 1, Six, Flags,...","[12, 12, 12, 12, 12, 4, 10, 10, 10, 10, 10, 10..."


In [9]:
# get the processed data
train_df, test_df, train_tokenized_inputs, test_tokenized_inputs = data_process(data, aug_train)

  0%|          | 0/6807 [00:00<?, ?it/s]

Processing Train Data:   0%|          | 0/25026 [00:00<?, ?it/s]

Processing Test Data:   0%|          | 0/2781 [00:00<?, ?it/s]

## Deep Learning Data Preprocessing

In [10]:
# check for GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'using device: {device}')

using device: cuda


In [11]:
# convert training inputs to PyTorch tensors
train_input_ids = torch.tensor([x['input_ids'] for x in train_tokenized_inputs]).to(device)
train_attention_mask = torch.tensor([x['attention_mask'] for x in train_tokenized_inputs]).to(device)
train_labels = torch.tensor([x['labels'] for x in train_tokenized_inputs]).to(device)

# convert testing inputs to PyTorch tensors
test_input_ids = torch.tensor([x['input_ids'] for x in test_tokenized_inputs]).to(device)
test_attention_mask = torch.tensor([x['attention_mask'] for x in test_tokenized_inputs]).to(device)
test_labels = torch.tensor([x['labels'] for x in test_tokenized_inputs]).to(device)

In [25]:
# customize a dataset class that inherits from PyTorch's 'Dataset' class
class NERDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return {
            'input_ids': self.input_ids[idx],
            'attention_mask': self.attention_mask[idx],
            'labels': self.labels[idx]
        }

# create instances of NERDataset and DataLoaders for training and testing datasets
train_dataset = NERDataset(train_input_ids, train_attention_mask, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)  # Batch size set to 1

test_dataset = NERDataset(test_input_ids, test_attention_mask, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)  # Batch size set to 1


## Deep Learning Model Setup

In [26]:
# load and define the deberta-v3-base model
model = AutoModelForTokenClassification.from_pretrained('microsoft/deberta-v3-small',
                                                        num_labels=pii_num_labels).to(device)

Some weights of DebertaV2ForTokenClassification were not initialized from the model checkpoint at microsoft/deberta-v3-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [27]:
# number of epochs
num_epochs = 5

# multiply the number of epochs and number of batches to calculate the total training steps
num_training_steps = num_epochs * len(train_dataloader)

# select 10% of total training steps as warming steps
num_warmup_steps = int(0.1 * num_training_steps)

# initialize the AdamW optimizer with weight decay
optimizer = AdamW(model.parameters(), lr=2e-5)

# initialize learning rate scheduler
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)

In [28]:
torch.cuda.empty_cache()

## Training

In [29]:
# initialize the gradient scaler for mixed precision training
scaler = GradScaler()

# store losses for potential plotting
train_losses = []

# set the model to training mode
model.train()

# gradient accumulation steps
accumulation_steps = 4

# loop over training epochs
for epoch in range(num_epochs):

    # initialize loss for this epoch
    epoch_train_loss = 0

    # loop over batches in the training dataset
    for i, batch in enumerate(tqdm(train_dataloader, desc=f"Training Epoch {epoch+1}/{num_epochs}")):

        # clear previous gradients in case of accumulation
        optimizer.zero_grad()

        # ensure correct computation location (GPU) for input data
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # enable mixed precision training
        with autocast():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss / accumulation_steps  # Scale loss by accumulation steps

        # backwards propagation
        scaler.scale(loss).backward()

        # update model parameters if accumulation steps are reached
        if (i + 1) % accumulation_steps == 0:
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()
            optimizer.zero_grad()

        # accumulate loss for this epoch
        epoch_train_loss += loss.item() * accumulation_steps  # Scale back the loss by accumulation steps

        # store los s for plotting
        train_losses.append(loss.item() * accumulation_steps)

    # average epoch loss
    avg_train_loss = epoch_train_loss / len(train_dataloader)

    # show the result
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss}")


Training Epoch 1/5:   0%|          | 0/1565 [00:00<?, ?it/s]

Epoch 1/5, Training Loss: 0.7970617227339611


Training Epoch 2/5:   0%|          | 0/1565 [00:00<?, ?it/s]

Epoch 2/5, Training Loss: 0.02509428178162549


Training Epoch 3/5:   0%|          | 0/1565 [00:00<?, ?it/s]

Epoch 3/5, Training Loss: 0.006568712465049991


Training Epoch 4/5:   0%|          | 0/1565 [00:00<?, ?it/s]

Epoch 4/5, Training Loss: 0.0041836310486746625


Training Epoch 5/5:   0%|          | 0/1565 [00:00<?, ?it/s]

Epoch 5/5, Training Loss: 0.0028347987242559264


## Save Trained Model

In [30]:
# specify the path in your Google Drive where you want to save the model
model_save_path = '/content/drive/MyDrive/personal_project/DEBERTA/820_1.pth'

# save the model's state dictionary
torch.save(model.state_dict(), model_save_path)

print(f"Model saved to {model_save_path}")

Model saved to /content/drive/MyDrive/personal_project/DEBERTA/820_1.pth


## Load Saved Model

In [31]:
# initialize the model architecture
model_test = AutoModelForTokenClassification.from_pretrained('microsoft/deberta-v3-base', num_labels=13).to(device)

# Load the model's state dictionary
model_save_path = '/content/drive/MyDrive/personal_project/DEBERTA/820_1.pth'

# For CPU processing
model_test.load_state_dict(torch.load(model_save_path))

Some weights of DebertaV2ForTokenClassification were not initialized from the model checkpoint at microsoft/deberta-v3-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


<All keys matched successfully>

## Model Output Processing Setup

In [32]:
def mask_pii(tokens, labels):
    masked_tokens = []
    for token, label in zip(tokens, labels):
        if label != 'O':
            masked_tokens.append(f'[masked: {label[2:]}]')
        else:
            masked_tokens.append(token)
    return ' '.join(masked_tokens)

In [33]:
def pii_masker(example_input_text):

    # tokenize the input text while keeping track of the original words
    tokenized_inputs = tokenizer(example_input_text, return_tensors="pt", truncation=True, padding='max_length', max_length=1024, is_split_into_words=False)
    example_inputs = {k: v.to(device) for k, v in tokenized_inputs.items()}

    # extract the input IDs
    input_ids = example_inputs['input_ids'].squeeze().cpu().numpy()

    # convert the input IDs back to tokens
    tokens = tokenizer.convert_ids_to_tokens(input_ids)

    # decode the tokenized input to see special tokens
    decoded_input = tokenizer.decode(input_ids, skip_special_tokens=True)
    print('Original Sentence:', decoded_input)


    # filter out special tokens
    special_tokens = [tokenizer.cls_token_id, tokenizer.sep_token_id, tokenizer.pad_token_id]
    filtered_tokens = [token for token, token_id in zip(tokens, input_ids) if token_id not in special_tokens]


    # set model to evaluation mode
    model.eval()

    print("Performing PII Detection...")
    # perform prediction
    with torch.no_grad():
        outputs = model(input_ids=example_inputs['input_ids'], attention_mask=example_inputs['attention_mask'])

    # extract logits
    logits = outputs.logits

    # get the predicted class for each token
    predictions = torch.argmax(logits, dim=2).squeeze().cpu().numpy()

    # filter out predictions for special tokens
    filtered_predictions = [pred for pred, token_id in zip(predictions, input_ids) if token_id not in special_tokens]

    # function to join subwords and assign the most frequent label
    def join_subwords_and_labels(tokens, labels):
        joined_tokens = []
        joined_labels = []
        current_token = ""
        current_labels = []

        for token, label in zip(tokens, labels):
            if token.startswith("▁") or token.startswith("##"):
                if current_token:
                    joined_tokens.append(current_token)
                    joined_labels.append(Counter(current_labels).most_common(1)[0][0])
                current_token = token.replace("▁", "").replace("##", "")
                current_labels = [label]
            else:
                current_token += token
                current_labels.append(label)

        if current_token:
            joined_tokens.append(current_token)
            joined_labels.append(Counter(current_labels).most_common(1)[0][0])

        return joined_tokens, joined_labels

    # join subwords and assign the most frequent labels
    joined_tokens, joined_labels = join_subwords_and_labels(filtered_tokens, filtered_predictions)

    # convert joined labels to their corresponding tag names
    joined_labels = [pii_id2label[label] if label != -100 else "PAD" for label in joined_labels]

    masked_sentence = mask_pii(joined_tokens, joined_labels)

    print("PII-masked Sentence:", masked_sentence)
    return masked_sentence

## Try Own Sentence

In [34]:
masked_sentence = pii_masker("Zephyr Moonstone, residing at 742 Evergreen Terrace Springfield USA, can be reached at +1 (555) 123-4567 or via email at stardust_seeker@galaxymail.com. Their website is www.cosmic-wonders.space, and they use the username cosmic_voyager online. Their employee ID is XR-739-2468-AZ.")

Original Sentence: Zephyr Moonstone, residing at 742 Evergreen Terrace Springfield USA, can be reached at +1 (555) 123-4567 or via email at stardust_seeker@galaxymail.com. Their website is www.cosmic-wonders.space, and they use the username cosmic_voyager online. Their employee ID is XR-739-2468-AZ.
Performing PII Detection...
PII-masked Sentence: [masked: NAME_STUDENT] [masked: NAME_STUDENT] residing at [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] can be reached at [masked: PHONE_NUM] [masked: PHONE_NUM] [masked: PHONE_NUM] or via email at [masked: EMAIL] Their website is [masked: URL_PERSONAL] and they use the username [masked: USERNAME] online. Their employee ID is [masked: ID_NUM]


In [38]:
masked_sentence = pii_masker("My name is Kelvin Zheng Chen. I had the incredible opportunity to intern at TechInnovate Corp, located at 1234 Silicon Valley Drive, Palo Alto, CA 94304. As a computer science student at Stanford University (Student ID: K0123456789), I was thrilled to apply my knowledge in a real-world setting.")

Original Sentence: My name is Kelvin Zheng Chen. I had the incredible opportunity to intern at TechInnovate Corp, located at 1234 Silicon Valley Drive, Palo Alto, CA 94304. As a computer science student at Stanford University (Student ID: K0123456789), I was thrilled to apply my knowledge in a real-world setting.
Performing PII Detection...
PII-masked Sentence: My name is [masked: NAME_STUDENT] [masked: NAME_STUDENT] [masked: NAME_STUDENT] I had the incredible opportunity to intern at [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] located at [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] As a computer science student at [masked: STREET_ADDRESS] [masked: STREET_ADDRESS] (Student [masked: ID_NUM] [masked: ID_NUM] I was thrilled to apply my knowledge in a real-world setting.


## Test Set Accuracy (~99%)

In [39]:
model_test.eval()

DebertaV2ForTokenClassification(
  (deberta): DebertaV2Model(
    (embeddings): DebertaV2Embeddings(
      (word_embeddings): Embedding(128100, 768, padding_idx=0)
      (LayerNorm): LayerNorm((768,), eps=1e-07, elementwise_affine=True)
      (dropout): StableDropout()
    )
    (encoder): DebertaV2Encoder(
      (layer): ModuleList(
        (0-11): 12 x DebertaV2Layer(
          (attention): DebertaV2Attention(
            (self): DisentangledSelfAttention(
              (query_proj): Linear(in_features=768, out_features=768, bias=True)
              (key_proj): Linear(in_features=768, out_features=768, bias=True)
              (value_proj): Linear(in_features=768, out_features=768, bias=True)
              (pos_dropout): StableDropout()
              (dropout): StableDropout()
            )
            (output): DebertaV2SelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-07, elementwise_affine=Tr

In [40]:
total_correct = 0
total_predictions = 0

In [42]:
# disable gradient calculation
with torch.no_grad():
    # wrap the dataloader with tqdm to create a progress bar
    for batch in tqdm(test_dataloader, desc="Evaluating"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # get model predictions
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits

        # get the predicted class for each token
        predictions = torch.argmax(logits, dim=2)

        # compare predictions with the true labels
        for i in range(labels.size(0)):  # iterate over each example in the batch
            mask = labels[i] != -100  # assuming -100 is the ignore index for padding
            correct = (predictions[i][mask] == labels[i][mask]).sum().item()
            total_correct += correct
            total_predictions += mask.sum().item()

Evaluating:   0%|          | 0/174 [00:00<?, ?it/s]

In [43]:
accuracy = total_correct / total_predictions
print(f"Accuracy on the test dataset: {accuracy:.4f}")

Accuracy on the test dataset: 0.9995
