In [11]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

import tensorflow as tf
from tensorflow.keras import layers
import tensorflow_hub as thub
import bert
from tensorflow.keras import backend as K

import pandas as pd
import numpy as np

import re

import random

import os
from tqdm import tqdm

import model_utils

# maximum length of token sequences to input to bert model
max_seq_length = 128

# number of smaples to generate from the distributed database
n_samples = 5


In [12]:
# Verify NVIDIA drivers are install successfully

!sudo nvidia-smi

Tue Mar 31 21:45:26 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 418.87.00    Driver Version: 418.87.00    CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   73C    P0    73W / 149W |     67MiB / 11441MiB |      0%   E. Process |
+-------------------------------+----------------------+----------------------+
|   1  Tesla K80           Off  | 00000000:00:05.0 Off |                    0 |
| N/A   45C    P0    64W / 149W |     67MiB / 11441MiB |      0%   E. Process |
+-------------------------------+----------------------+----------------------+
                                                                            

In [13]:
def tokenize_sample(context):
    
    """
    To be applied over Spark dataframe.
    Takes a string and converts it to token IDs via bert_tokenizer,
    adding the necessary beginning and end tokens

    Returns: Array of bert token ids for each row of Spark dataframe (requires udf)
    """
    
    tokenized = ["[CLS]"] + tokenizer.tokenize(context) + ["[SEP]"]
    ids = tokenizer.convert_tokens_to_ids(tokenized)
    
    return ids

In [14]:
def generate_sample_df(sarcastic, non_sarcastic, ratio, n_samples, max_seq_length):
    
    """
    Returns: Spark df of equal label distribution with text 
    tokenized. Each generated df is to be iterator over multiple 
    times during training
    """
    
    number = 0
    while number < n_samples:
        
        non_sarc_samp = non_sarcastic.sample(ratio) # making label dist equal
        
        # combine sampled non_sarcastic and whole sarcastic
        sample_df = sarcastic.union(non_sarc_samp)
        
        # tokenize context column via spark udf
        tokenize_sample_udf = F.udf(tokenize_sample, ArrayType(IntegerType()))
        sample_df = sample_df.withColumn("raw_tokens", tokenize_sample_udf(sample_df.context))
        # keep only the first 'max_seq_length' tokens
        sample_df = sample_df.withColumn("tokens", F.slice('raw_tokens',1, max_seq_length))
        
        # drop context and raw_tokens columns
        sample_df = sample_df.drop("context")
        sample_df = sample_df.drop("raw_tokens")
        
        # yield one call at a time
        yield sample_df
        number += 1

