In [1]:
import numpy as np
import re
import pandas as pd
from bs4 import BeautifulSoup
import random

from google.colab import drive

!pip install transformers



In [2]:
from transformers import BertTokenizer, TFBertModel
import tensorflow as tf
from tensorflow.keras import layers

In [3]:
drive.mount("/content/drive")

Mounted at /content/drive


In [4]:
cols = ["sentiment", "id", "date", "query", "user", "text"]
data = pd.read_csv(
    "/content/drive/MyDrive/dataset/training.csv",
    header=None,
    names=cols,
    engine="python",
    encoding="latin1"
)

data.drop(["id", "date", "query", "user"], axis=1, inplace=True)

def clean_tweet(tweet):
    tweet = BeautifulSoup(tweet, "lxml").get_text()
    tweet = re.sub(r"@[A-Za-z0-9]+", ' ', tweet)
    tweet = re.sub(r"https?://[A-Za-z0-9./]+", ' ', tweet)
    tweet = re.sub(r"[^a-zA-Z.!?']", ' ', tweet)
    tweet = re.sub(r" +", ' ', tweet)
    return tweet

data_clean = [clean_tweet(tweet) for tweet in data.text]
data_labels = data.sentiment.values
data_labels[data_labels == 4] = 1

  tweet = BeautifulSoup(tweet, "lxml").get_text()


In [5]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

def encode_sentence(sent):
    return tokenizer.encode(sent, max_length=128, truncation=True, padding='max_length')

data_inputs = [encode_sentence(sentence) for sentence in data_clean]

def get_mask(tokens):
    return [1 if token != tokenizer.pad_token_id else 0 for token in tokens]

def get_segments(tokens):
    return [0] * len(tokens)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]



In [7]:
import math

In [8]:
data_with_len = [[sent, data_labels[i], len(sent)] for i, sent in enumerate(data_inputs)]
random.shuffle(data_with_len)
data_with_len.sort(key=lambda x: x[2])

sorted_all = [([sent_lab[0], get_mask(sent_lab[0]), get_segments(sent_lab[0])], sent_lab[1]) for sent_lab in data_with_len if sent_lab[2] > 7]

all_dataset = tf.data.Dataset.from_generator(lambda: sorted_all, output_types=(tf.int32, tf.int32))

BATCH_SIZE = 32
all_batched = all_dataset.padded_batch(BATCH_SIZE, padded_shapes=((3, None), ()))

NB_BATCHES = math.ceil(len(sorted_all) / BATCH_SIZE)
NB_BATCHES_TEST = NB_BATCHES // 10

all_batched = all_batched.shuffle(NB_BATCHES)
test_dataset = all_batched.take(NB_BATCHES_TEST)
train_dataset = all_batched.skip(NB_BATCHES_TEST)

