In [None]:
!git clone https://github.com/PacktPublishing/Python-Natural-Language-Processing-Cookbook-Second-Edition.git

Cloning into 'Python-Natural-Language-Processing-Cookbook-Second-Edition'...
remote: Enumerating objects: 433, done.[K
remote: Counting objects: 100% (24/24), done.[K
remote: Compressing objects: 100% (22/22), done.[K
remote: Total 433 (delta 11), reused 6 (delta 2), pack-reused 409 (from 1)[K
Receiving objects: 100% (433/433), 18.28 MiB | 11.29 MiB/s, done.
Resolving deltas: 100% (235/235), done.


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dense
from transformers import BertTokenizer
from transformers import TFBertForSequenceClassification

from Chapter04.svm_classification import split_dataset
from Chapter05.twitter_sentiment import clean_data, plot_model

# Location of the tweet corpus plus the sampling/batching knobs read by the
# helper functions defined below.
english_twitter = 'Chapter05/twitter_english.csv'
DATASET_SIZE = 4000
BATCH_SIZE = 32

# Token budget handed to the tokenizer for every encoded text.
max_length = 200
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased", do_lower_case=True)



def map_inputs_to_dict(input_ids, attention_masks, token_type_ids, label):
    """Bundle the three BERT inputs into a name-keyed dict next to the label.

    Returns:
        A ``(features, label)`` tuple in which ``features`` maps
        ``"input_ids"``, ``"token_type_ids"`` and ``"attention_mask"`` to the
        corresponding arguments and ``label`` is passed through unchanged.
    """
    features = dict(
        input_ids=input_ids,
        token_type_ids=token_type_ids,
        attention_mask=attention_masks,
    )
    return features, label

def encode_example(input_text):
    """Encode one text for BERT as TensorFlow tensors.

    The text is wrapped in [CLS]/[SEP], padded with [PAD] up to the
    module-level ``max_length`` and truncated if it is longer, so the result
    always has exactly ``max_length`` positions.

    Args:
        input_text: The raw string to encode.

    Returns:
        The tokenizer's encoding; callers read its ``input_ids``,
        ``token_type_ids`` and ``attention_mask`` entries.
    """
    # `padding`/`truncation` replace the deprecated `pad_to_max_length`, which
    # padded but never truncated over-long inputs.
    return tokenizer.encode_plus(
        input_text,
        add_special_tokens=True,      # add [CLS], [SEP]
        max_length=max_length,        # max length of the text that can go to BERT
        padding="max_length",         # add [PAD] tokens
        truncation=True,              # cut inputs longer than max_length
        return_attention_mask=True,   # add attention mask to not focus on pad tokens
        return_tensors="tf",
    )

def encode_data(df):
    """Turn a dataframe of tweets into a ``tf.data.Dataset`` of BERT inputs.

    Args:
        df: DataFrame with a ``tweet`` column (text) and a ``sentiment``
            column (label).

    Returns:
        A dataset whose elements are ``(features, label)`` pairs as produced
        by ``map_inputs_to_dict``; each label is wrapped in a one-element list.
    """
    input_ids_list = []
    token_type_ids_list = []
    attention_mask_list = []
    label_list = []
    # Iterate over the two columns directly instead of materialising a Series
    # per row with iterrows().
    for tweet, label in zip(df['tweet'], df['sentiment']):
        bert_input = tokenizer.encode_plus(
            tweet,
            add_special_tokens=True,      # add [CLS], [SEP]
            max_length=max_length,        # max length of the text that can go to BERT
            padding="max_length",         # add [PAD] tokens
            # Without truncation a tweet longer than max_length yields a longer
            # row and from_tensor_slices fails on the ragged lists.
            truncation=True,
            return_attention_mask=True,   # add attention mask to not focus on pad tokens
        )
        input_ids_list.append(bert_input['input_ids'])
        token_type_ids_list.append(bert_input['token_type_ids'])
        attention_mask_list.append(bert_input['attention_mask'])
        label_list.append([label])
    slices = (input_ids_list, attention_mask_list, token_type_ids_list, label_list)
    return tf.data.Dataset.from_tensor_slices(slices).map(map_inputs_to_dict)


def prepare_dataset(df, size=int(DATASET_SIZE/2)):
    """Clean, subsample, shuffle and encode the raw tweet dataframe.

    The first ``size`` and the last ``size`` rows of the cleaned frame are
    kept, their order is randomised, and the result is encoded with
    ``encode_data``.
    """
    cleaned = clean_data(df)
    both_ends = pd.concat([cleaned.head(size), cleaned.tail(size)])
    shuffled = both_ends.sample(frac=1)
    return encode_data(shuffled)

def load_existing_model(export_dir):
    """Load a saved ``TFBertForSequenceClassification`` from ``export_dir``."""
    return TFBertForSequenceClassification.from_pretrained(export_dir)


def get_test_train_val_datasets(ds, size=DATASET_SIZE, batch_size=BATCH_SIZE):
    """Split ``ds`` into batched train / test / validation datasets.

    The first 70% of ``size`` elements become the training set, the next 15%
    the test set, and whatever remains the validation set.

    Args:
        ds: Unbatched ``tf.data.Dataset`` to split.
        size: Number of elements assumed to be in ``ds``; the split sizes are
            derived from it.
        batch_size: Batch size applied to each split.

    Returns:
        ``(train_dataset, test_dataset, val_dataset)``, in that order.
    """
    train_size = int(0.7 * size)
    test_size = int(0.15 * size)

    # Dataset transformations return a new dataset, so the original bare
    # `ds.shuffle(32)` had no effect. Shuffling is applied to the training
    # slice only, *after* the split: shuffling the source before take/skip
    # would let elements move between splits on every new iteration.
    train_dataset = ds.take(train_size).shuffle(32).batch(batch_size)

    held_out = ds.skip(train_size)
    test_dataset = held_out.take(test_size).batch(batch_size)
    val_dataset = held_out.skip(test_size).batch(batch_size)
    return (train_dataset, test_dataset, val_dataset)

def fine_tune_model(ds, export_dir, learning_rate=2e-5, number_of_epochs=1):
    """Fine-tune ``bert-base-uncased`` for sequence classification and save it.

    Args:
        ds: Dataset accepted by ``get_test_train_val_datasets``.
        export_dir: Directory the fine-tuned model is written to.
        learning_rate: Adam learning rate (defaults to the previously
            hard-coded 2e-5).
        number_of_epochs: Number of training epochs (defaults to the
            previously hard-coded 1).

    Returns:
        The fine-tuned model.
    """
    # The test split is not used during fine-tuning.
    train_dataset, _, val_dataset = get_test_train_val_datasets(ds)
    model = TFBertForSequenceClassification.from_pretrained('bert-base-uncased')
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, epsilon=1e-08)
    # The model outputs raw logits, hence from_logits=True.
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    metric = tf.keras.metrics.SparseCategoricalAccuracy('accuracy')
    model.compile(optimizer=optimizer, loss=loss, metrics=[metric])
    model.fit(train_dataset, epochs=number_of_epochs, validation_data=val_dataset)
    model.save_pretrained(export_dir)
    return model

def evaluate_model(model, X_test, y_test):
    """Predict a label for every tweet and print a classification report.

    Args:
        model: Model whose ``predict`` output has the logits at index 0.
        X_test: Iterable of tweet strings.
        y_test: True labels aligned with ``X_test``; the report covers the
            labels 0 ('negative') and 1 ('positive').
    """
    y_pred = []
    for tweet in X_test:
        bert_input = encode_example(tweet)
        # Pass the inputs by name. A positional list is interpreted in the
        # model's signature order (input_ids, attention_mask, token_type_ids),
        # so the previous [input_ids, token_type_ids, attention_mask] list
        # swapped the mask and the segment ids.
        model_inputs = {
            "input_ids": bert_input['input_ids'],
            "attention_mask": bert_input['attention_mask'],
            "token_type_ids": bert_input['token_type_ids'],
        }
        tf_output = model.predict(model_inputs)[0]
        tf_pred = tf.nn.softmax(tf_output, axis=1).numpy()[0]
        new_label = np.argmax(tf_pred, axis=-1)
        y_pred.append(new_label)
    print(classification_report(y_test, y_pred, labels=[0, 1], target_names=['negative', 'positive']))

def test_new_example(model_path, tweet):
    """Load a saved model, classify one tweet, print and return the label.

    Args:
        model_path: Directory of a model saved with ``save_pretrained``.
        tweet: Text to classify.

    Returns:
        The index of the highest-scoring class.
    """
    model = load_existing_model(model_path)
    bert_input = encode_example(tweet)
    # Inputs are passed by name: a positional list is read in the model's
    # signature order (input_ids, attention_mask, token_type_ids), which the
    # previous [input_ids, token_type_ids, attention_mask] list did not follow.
    model_inputs = {
        "input_ids": bert_input['input_ids'],
        "attention_mask": bert_input['attention_mask'],
        "token_type_ids": bert_input['token_type_ids'],
    }
    tf_output = model.predict(model_inputs)[0]
    tf_pred = tf.nn.softmax(tf_output, axis=1).numpy()[0]
    new_label = np.argmax(tf_pred, axis=-1)
    print(new_label)
    return new_label

def load_and_evaluate_existing_model(export_dir, num_points=200):
    """Evaluate a saved model on a sample of the tweet corpus.

    The model is loaded from ``export_dir``; the first and last ``num_points``
    rows of the cleaned corpus are split with ``split_dataset`` and the test
    portion is passed to ``evaluate_model``.
    """
    saved_model = load_existing_model(export_dir)
    tweets = clean_data(pd.read_csv(english_twitter, encoding="latin1"))
    sample = pd.concat([tweets.head(num_points), tweets.tail(num_points)])
    # Only the test half of the split is needed here.
    _, X_test, _, y_test = split_dataset(sample, 'tweet', 'sentiment')
    evaluate_model(saved_model, X_test, y_test)

def main():
    """Read the tweet corpus, build the dataset and fine-tune/save the model."""
    raw_tweets = pd.read_csv(english_twitter, encoding="latin1")
    encoded = prepare_dataset(raw_tweets)
    fine_tune_model(encoded, 'Chapter05/bert_twitter_test2_model')


# Entry point: runs the fine-tuning; the commented-out calls exercise an
# already saved model instead.
if __name__ == "__main__":
    main()
    #test_new_example('Chapter04/bert_twitter_test_model', "I hate going to school")
    #load_and_evaluate_existing_model('Chapter05/bert_twitter_test2_model')

In [None]:
from datasets import load_dataset, Dataset, Features, Value, ClassLabel, Sequence, DatasetDict
import pandas as pd
from transformers import AutoTokenizer, AutoModel
from transformers import DataCollatorForTokenClassification
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer
import numpy as np
from sklearn.model_selection import train_test_split
from evaluate import load
# Entity annotations for the music NER task, read from a path relative to the notebook.
music_ner_df = pd.read_csv("../data/music_ner.csv")
def change_label(input_label):
    """Return ``input_label`` with every ``"_deduced"`` occurrence removed."""
    return input_label.replace("_deduced", "")
# Column-wise clean-up: strip the "_deduced" suffix from labels and swap the
# "|" separator for "," in the texts, then show the frame.
for column, transform in (
    ("label", change_label),
    ("text", lambda raw_text: raw_text.replace("|", ",")),
):
    music_ner_df[column] = music_ner_df[column].apply(transform)
print(music_ner_df)

In [None]:
# Build one parsed document per annotated text, with the annotated entities
# attached, keyed by the document text.
# NOTE(review): `small_model` is not defined or imported in the visible cells;
# it is called like a loaded spaCy pipeline and must already exist in the
# kernel — confirm where it comes from.
ids = list(set(music_ner_df["id"].values))  # kept for any later cell that reads it
docs = {}
# groupby visits each id's rows once instead of re-filtering the whole frame
# per id, and avoids shadowing the builtin `id` as the loop variable.
for doc_id, entity_rows in music_ner_df.groupby("id", sort=False):
    text = entity_rows["text"].iloc[0]
    doc = small_model(text)
    ents = []
    for row in entity_rows.itertuples(index=False):
        span = doc.char_span(
            row.start_offset,
            row.end_offset,
            label=row.label,
            alignment_mode="contract",
        )
        # char_span returns None when the offsets do not map onto a token
        # span; assigning a None entry to doc.ents would raise.
        if span is not None:
            ents.append(span)
    doc.ents = ents
    docs[doc.text] = doc
# Parse the BIO file: sentences are separated by a blank line and every line
# inside a sentence is "<word>\t<tag>".
data_file = "../data/music_ner_bio.bio"
tag_mapping = {"O": 0, "B-Artist": 1, "I-Artist": 2, "B-WoA": 3, "I-WoA": 4}
with open(data_file) as f:
    data = f.read()
tokens = []      # per sentence: list of words
ner_tags = []    # per sentence: list of tag ids (see tag_mapping)
spans = []       # per sentence: list of "<label>: <entity text>" strings
sentences = data.split("\n\n")
for sentence in sentences:
    words = []
    tags = []
    for pair in sentence.split("\n"):
        # A trailing newline or extra blank line leaves an empty entry that
        # cannot be unpacked into (word, tag).
        if not pair.strip():
            continue
        (word, tag) = pair.split("\t")
        words.append(word)
        tags.append(tag_mapping[tag])
    if not words:
        continue
    sentence_text = " ".join(words)
    # Look the sentence up explicitly. The previous bare `except: pass`
    # silently reused the `doc` left over from an earlier sentence, attaching
    # the wrong entities to sentences that have no parsed document.
    doc = docs.get(sentence_text)
    if doc is None:
        this_sentence_spans = []
    else:
        this_sentence_spans = [f"{ent.label_}: {ent.text}" for ent in doc.ents]
    tokens.append(words)
    ner_tags.append(tags)
    spans.append(this_sentence_spans)
# Split the sentence indices 90/10 and gather the matching tokens, tags and
# spans for each side, preserving the original sentence order.
indices = range(0, len(spans))
# A fixed random_state makes the split reproducible across kernel restarts.
train, test = train_test_split(indices, test_size=0.1, random_state=42)
# Sorting the index lists once replaces the per-sentence `i in train` scan of
# a list and keeps each side in original sentence order.
train_indices = sorted(train)
test_indices = sorted(test)
train_tokens = [tokens[i] for i in train_indices]
test_tokens = [tokens[i] for i in test_indices]
train_ner_tags = [ner_tags[i] for i in train_indices]
test_ner_tags = [ner_tags[i] for i in test_indices]
train_spans = [spans[i] for i in train_indices]
test_spans = [spans[i] for i in test_indices]

print(len(train_spans))
print(len(test_spans))
539
60
# Assemble the train/test frames and add the space-joined sentence text.
training_df = pd.DataFrame({"tokens": train_tokens, "ner_tags": train_ner_tags, "spans": train_spans})
test_df = pd.DataFrame({"tokens": test_tokens, "ner_tags": test_ner_tags, "spans": test_spans})
training_df["text"] = training_df["tokens"].apply(lambda x: " ".join(x))
test_df["text"] = test_df["tokens"].apply(lambda x: " ".join(x))
# dropna() returns a new frame; the previous bare calls discarded the result.
# The index is reset so the frames keep a plain 0..n-1 index after rows are
# removed.
training_df = training_df.dropna().reset_index(drop=True)
test_df = test_df.dropna().reset_index(drop=True)
print(test_df)

In [None]:
# Tokenizer used for the token-classification model (rebinds `tokenizer`).
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Explicit schema so `ner_tags` is stored as a sequence of class labels.
features = Features(
    {
        "tokens": Sequence(feature=Value(dtype="string")),
        "ner_tags": Sequence(
            feature=ClassLabel(names=["O", "B-Artist", "I-Artist", "B-WoA", "I-WoA"])
        ),
        "spans": Sequence(feature=Value(dtype="string")),
        "text": Value(dtype="string"),
    }
)

# Wrap both dataframes as datasets and group them under "train" / "test".
training_dataset = Dataset.from_pandas(training_df, features=features)
test_dataset = Dataset.from_pandas(test_df, features=features)
dataset = DatasetDict({"train": training_dataset, "test": test_dataset})

print(dataset["train"].features)
# Tag names in id order, read back from the ClassLabel feature.
label_names = dataset["train"].features["ner_tags"].feature.names
print(dataset)

In [None]:
def tokenize_adjust_labels(all_samples_per_split):
    """Tokenize a batch and expand word-level NER tags to sub-word tokens.

    The pre-split ``tokens`` are tokenized with ``is_split_into_words=True``
    so each sub-word's word id is an index into the original token list, and
    therefore into ``ner_tags``. The previous version tokenized the joined
    ``text`` and counted words with a running index, which drifts (or raises
    ``IndexError``) whenever the tokenizer splits the text into a different
    number of words than the original tokens.

    Args:
        all_samples_per_split: Batch dict with ``tokens`` (list of word lists)
            and ``ner_tags`` (list of tag-id lists, one id per word).

    Returns:
        The tokenizer output with an added ``labels`` entry: for every
        sub-word token the tag id of its word, or -100 for tokens that belong
        to no word (special tokens).
    """
    tokenized_samples = tokenizer(
        all_samples_per_split["tokens"],
        is_split_into_words=True,
        truncation=True,
    )
    total_adjusted_labels = []
    for k, word_label_ids in enumerate(all_samples_per_split["ner_tags"]):
        word_ids_list = tokenized_samples.word_ids(batch_index=k)
        # Every sub-word piece inherits the label of the word it came from.
        adjusted_label_ids = [
            -100 if wid is None else word_label_ids[wid]
            for wid in word_ids_list
        ]
        total_adjusted_labels.append(adjusted_label_ids)
    tokenized_samples["labels"] = total_adjusted_labels
    return tokenized_samples
# Apply the tokenization/label alignment to every split, one batch at a time.
tokenized_dataset = dataset.map(function=tokenize_adjust_labels, batched=True)

In [None]:
# seqeval metric used by compute_metrics.
metric = load("seqeval")
# Collator that batches the tokenized examples for token classification.
data_collator = DataCollatorForTokenClassification(tokenizer)
def compute_metrics(data):
    """Convert raw predictions to tag names and score them with ``metric``.

    Args:
        data: ``(predictions, labels)`` pair; ``predictions`` holds per-token
            class scores (argmax is taken over axis 2) and ``labels`` holds
            the gold tag ids, with -100 marking positions to ignore.

    Returns:
        A flat dict with the four ``overall_*`` scores plus one
        ``"<key>_f1"`` entry for every other key in the metric result.
    """
    scores, gold_ids = data
    predicted_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        # Keep only positions that carry a real label.
        kept = [(p, g) for p, g in zip(predicted_row, gold_row) if g != -100]
        true_predictions.append([label_names[p] for p, _ in kept])
        true_labels.append([label_names[g] for _, g in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    overall_keys = (
        "overall_precision",
        "overall_recall",
        "overall_f1",
        "overall_accuracy",
    )
    flat_results = {key: results[key] for key in overall_keys}
    # Remaining entries are nested dicts; only their f1 is surfaced.
    for key, value in results.items():
        if key not in flat_results:
            flat_results[key + "_f1"] = value["f1"]

    return flat_results
# Train model
# Store the tag names in the model config so the saved model (and any pipeline
# built on it) reports "B-Artist"/"I-WoA"/... instead of generic "LABEL_n".
id2label = dict(enumerate(label_names))
label2id = {name: idx for idx, name in id2label.items()}
model = AutoModelForTokenClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=len(label_names),
    id2label=id2label,
    label2id=label2id,
)
training_args = TrainingArguments(
    output_dir="./fine_tune_bert_output",
    # Evaluate and log once per epoch. The previous step-based schedule with
    # logging_steps=1000 never fired: a few hundred training sentences at
    # batch size 16 for 7 epochs is far fewer than 1000 optimizer steps.
    evaluation_strategy="epoch",
    logging_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=7,
    weight_decay=0.01,
    run_name = "ep_10_tokenized_11",
    save_strategy='no'  # no intermediate checkpoints are written
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)
trainer.train()

In [None]:
# Evaluate the trained model; as the last expression of the cell, the result is displayed.
trainer.evaluate()


In [None]:
from transformers import pipeline

# Persist the fine-tuned model under a single directory.
fine_tuned_dir = "../models/bert_fine_tuned"
trainer.save_model(fine_tuned_dir)

# Reload model and tokenizer from that directory (rebinds `model` and `tokenizer`).
model = AutoModelForTokenClassification.from_pretrained(fine_tuned_dir)
tokenizer = AutoTokenizer.from_pretrained(fine_tuned_dir)

# Run token classification on the CPU, merging sub-word predictions into entities.
text = "music similar to morphine robocobra quartet | featuring elements like saxophone prominent bass"
pipe = pipeline(
    task="token-classification",
    model=model.to("cpu"),
    tokenizer=tokenizer,
    aggregation_strategy="simple",
)
pipe(text)
# tag_mapping = {"O": 0, "B-Artist": 1, "I-Artist": 2, "B-WoA": 3, "I-WoA": 4}