## Setup

In [5]:
import json
import time
from accelerate import Accelerator
from dataclasses import dataclass, field
from functools import partial
import numpy as np
from tqdm import tqdm
import torch
from torch.utils.data import DataLoader
import datasets 
from transformers import AutoTokenizer, AutoModelForTokenClassification, HfArgumentParser, DataCollatorForTokenClassification

In [6]:
# Entity types whose predictions are kept. These are bare type names (no
# "B-"/"I-" prefix); model labels that do not match this list end up in
# IGNORE_LABELS_IDX and are skipped during inference.
# NOTE(review): the *_LICENSE / *_EXAMPLE variants do not appear in the
# id2label mapping displayed later in this notebook - confirm they are needed.
KEEP_LABELS = [
    "NAME",
    "NAME_LICENSE",
    "NAME_EXAMPLE",
    "EMAIL",
    "EMAIL_LICENSE",
    "EMAIL_EXAMPLE",
    "USERNAME",
    "USERNAME_LICENSE",
    "USERNAME_EXAMPLE",
    "KEY",
    "IP_ADDRESS",
    "PASSWORD",
]

# Special tokens
# Registered on the tokenizer by prepare_tokenizer().
MASK_TOKEN = "<mask>"
SEPARATOR_TOKEN = "<sep>"
PAD_TOKEN = "<pad>"
CLS_TOKEN = "<cls>"

@dataclass
class NerArguments:
    """Settings for the NER inference run performed by this notebook.

    Every field stores a ``help`` entry in its metadata describing its purpose.
    """

    # Checkpoint identifier used to load both the model and the tokenizer.
    model_name: str = field(
        default="bigcode/deberta-v3-large-pii-ner-v2",
        metadata=dict(help="Name of model to use for inference"),
    )
    # Forwarded as ``num_proc`` when the dataset is tokenized.
    num_workers: int = field(
        default=16,
        metadata=dict(help="Number of processes to use for inference"),
    )
    # Batch size of the inference DataLoader.
    batch_size: int = field(
        default=64,
        metadata=dict(help="the batch size to use for inference"),
    )
    # Dataset identifier handed to ``datasets.load_dataset``.
    dataset_name: str = field(
        default="bigcode/pii-annotated-toloka",
        metadata=dict(help="Name of dataset to use for inference"),
    )
    # When true, only a small subset is processed and per-step timings are printed.
    dryrun: bool = field(
        default=False,
        metadata=dict(help="Run a dryrun with a small subset of the data"),
    )
    # Destination of the JSON dump holding the detected entities.
    output_path: str = field(
        default="output.json",
        metadata=dict(help="Path to save output entities"),
    )

# Adapted from: transformers.pipelines.token_classification
def group_sub_entities(entities, tokenizer):
    """Merge a run of token-level predictions into a single entity record.

    Args:
        entities: Non-empty list of dicts, each providing at least the keys
            "entity" (label string), "score" (number) and "word" (token).
        tokenizer: Object exposing ``convert_tokens_to_string(tokens)``.

    Returns:
        dict with
        - "entity": the first item's label with everything up to the last "-"
          removed (e.g. "B-NAME" -> "NAME"),
        - "score": mean of the item scores, ignoring NaN values,
        - "word": the tokens joined back into text by the tokenizer.

    Raises:
        IndexError: if ``entities`` is empty.
    """
    # The group's type is taken from its first token only.
    entity_type = entities[0]["entity"].split("-")[-1]
    # nanmean already yields a scalar; no second averaging pass is needed.
    mean_score = np.nanmean([sub_entity["score"] for sub_entity in entities])
    tokens = [sub_entity["word"] for sub_entity in entities]

    return {
        "entity": entity_type,
        "score": mean_score,
        "word": tokenizer.convert_tokens_to_string(tokens),
    }

In [7]:
# Instantiate the configuration: binding the bare class would make every
# attribute override (e.g. `args.dryrun = True`) mutate the class itself.
args = NerArguments()

## Utilities