In [9]:
class DCNNBERTEmbedding(tf.keras.Model):
    def __init__(self, nb_filters=50, FFN_units=512, nb_classes=2, dropout_rate=0.1, name="dcnn"):
        super(DCNNBERTEmbedding, self).__init__(name=name)
        self.bert_layer = TFBertModel.from_pretrained('bert-base-uncased')
        self.bigram = layers.Conv1D(filters=nb_filters, kernel_size=2, padding="valid", activation="relu")
        self.trigram = layers.Conv1D(filters=nb_filters, kernel_size=3, padding="valid", activation="relu")
        self.fourgram = layers.Conv1D(filters=nb_filters, kernel_size=4, padding="valid", activation="relu")
        self.pool = layers.GlobalMaxPool1D()
        self.dense_1 = layers.Dense(units=FFN_units, activation="relu")
        self.dropout = layers.Dropout(rate=dropout_rate)
        if nb_classes == 2:
            self.last_dense = layers.Dense(units=1, activation="sigmoid")
        else:
            self.last_dense = layers.Dense(units=nb_classes, activation="softmax")

    def call(self, inputs, training=False):
        input_ids, attention_mask, token_type_ids = inputs
        bert_output = self.bert_layer(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        x = bert_output.last_hidden_state
        x_1 = self.bigram(x)
        x_1 = self.pool(x_1)
        x_2 = self.trigram(x)
        x_2 = self.pool(x_2)
        x_3 = self.fourgram(x)
        x_3 = self.pool(x_3)
        merged = tf.concat([x_1, x_2, x_3], axis=-1)
        merged = self.dense_1(merged)
        merged = self.dropout(merged, training=training)
        output = self.last_dense(merged)
        return output

In [10]:
NB_FILTERS = 100
FFN_UNITS = 256
NB_CLASSES = 2
DROPOUT_RATE = 0.2

Dcnn = DCNNBERTEmbedding(nb_filters=NB_FILTERS, FFN_units=FFN_UNITS, nb_classes=NB_CLASSES, dropout_rate=DROPOUT_RATE)

if NB_CLASSES == 2:
    Dcnn.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
else:
    Dcnn.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"])

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

In [21]:
import tensorflow as tf
tf.config.run_functions_eagerly(True)

In [22]:
class DCNNBERTEmbedding(tf.keras.Model):
    @tf.function
    def call(self, inputs, training=False):
        input_ids, attention_mask, token_type_ids = inputs
        bert_output = self.bert_layer(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        x = bert_output.last_hidden_state
        # Continue with the rest of your model's logic
        return x

In [23]:
def call(self, inputs, training=False):
    input_ids = inputs[:, 0, :]  # or another appropriate slicing based on input shape
    attention_mask = inputs[:, 1, :]
    token_type_ids = inputs[:, 2, :]
    bert_output = self.bert_layer(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
    x = bert_output.last_hidden_state
    return x


In [1]:
checkpoint_path = "./drive/MyDrive/projects/BERT/ckpt_bert_embedding/"
ckpt = tf.train.Checkpoint(Dcnn=Dcnn)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=1)

if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print("Latest Checkpoint restored!")

class MyCustomCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        ckpt_manager.save()
        print("Checkpoint saved at {}.".format(checkpoint_path))

Dcnn.fit(train_dataset, epochs=5, callbacks=[MyCustomCallback()])

NameError: name 'tf' is not defined

In [13]:
results = Dcnn.evaluate(test_dataset)
print(results)

OperatorNotAllowedInGraphError: Exception encountered when calling DCNNBERTEmbedding.call().

[1mIterating over a symbolic `tf.Tensor` is not allowed. You can attempt the following resolutions to the problem: If you are running in Graph mode, use Eager execution mode or decorate this function with @tf.function. If you are using AutoGraph, you can try decorating this function with @tf.function. If that does not work, then you may be using an unsupported feature or your source code may not be visible to AutoGraph. See https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/autograph/g3doc/reference/limitations.md#access-to-source-code for more information.[0m

Arguments received by DCNNBERTEmbedding.call():
  • inputs=tf.Tensor(shape=(None, 3, None), dtype=int32)
  • training=False

In [None]:
def get_prediction(sentence):
    tokens = encode_sentence(sentence)
    input_ids = get_ids(tokens)
    input_mask = get_mask(tokens)
    segment_ids = get_segments(tokens)
    inputs = tf.stack([tf.cast(input_ids, dtype=tf.int32),
                       tf.cast(input_mask, dtype=tf.int32),
                       tf.cast(segment_ids, dtype=tf.int32)], axis=0)
    inputs = tf.expand_dims(inputs, 0)
    output = Dcnn(inputs, training=False)
    sentiment = math.floor(output.numpy()[0] * 2)
    if sentiment == 0:
        print(f"Output of the model: {output.numpy()}\nPredicted sentiment: negative.")
    elif sentiment == 1:
        print(f"Output of the model: {output.numpy()}\nPredicted sentiment: positive.")

get_prediction("This movie was pretty interesting.")