In [34]:
from abc import ABC, abstractmethod

# BERT Model

In [35]:
import tensorflow as tf

from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model

from official.nlp.bert.tokenization import FullTokenizer

## BERT Tokenizers

In [36]:
class AbstractBertTokenizer(ABC):
    """ Abstract BERT Tokenizer

    Holds a word-piece `FullTokenizer` built from a BERT encoder's vocabulary and
    the helpers shared by all concrete tokenizers. Subclasses implement
    `tokenize_input`, which is expected to set `label_pattern`.
    """
    # One entry per input example: how many times `tokenize_labels` repeats that
    # example's label. None until a subclass's `tokenize_input` has assigned it.
    label_pattern = None

    def __init__(self, encoder, bert_input_size):
        """ Create the word-piece tokenizer from the BERT encoder's vocabulary

        Args:
            encoder: object exposing `resolved_object.vocab_file.asset_path` and
                `resolved_object.do_lower_case` (presumably a TF-Hub BERT layer;
                confirm against the caller)
            bert_input_size: length every BERT input row is padded to
        """
        self.bert_input_size = bert_input_size
        self.tokenizer = FullTokenizer(
            encoder.resolved_object.vocab_file.asset_path.numpy(),
            do_lower_case=encoder.resolved_object.do_lower_case.numpy()
        )

    @abstractmethod
    def tokenize_input(self, x):
        """ Tokenize input data

        Implementations return the dict produced by `_format_bert_tokens` and
        store the rows-per-example counts in `self.label_pattern`.
        """
        return

    def tokenize_labels(self, y):
        """ Tokenize input data labels

        Repeats each label in `y` as many times as the matching entry of
        `label_pattern`, so that labels line up with the generated BERT rows.

        Args:
            y: iterable of labels convertible with `int`, one per input example

        Returns:
            1-D `tf.int32` tensor of the repeated labels

        Raises:
            RuntimeError: if `label_pattern` has not been set yet
        """
        if self.label_pattern is None:
            raise RuntimeError("Must tokenize the input first")

        labels = [
            int(label)
            for label, repeats in zip(y, self.label_pattern)
            for _ in range(repeats)
        ]
        return tf.convert_to_tensor(labels, tf.int32)

    def _format_bert_tokens(self, ragged_word_ids):
        """ Create, format and pad BERT's input tensors

        Args:
            ragged_word_ids: ragged tensor with one row of word ids per BERT input

        Returns:
            dict with 'input_word_ids' (ids, zero padded), 'input_mask' (1 for
            real tokens, 0 for padding) and 'input_type_ids' (all zeros), each
            padded along the second axis to `bert_input_size`

        Raises:
            ValueError: if the longest row exceeds `bert_input_size`
        """
        # Densify first: rows are padded with 0 up to the longest row
        word_ids = ragged_word_ids.to_tensor()
        mask = tf.ones_like(ragged_word_ids).to_tensor()

        # Then pad on the right of the sequence axis up to the BERT input size
        pad_width = self.bert_input_size - mask.shape[1]
        if pad_width < 0:
            raise ValueError(
                "Tokenized sequence length %d exceeds bert_input_size %d"
                % (mask.shape[1], self.bert_input_size)
            )
        padding = tf.constant([[0, 0], [0, pad_width]])
        word_ids = tf.pad(word_ids, padding, "CONSTANT")
        mask = tf.pad(mask, padding, "CONSTANT")
        type_ids = tf.zeros_like(mask)

        return {
            'input_word_ids': word_ids,
            'input_mask': mask,
            'input_type_ids': type_ids,
        }

    def _format_bert_word_piece_input(self, word_piece_tokens):
        """ Wrap word-piece tokens in '[CLS]' ... '[SEP]' and convert them to ids

        A new list is built so the caller's `word_piece_tokens` is left untouched.
        """
        wrapped_tokens = ['[CLS]', *word_piece_tokens, '[SEP]']
        return self.tokenizer.convert_tokens_to_ids(wrapped_tokens)

In [37]:
class BertIndividualTweetTokenizer(AbstractBertTokenizer):
    """ BERT tokenizer which turns every tweet of every tweet feed into its own BERT input """

    def tokenize_input(self, X):
        """ Tokenize input data

        Args:
            X: collection of tweet feeds, each feed a sized collection of tweets

        Returns:
            the BERT input dict built by `_format_bert_tokens`, one row per tweet

        Side effect: `label_pattern` is set to the number of tweets in each feed.
        """
        tweet_ids = []
        for tweet_feed in X:
            for tweet in tweet_feed:
                tweet_ids.append(self._tokenize_single_tweet(tweet))

        self.label_pattern = list(map(len, X))
        return self._format_bert_tokens(tf.ragged.constant(tweet_ids))

    def _tokenize_single_tweet(self, tweet):
        """ Tokenize one tweet, keeping at most bert_input_size - 2 word pieces """
        # Two positions are held back for the markers added by
        # _format_bert_word_piece_input
        max_pieces = self.bert_input_size - 2
        word_pieces = self.tokenizer.tokenize(tweet)
        return self._format_bert_word_piece_input(word_pieces[:max_pieces])