In [15]:
class BertLayer(tf.keras.layers.Layer):
    
    def __init__(self,
        n_fine_tune_layers=2,
        output_type="sequence_output",
        bert_path="https://tfhub.dev/tensorflow/bert_en_cased_L-12_H-768_A-12/1",
        **kwargs):
        
        self.n_fine_tune_layers = n_fine_tune_layers
        self.trainable = True
        self.output_size = 768
        self.output_type = output_type
        self.bert_path = bert_path
        
        if self.output_type not in ["sequence_output", "pooled_output"]:
            raise NameError("Undefined pooling type (must be either sequence_output or pooled_output, but is {self.output_type}")

        super(BertLayer, self).__init__(**kwargs)
        
    def layer_number(self, var):
    
        """
        Get the layer number corresponding to the 
        given variable
        """
        m = re.search(r'/layer_(\d+)/', var)
        
        if m:
            return int(m.group(1))
        else:
            return None


    def build(self, input_shape):

        """
        Creates the variables of the layer (optional, for subclass implementers).
 |      
 |      This is a method that implementers of subclasses of `Layer` or `Model`
 |      can override if they need a state-creation step in-between
 |      layer instantiation and layer call.
 |      
 |      This is typically used to create the weights of `Layer` subclasses.
 
        Called once from `__call__`, when we know the shapes of input and `dtype`.
        """
        
        self.bert_model = thub.KerasLayer(self.bert_path, self.trainable)

        # extract all trainable variables from the model
        trainable_vars = self.bert_model.trainable_variables
        
        if self.output_type == "pooled_output":
            
            # removing '/cls/' layers (there don't appear to be any) 
            trainable_vars = [var.name for var in trainable_vars if not "/cls/" in var.name]
            
        elif self.output_type == "sequence_output":
            
            # removing '/cls/' (there don't appear to be any) and '/pooler_transform/' layers 
            trainable_vars = [var.name for var in trainable_vars if not "/cls/" in var.name
                              and not "/pooler_transform" in var.name] 
            
        ### select how many layers to fine tune starting from top-most layer ###
        
        # outputs a list of either Nonetype or layer number
        layer_numbers = list(map(self.layer_number, trainable_vars))
        # returns the total number of layers in pre-trained model (note: layers are zero-indexed)
        n_total_layers = max(n for n in layer_numbers if n is not None) + 1 
        # finally, create list of just layers to be trained
        trainable_vars = [var for n, var in zip(layer_numbers, trainable_vars) if n is not None and n >= n_total_layers - self.n_fine_tune_layers]    
        
        # add variables NOT to be trained to _non_trainable_weights and 
        # remove them from _trainable_weights
        # note: underscore is necessary for accessing the writable object
        for var in self.bert_model.variables:

            if var.name not in trainable_vars and "Variable:0" not in var.name:
                    
                    # add non_trainable weights to _non_trainable_weights
                    self.bert_model._non_trainable_weights.append(var)
                    
                    ### due to boolean-related issues with nparrays, need to take different approach to removing weights ###
                    # pull out index in _trainable_weights corresponding to the var.name that needs to be removed
                    ix = [(i,j.name) for i,j in enumerate(self.bert_model._trainable_weights) if j.name == var.name][0][0]
                    # pop it off
                    self.bert_model._trainable_weights.pop(ix)

        super(BertLayer, self).build(input_shape)
        
    def call(self, inputs): 
        
        """
        Called in `__call__` after making sure `build()` has been called
 |      once. Should actually perform the logic of applying the layer to the
 |      input tensors (which should be passed in as the first argument).
        """
        # takes in list of input tensors and casts them in Keras
        inputs = [K.cast(x, dtype="int32") for x in inputs]
        
        #bert-for-tf2 returns (pooled_output,sequence_output) when called
        if self.output_type == "pooled_output":
            
            output = self.bert_model(inputs)[0]
                
        elif self.output_type == "sequence_output":
            
            output = self.bert_model(inputs)[1]
            
        return output
    
    def compute_output_shape(self, input_shape):
        
        return (input_shape[0], self.output_size)

       
        