In [8]:
# Adapted from: transformers.pipelines.token_classification
def group_entities(entities, tokenizer):
    """Collapse token-level predictions into entity groups.

    An entity extends the group being built when it has the same type as the
    group's last item (label text after the last "-"), its label prefix is not
    "B", and its "index" is exactly one more than that item's index. Otherwise
    the pending group is emitted and a new one is started. The pending group is
    also emitted whenever the current entity's index equals the index of the
    final entity in ``entities``.

    Args:
        entities: List of dicts with at least "entity" and "index" keys (plus
            whatever ``group_sub_entities`` reads).
        tokenizer: Passed through to ``group_sub_entities``.

    Returns:
        List of the dicts produced by ``group_sub_entities``, in input order;
        empty when ``entities`` is empty.
    """
    groups = []
    pending = []
    final_index = entities[-1]["index"] if entities else None

    for current in entities:
        closes_sequence = current["index"] == final_index

        if pending:
            previous = pending[-1]
            starts_new = current["entity"].split("-")[0] == "B"
            same_type = (
                current["entity"].split("-")[-1] == previous["entity"].split("-")[-1]
            )
            adjacent = current["index"] == previous["index"] + 1

            if same_type and adjacent and not starts_new:
                pending.append(current)
            else:
                # Type change, explicit "B" start or a gap: close the old group.
                groups.append(group_sub_entities(pending, tokenizer))
                pending = [current]
        else:
            pending.append(current)

        if closes_sequence:
            groups.append(group_sub_entities(pending, tokenizer))

    return groups


def prepare_tokenizer(tokenizer):
    """Register the notebook's special tokens and set the maximum length.

    Calls ``tokenizer.add_special_tokens`` once per token (pad, sep, cls, mask,
    in that order) and sets ``tokenizer.model_max_length`` to 1024.

    Args:
        tokenizer: Object exposing ``add_special_tokens(dict)`` and a writable
            ``model_max_length`` attribute; it is modified in place.

    Returns:
        The same tokenizer object.
    """
    special_tokens = (
        ("pad_token", PAD_TOKEN),
        ("sep_token", SEPARATOR_TOKEN),
        ("cls_token", CLS_TOKEN),
        ("mask_token", MASK_TOKEN),
    )
    for role, token in special_tokens:
        tokenizer.add_special_tokens({role: token})
    tokenizer.model_max_length = 1024
    return tokenizer


def tokenize_function(entries, tokenizer):
    """Tokenize every text of a batch and collect the encodings column-wise.

    Args:
        entries: Mapping whose "text" value is an iterable of strings.
        tokenizer: Callable invoked once per text as
            ``tokenizer(text, return_special_tokens_mask=True)``; its result
            must provide "input_ids", "attention_mask" and
            "special_tokens_mask".

    Returns:
        dict mapping each of those three keys to a list holding one value per
        input text, in input order.
    """
    columns = ("input_ids", "attention_mask", "special_tokens_mask")
    encodings = [
        tokenizer(text, return_special_tokens_mask=True) for text in entries["text"]
    ]
    return {column: [encoding[column] for encoding in encodings] for column in columns}

## Initializing dataset, model and accelerator