In [38]:
class BertTweetFeedTokenizer(AbstractBertTokenizer):
    """ BERT tokenizer which tokenizes historical tweet data as tweet feed chunks

    Every tweet feed is word-piece tokenized as a whole and cut into chunks of at
    most `bert_input_size - 2` tokens, which leaves room for the '[CLS]' and
    '[SEP]' markers. Each chunk becomes one BERT input row.
    """

    def __init__(self, encoder, bert_input_size, overlap=0):
        """ Create the tokenizer

        Args:
            encoder: BERT encoder, forwarded unchanged to AbstractBertTokenizer
            bert_input_size: length of each BERT input row
            overlap: number of word-piece tokens shared by consecutive chunks of
                the same tweet feed; 0 (default) gives back-to-back chunks
        """
        super().__init__(encoder, bert_input_size)
        self.overlap = overlap

    def tokenize_input(self, X):
        """ Tokenize input data

        Args:
            X: iterable of tweet feeds; each feed is passed as-is to the
                word-piece tokenizer (assumed to be a single string - confirm
                against the caller)

        Returns:
            the BERT input dict built by `_format_bert_tokens`, one row per chunk

        Side effect: `label_pattern` is set to the number of chunks per feed.
        """
        chunked_feeds = [self._tokenize_tweet_feed(tweet_feed) for tweet_feed in X]
        self.label_pattern = [len(chunks) for chunks in chunked_feeds]

        # _format_bert_tokens pads a 2-D (rows x tokens) tensor, so the
        # feed -> chunk nesting is flattened to one row per chunk. Row order
        # follows feed order, which is what label_pattern describes.
        chunk_ids = [chunk for chunks in chunked_feeds for chunk in chunks]
        word_ids = tf.ragged.constant(chunk_ids)
        return self._format_bert_tokens(word_ids)

    def _tokenize_tweet_feed(self, tweet_feed, overlap=None):
        """ Tokenize an entire tweet feed into chunks

        Args:
            tweet_feed: the tweet feed to tokenize
            overlap: tokens shared by consecutive chunks; defaults to
                `self.overlap` when None

        Returns:
            list of word-id lists, one per chunk, each wrapped in '[CLS]'/'[SEP]'

        Raises:
            ValueError: unless 0 <= overlap < bert_input_size - 2
        """
        if overlap is None:
            overlap = self.overlap

        # '[CLS]' and '[SEP]' take two of the bert_input_size positions
        chunk_size = self.bert_input_size - 2
        if not 0 <= overlap < chunk_size:
            raise ValueError(
                "overlap must satisfy 0 <= overlap < bert_input_size - 2, got %r"
                % (overlap,)
            )

        # Stride is measured against the chunk size so that no token is skipped
        stride = chunk_size - overlap
        feed_tokens = self.tokenizer.tokenize(tweet_feed)
        chunks = [
            feed_tokens[start:start + chunk_size]
            for start in range(0, len(feed_tokens), stride)
        ]
        return [self._format_bert_word_piece_input(chunk) for chunk in chunks]

## BERT Model

In [39]:
def create_bert_model(encoder, input_size):
    """ Build a Keras model: BERT encoder followed by a single sigmoid unit

    Args:
        encoder: callable applied to the dict of input layers; its result must
            provide a 'pooled_output' entry
        input_size: length of each of the three int32 input sequences

    Returns:
        an uncompiled `Model` mapping the three BERT inputs to one sigmoid output
    """
    # Dict key expected by the encoder -> name given to the Keras input layer
    layer_names = {
        'input_word_ids': "inputs/input_word_ids",
        'input_mask': "inputs/input_mask",
        'input_type_ids': "inputs/input_type_ids",
    }
    inputs = {
        key: Input(shape=(input_size,), dtype=tf.int32, name=layer_name)
        for key, layer_name in layer_names.items()
    }

    # Feed BERT's pooled output into the classification head
    pooled = encoder(inputs)['pooled_output']
    prediction = Dense(1, activation='sigmoid')(pooled)

    # The model is returned uncompiled; compiling is left to the caller
    return Model(inputs, prediction)