In [29]:
class training(object):
    
    def __init__(self,
                max_seq_length=128,
                n_epochs=50,
                batch_size=13,
                patience=5,
                validation_split=0.1,
                checkpoint_dir=os.getcwd() + "/checkpoints",
                saved_model_dir=os.getcwd() + "/saved_models",
                pad_by_batch=False):
    
        self.max_seq_length = max_seq_length
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.patience = patience
        self.validation_split = validation_split
        self.checkpoint_dir = checkpoint_dir
        self.saved_model_dir = saved_model_dir
        self.pad_by_batch = pad_by_batch

    def build_model(self, gpu=True): 
    
        """
        Defines input shapes of bert input tensors,
        """
        input_token_ids = tf.keras.layers.Input(shape=(self.max_seq_length,), dtype=tf.int32,
                                       name="input_word_ids")
        input_mask = tf.keras.layers.Input(shape=(self.max_seq_length,), dtype=tf.int32,
                                   name="input_mask")
        segment_ids = tf.keras.layers.Input(shape=(self.max_seq_length,), dtype=tf.int32,
                                    name="segment_ids")
        bert_inputs = [input_token_ids, input_mask, segment_ids]
    
        if gpu == True:
            
            """
            This will create a MirroredStrategy instance which will use 
            all the GPUs that are visible to TensorFlow, and use NCCL as 
            the cross device communication.
            """
            # tf graph
            bert_output = BertLayer()(bert_inputs)
            dense_out = tf.keras.layers.Dense(self.max_seq_length, activation='relu')(bert_output)
            dense_out = tf.keras.layers.Dropout(0.5)(dense_out)
            logits = tf.keras.layers.Dense(1, activation='sigmoid')(dense_out)
            
            mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"],
                                                               cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
            with mirrored_strategy.scope():
                
                # define, compile model
                model = tf.keras.models.Model(inputs=bert_inputs, outputs=logits)
                model.compile(loss='binary_crossentropy', 
                      optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), 
                      metrics=['accuracy'])
    
                model.summary()
        
        elif gpu == False:
            
            # tf graph
            bert_output = BertLayer()(bert_inputs)
            dense_out = tf.keras.layers.Dense(self.max_seq_length, activation='relu')(bert_output)
            dense_out = tf.keras.layers.Dropout(0.5)(dense_out)
            logits = tf.keras.layers.Dense(1, activation='sigmoid')(dense_out)
    
            # define, compile model
            model = tf.keras.models.Model(inputs=bert_inputs, outputs=logits)
            model.compile(loss='binary_crossentropy', 
                    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), 
                    metrics=['accuracy'])
    
            model.summary()
    
        return model
    
    def train_model(self, model, train_inputs, train_labels):
        
        """
        Initiates training process using self.build_model output as input.
        If self.pad_by_batch == False, input should be the entire epoch of 
        training inputs (including masks) and labels, as numpy arrays.
        """
    
        checkpoints = tf.keras.callbacks.ModelCheckpoint(self.checkpoint_dir, verbose=1, 
                                                     save_best_only=False,
                                                     save_weights_only=True, mode='auto', 
                                                     save_freq='epoch')
    
        early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=self.patience)
    
        if self.pad_by_batch == False:
            
            train_input_ids, train_input_masks, train_segment_ids = train_inputs
        
            model.fit([train_input_ids, train_input_masks, train_segment_ids], 
                  train_labels,
                  validation_split = self.validation_split,
                  epochs = self.n_epochs,
                  batch_size = self.batch_size,
                  callbacks = [checkpoints, early_stopping])
    
            model.save(self.saved_model_dir+'/my_model.h5')



In [23]:
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"],
                                                               cross_device_ops=tf.distribute.NcclAllReduce())

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


In [17]:
"""

imported = tf.saved_model.load(saved_model_dir+'/my_model.h5')

# Let's take a look to see how many layers are in the base model
print("Number of layers in the base model: ", len(base_model.layers))

# Fine-tune from this layer onwards
fine_tune_at = 100

# Freeze all the layers before the `fine_tune_at` layer
for layer in base_model.layers[:fine_tune_at]:
  layer.trainable =  False

"""

'\n\nimported = tf.saved_model.load(saved_model_dir+\'/my_model.h5\')\n\n# Let\'s take a look to see how many layers are in the base model\nprint("Number of layers in the base model: ", len(base_model.layers))\n\n# Fine-tune from this layer onwards\nfine_tune_at = 100\n\n# Freeze all the layers before the `fine_tune_at` layer\nfor layer in base_model.layers[:fine_tune_at]:\n  layer.trainable =  False\n\n'

In [18]:
# Initialize BERT model and tokenizer

%time bert_layer, tokenizer = model_utils.init_bert()

# Initialize Spark context

%time sc, spark = model_utils.init_spark()


CPU times: user 10.4 s, sys: 1.31 s, total: 11.7 s
Wall time: 11.8 s
CPU times: user 36.9 ms, sys: 28.5 ms, total: 65.3 ms
Wall time: 14.5 s


In [19]:
# Read in sarcastic samples, non-sarcastic samples, and the ratio between the two

%time sarcastic, non_sarcastic, ratio = model_utils.load_data(spark, \
                                                              bucket_name="sarc-bucket-5", \
                                                              dataset="politics")

CPU times: user 23.3 ms, sys: 31.3 ms, total: 54.6 ms
Wall time: 57.6 s


In [20]:
# Initialize sample_df generator