In [9]:
accelerator = Accelerator()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load model and tokenizer
model = AutoModelForTokenClassification.from_pretrained(args.model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(args.model_name)
tokenizer = prepare_tokenizer(tokenizer)
# labels
# The model's labels are BIO-tagged ("B-NAME", "I-NAME", ...) while KEEP_LABELS
# holds bare entity types, so compare on the part after the prefix. Comparing
# the full label would put every label in the ignore list.
IGNORE_LABELS_IDX = [
    idx
    for label, idx in model.config.label2id.items()
    if label.split("-")[-1] not in KEEP_LABELS
]
id2label = model.config.id2label

# load and tokenize dataset
dataset = datasets.load_dataset(args.dataset_name, split="train")
# Every column except "text" is dropped when the dataset is tokenized.
metadata_columns = [c for c in dataset.column_names if c != "text"]
if args.dryrun:
    # Cap at the dataset size so small datasets do not raise on select().
    dataset = dataset.select(range(min(1000, len(dataset))))

Using custom data configuration bigcode--pii-annotated-toloka-aa0ea1d4040d00a1
Found cached dataset json (/fsx/loubna/.cache/bigcode___json/bigcode--pii-annotated-toloka-aa0ea1d4040d00a1/0.0.0/e6070c77f18f01a5ad4551a8b7edfba20b8438b7cad4d94e6ad9378022ce4aab)


In [10]:
# Display the model's index -> label mapping.
id2label

{0: 'O',
 1: 'B-AMBIGUOUS',
 2: 'I-AMBIGUOUS',
 3: 'B-EMAIL',
 4: 'I-EMAIL',
 5: 'B-IP_ADDRESS',
 6: 'I-IP_ADDRESS',
 7: 'B-KEY',
 8: 'I-KEY',
 9: 'B-NAME',
 10: 'I-NAME',
 11: 'B-PASSWORD',
 12: 'I-PASSWORD',
 13: 'B-USERNAME',
 14: 'I-USERNAME'}

### Chunk the dataset so long files are split into pieces instead of being truncated (no data is lost)

In [27]:
import itertools
from datasets import Dataset
from tqdm import tqdm

def _chunked_seq(seq, length):
    step = length

    for i in range(len(seq) // step + 1):
        if i * step < len(seq):
            yield seq[i * step : i * step + length]


def chunk_inputs(
    input_ids,
    attention_mask,
    special_tokens_mask,
    id,
    *,
    tokenizer,
    max_length,
    **kwargs
):
    """Split one tokenized entry into chunks of at most ``max_length`` tokens.

    The three sequences are cut with ``_chunked_seq`` at the same offsets and
    zipped together, so each chunk keeps its ids and masks aligned.

    Args:
        input_ids, attention_mask, special_tokens_mask: Sequences to split.
        id: Identifier copied unchanged into every produced chunk.
        tokenizer: Accepted for interface compatibility; not used here.
        max_length: Chunk size handed to ``_chunked_seq``.
        **kwargs: Ignored.

    Returns:
        List of dicts with keys "input_ids", "attention_mask",
        "special_tokens_mask", "id" and "chunk_id" (0-based position of the
        chunk within the entry).
    """
    ids_pieces = _chunked_seq(input_ids, max_length)
    attention_pieces = _chunked_seq(attention_mask, max_length)
    special_pieces = _chunked_seq(special_tokens_mask, max_length)

    result = []
    aligned = zip(ids_pieces, attention_pieces, special_pieces)
    for chunk_no, (ids_piece, attention_piece, special_piece) in enumerate(aligned):
        result.append(
            {
                "input_ids": ids_piece,
                "attention_mask": attention_piece,
                "special_tokens_mask": special_piece,
                "id": id,
                "chunk_id": chunk_no,
            }
        )
    return result


def chunk_dataset(dataset, tokenizer):
    """Build a new Dataset in which every entry is split into chunks.

    Each entry of ``dataset`` is passed to ``chunk_inputs`` with
    ``max_length=tokenizer.model_max_length``; the resulting chunk dicts are
    concatenated in entry order and turned into a ``Dataset``.

    Args:
        dataset: Iterable of mappings with the keys "input_ids",
            "attention_mask", "special_tokens_mask" and "id".
        tokenizer: Provides ``model_max_length`` and is forwarded to
            ``chunk_inputs``.

    Returns:
        The value of ``Dataset.from_list`` applied to the list of all chunks.
    """
    max_length = tokenizer.model_max_length
    chunks_per_entry = (
        chunk_inputs(
            entry["input_ids"],
            entry["attention_mask"],
            entry["special_tokens_mask"],
            entry["id"],
            tokenizer=tokenizer,
            max_length=max_length,
        )
        # Iterate the dataset directly: copying every row into a list first
        # only duplicates the data in memory.
        for entry in tqdm(dataset)
    )
    return Dataset.from_list(list(itertools.chain.from_iterable(chunks_per_entry)))


In [None]:
# Collator used by the DataLoader below to assemble batches.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
# Run the preprocessing on the main process first (accelerate context manager).
with accelerator.main_process_first():
    # Tokenize in batches across `num_workers` processes; all columns other
    # than "text" are dropped here.
    tokenized_data = dataset.map(
        partial(tokenize_function, tokenizer=tokenizer),
        batched=True,
        num_proc=args.num_workers,
        remove_columns=metadata_columns,
    )
    # Give every document a running id so its chunks can be traced back to it.
    tokenized_data = tokenized_data.add_column("id", range(len(tokenized_data)))
    tokenized_data = tokenized_data.remove_columns("text")
    # Split every document into pieces of at most tokenizer.model_max_length tokens.
    chunked_data = chunk_dataset(tokenized_data, tokenizer)

# shuffle=False: batches are drawn in chunked_data row order.
dataloader = DataLoader(chunked_data, batch_size=args.batch_size, shuffle=False, collate_fn=data_collator)
print("length dataloader is", len(dataloader))
# Let accelerate wrap the model and dataloader for the current device setup.
model, dataloader = accelerator.prepare(model, dataloader)

In [29]:
# Inspect the tokenized dataset (one row per document, before chunking).
tokenized_data

Dataset({
    features: ['input_ids', 'attention_mask', 'special_tokens_mask', 'id'],
    num_rows: 12171
})

In [33]:
# Pull one batch and show its shape.
res = next(iter(dataloader))
print(res["input_ids"].shape)

# Decode the first two rows of the batch and print them with their masks.
for example_idx in (0, 1):
    print("-" * 60, f"Example {example_idx}", "-" * 60)
    tokens = tokenizer.convert_ids_to_tokens(res["input_ids"][example_idx])
    print(tokenizer.convert_tokens_to_string(tokens))
    print(res["attention_mask"][example_idx])
    print(res["special_tokens_mask"][example_idx])

torch.Size([64, 1024])
------------------------------------------------------------ Example 0 ------------------------------------------------------------
[CLS] { "first_name": "Yance", "last_name": "Bugbee", "email_address": "ybugbee7@narod.ru", "age": 22, }, { "first_name": "Zita", "last_name": "Walak", "email_address": "zwalak8@ebay.com", "age": 57, }, { "first_name": "Davie", "last_name": "Garmans", "email_address": "dgarmans9@biblegateway.com", "age": 53, }, ] return data def start_producer(service_uri: str, ca_path: str, cert_path: str, key_path: str): """Start the Kafka producer""" producer = KafkaProducer( bootstrap_servers=service_uri, security_protocol="SSL", ssl_cafile=ca_path, ssl_certfile=cert_path, ssl_keyfile=key_path, ) return producer def send_messages_to_consumer(producer, topic_name: str = "sample_customer_profile"): """Send messages from Kafka producer to consumer""" data = get_fake_data() for message in data: print(f"Sending message from producer: {message}") produ

In [34]:
# Run the model over every chunk, turn per-token predictions into grouped
# entities and dump the result as JSON.
# `all_entities` ends up with one list of entity groups per row of
# `chunked_data`, in row order (the DataLoader is built with shuffle=False).
all_entities = []
# Set membership keeps the per-token label filter cheap.
ignored_label_ids = set(IGNORE_LABELS_IDX)
t_start = time.time()
# Wrap the dataloader itself (not enumerate) so tqdm knows the batch count.
for step, batch in enumerate(tqdm(dataloader)):
    t_1 = time.time()
    with torch.no_grad():
        outputs = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        )
        # NOTE(review): gather requires identical tensor shapes on every
        # process; this has not been verified on multiple GPUs.
        # The logits are gathered without squeeze() so that a batch holding a
        # single row keeps its batch axis.
        predictions, input_ids, special_tokens_mask = accelerator.gather((
            outputs.logits, batch["input_ids"], batch["special_tokens_mask"]
        ))
    # Move everything to host memory once; indexing device tensors token by
    # token in the loops below would be far slower.
    predictions = predictions.float().cpu().numpy()
    input_ids = input_ids.cpu().numpy()
    special_tokens_mask = special_tokens_mask.cpu().numpy()
    # Numerically stable softmax: shift by the row maximum before exponentiating.
    exp_shifted = np.exp(predictions - predictions.max(axis=-1, keepdims=True))
    scores = exp_shifted / exp_shifted.sum(axis=-1, keepdims=True)
    batch_labels_idx = scores.argmax(axis=-1)
    forward_time = time.time() - t_1
    t_1 = time.time()
    batch_entities = []
    for text_id, labels_idx in enumerate(batch_labels_idx):
        entities = []
        for token_idx, label_id in enumerate(labels_idx):
            label_id = int(label_id)
            # Skip ignored labels and special/padding positions.
            if label_id in ignored_label_ids or special_tokens_mask[text_id][token_idx]:
                continue
            entities.append({
                "word": tokenizer.convert_ids_to_tokens(int(input_ids[text_id][token_idx])),
                "index": token_idx,
                "score": float(scores[text_id][token_idx][label_id]),
                "entity": id2label[label_id],
            })
        batch_entities.append(group_entities(entities, tokenizer))
    all_entities += batch_entities
    if args.dryrun:
        print(f"Step {step}")
        print(f"forward time {forward_time}")
        print(f"post-processing time {time.time() - t_1}")
t_end = time.time()

print(f"total time: {t_end - t_start:.2f} seconds")
# There is one entry per chunk, not per document: trim only what exceeds the
# number of chunks (extra rows that multi-process gathering may append).
# Trimming to len(dataset) would silently drop the trailing chunks.
all_entities = all_entities[:len(chunked_data)]
if accelerator.is_main_process:
    # Sample print for a quick sanity check; guarded for very small runs.
    if len(all_entities) > 14:
        print(all_entities[14])
    with open(args.output_path, "w") as f:
        json.dump(all_entities, f)