%time sample_generator = generate_sample_df(sarcastic, non_sarcastic, ratio, n_samples, max_seq_length)




CPU times: user 15 µs, sys: 3 µs, total: 18 µs
Wall time: 22.4 µs


In [30]:
for i in range(n_samples):

    # Output first smaple

    %time sample_df = next(sample_generator)

    # Sample df distribution 

    sample_df.groupBy('label').count().show()

    # Initialize training class object and build Bert layer

    t = training()
    %time model = t.build_model()

    # Produce padded tokens, input masks, and segment ids as nparrays

    %time padded_tokens, train_labels = model_utils.pad(sample_df, t.batch_size, pad_by_batch=False)
    %time input_mask = model_utils.input_mask(padded_tokens)
    %time segment_id = model_utils.segment_id(padded_tokens)
    train_inputs = [padded_tokens, input_mask, segment_id]

    # Execute training

    t.train_model(model, train_inputs, train_labels)

CPU times: user 289 ms, sys: 0 ns, total: 289 ms
Wall time: 349 ms
+-----+-----+
|label|count|
+-----+-----+
|    1|27265|
|    0|27442|
+-----+-----+

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


RuntimeError: /job:localhost/replica:0/task:0/device:GPU:0 unknown device.

CPU times: user 16 s, sys: 2.08 s, total: 18 s
Wall time: 56.2 s
CPU times: user 25.5 ms, sys: 17 ms, total: 42.5 ms
Wall time: 20.9 ms
CPU times: user 27 ms, sys: 16.1 ms, total: 43.2 ms
Wall time: 21.7 ms


NameError: name 'model' is not defined

In [7]:
# Stop Spark context

sc.stop()

In [32]:
%time sample_df = next(sample_generator)

# Sample df distribution 

sample_df.groupBy('label').count().show()

# Initialize training class object and build Bert layer

t = training()

# Produce padded tokens, input masks, and segment ids as nparrays

%time padded_tokens, train_labels = model_utils.pad(sample_df, t.batch_size, pad_by_batch=False)
%time input_mask = model_utils.input_mask(padded_tokens)
%time segment_id = model_utils.segment_id(padded_tokens)
train_inputs = [padded_tokens, input_mask, segment_id]

# Execute training

CPU times: user 282 ms, sys: 0 ns, total: 282 ms
Wall time: 314 ms
+-----+-----+
|label|count|
+-----+-----+
|    1|27265|
|    0|27294|
+-----+-----+

CPU times: user 15.3 s, sys: 1.85 s, total: 17.1 s
Wall time: 53.1 s
CPU times: user 22.1 ms, sys: 21.5 ms, total: 43.6 ms
Wall time: 21.5 ms
CPU times: user 32.2 ms, sys: 12.4 ms, total: 44.6 ms
Wall time: 22.2 ms


In [34]:
 # tf graph
bert_output = BertLayer()(train_inputs)
dense_out = tf.keras.layers.Dense(self.max_seq_length, activation='relu')(bert_output)
dense_out = tf.keras.layers.Dropout(0.5)(dense_out)
logits = tf.keras.layers.Dense(1, activation='sigmoid')(dense_out)
            
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"],
                                                               cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())


ResourceExhaustedError:  OOM when allocating tensor with shape[6983552,768] and type float on /job:localhost/replica:0/task:0/device:CPU:0 by allocator cpu
	 [[{{node StatefulPartitionedCall/StatefulPartitionedCall/bert_model/StatefulPartitionedCall/word_embeddings/Gather}}]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.
 [Op:__inference_restored_function_body_118569]

Function call stack:
restored_function_body


In [None]:
with mirrored_strategy.scope():
                
    # define, compile model
    model = tf.keras.models.Model(inputs=bert_inputs, outputs=logits)
    model.compile(loss='binary_crossentropy', 
                      optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), 
                      metrics=['accuracy'])
    
    model.summary()

In [37]:
train_inputs[0].shape

(54559, 128)

In [4]:
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


In [27]:
mirrored_strategy = tf.distribute.MirroredStrategy()





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [28]:
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"],cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
