In [36]:
import os
# Select PlaidML as the Keras backend. Keras reads KERAS_BACKEND when it is first
# imported, so this assignment has to run before any `import keras`.
os.environ["KERAS_BACKEND"] = "plaidml.keras.backend"

In [37]:
import numpy as np # linear algebra
import pandas as pd # data processing
import sparknlp # nlp processing
from sklearn.model_selection import train_test_split # splitting data

import matplotlib.pyplot as plt # visualisation
import seaborn as sns # visualisation 
%matplotlib inline

In [2]:
# Start (or reuse) a Spark session configured for Spark NLP.
spark = sparknlp.start()

# Record the library versions next to the results for reproducibility.
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  2.5.0
Apache Spark version:  2.4.5


In [3]:
from pyspark.sql import SQLContext

# SQLContext's first positional argument is a SparkContext, not a SparkSession.
# Pass the session's context explicitly and hand the session over as well so the
# SQLContext reuses it instead of relying on attribute-name coincidences.
sql = SQLContext(spark.sparkContext, spark)

In [4]:
# Load the cleaned question-pair dataset into a pandas DataFrame (path is relative
# to the notebook's working directory).
data = pd.read_csv("dataset/clean_data.csv")

In [5]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

### Preprocessing + BERT Embedding

In [8]:
def preprocess_partial(column, dictionary_path="./spell/coca2017.txt"):
    """Build the Spark NLP stages that clean a single text column.

    The stages are chained through intermediate columns named after `column`:
    document -> sentence -> token -> checked (spelling) -> normalized -> lemma
    -> cleanTokens -> finished.  The last stage (Finisher) writes the cleaned
    tokens to ``<column>_finished`` and drops the intermediate annotations.

    Args:
        column: Name of the input text column (e.g. ``"question1"``).
        dictionary_path: Word list used to train the Norvig spell checker.
            Defaults to the path the notebook originally hard-coded.

    Returns:
        A list of eight pipeline stages, in execution order.

    Note:
        ``LemmatizerModel.pretrained`` fetches the ``lemma_antbnc`` model, so
        each call may trigger a download.
    """
    document_assembler = DocumentAssembler() \
        .setInputCol(column) \
        .setOutputCol(column+"_document")\
        .setCleanupMode("shrink")

    sentence_detector = SentenceDetector() \
        .setInputCols([column+"_document"]) \
        .setOutputCol(column+"_sentence") \
        .setUseAbbreviations(True)

    tokenizer = Tokenizer() \
        .setInputCols([column+"_sentence"]) \
        .setOutputCol(column+"_token")

    # Spell correction runs on raw tokens; only alphabetic dictionary entries
    # (matching the regex) are used.
    spell_checker = NorvigSweetingApproach() \
        .setInputCols([column+"_token"]) \
        .setOutputCol(column+"_checked") \
        .setDictionary(dictionary_path, "[a-zA-Z]+")

    normalizer = Normalizer() \
        .setInputCols([column+"_checked"]) \
        .setOutputCol(column+"_normalized")

    lemma = LemmatizerModel.pretrained('lemma_antbnc') \
        .setInputCols([column+"_normalized"]) \
        .setOutputCol(column+"_lemma")

    # Input columns are passed as a list, like every other annotator here.
    stopwords_cleaner = StopWordsCleaner()\
        .setInputCols([column+"_lemma"])\
        .setOutputCol(column+"_cleanTokens")\
        .setCaseSensitive(False)

    finisher = Finisher() \
        .setInputCols([column+"_cleanTokens"]) \
        .setOutputCols([column+"_finished"])\
        .setIncludeMetadata(False)\
        .setCleanAnnotations(True)

    return [document_assembler, sentence_detector, tokenizer, spell_checker, normalizer, lemma, stopwords_cleaner, finisher]

def preprocessing_pipeline():
    """Assemble the full preprocessing pipeline for both question columns.

    The pipeline contains the cleaning stages for ``question1`` followed by
    those for ``question2``, and ends with a StringIndexer that turns
    ``is_duplicate`` into the numeric ``label`` column.

    Returns:
        An unfitted ``pyspark.ml.Pipeline``.
    """
    stages = []
    for question_col in ("question1", "question2"):
        stages.extend(preprocess_partial(question_col))

    stages.append(StringIndexer(inputCol="is_duplicate", outputCol="label"))

    return Pipeline(stages=stages)

In [9]:
# Build the (unfitted) preprocessing pipeline once; the pretrained lemmatizer is
# fetched once per question column, hence the two download messages below.
nlp_pipeline = preprocessing_pipeline()

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [10]:
def preprocess_data(data, n_chunks=202):
    """Convert a pandas DataFrame to Spark and run the NLP preprocessing pipeline.

    The pandas frame is still converted to Spark in `n_chunks` pieces, but the
    pieces are unioned *before* the pipeline is fitted.  Fitting once on the
    whole dataset guarantees that trainable stages (notably the StringIndexer
    that produces ``label``) use one consistent mapping for every row; fitting
    per chunk lets the label index of a class depend on its frequency inside
    that particular chunk.

    Args:
        data: pandas DataFrame holding the raw question pairs.
        n_chunks: Number of pieces used for the pandas -> Spark conversion.

    Returns:
        A Spark DataFrame with the pipeline's output columns appended.

    Raises:
        ValueError: If `data` contains no rows.
    """
    combined = None
    for df_part in np.array_split(data, n_chunks):
        # array_split yields empty frames when there are fewer rows than
        # chunks; Spark cannot infer a schema from an empty pandas frame.
        if len(df_part) == 0:
            continue
        chunk = sql.createDataFrame(df_part)
        # `union` is the non-deprecated spelling of `unionAll`.
        combined = chunk if combined is None else combined.union(chunk)

    if combined is None:
        raise ValueError("preprocess_data() received an empty DataFrame")

    nlp_model = nlp_pipeline.fit(combined)
    return nlp_model.transform(combined)

In [12]:
# Run the preprocessing pipeline over the whole dataset (result is a Spark DataFrame).
processed_df = preprocess_data(data)

In [13]:
processed_df.show(1)

+---+----+----+--------------------+--------------------+------------+--------------------+--------------------+-----+
| id|qid1|qid2|           question1|           question2|is_duplicate|  question1_finished|  question2_finished|label|
+---+----+----+--------------------+--------------------+------------+--------------------+--------------------+-----+
|  0|   1|   2|What is the step ...|What is the step ...|           0|[step, step, guid...|[step, step, guid...|  0.0|
+---+----+----+--------------------+--------------------+------------+--------------------+--------------------+-----+
only showing top 1 row



### Spark ML ClassifierDL

In [14]:
from pyspark.ml import feature as spark_ft

In [None]:
def text2vec(col):
    """Create an unfitted Word2Vec estimator for one question column.

    Args:
        col: Question index (1 or 2 in this notebook); it selects the input
            column ``question<col>_finished`` and names the output column
            ``q<col>_text_vec``.

    Returns:
        A ``pyspark.ml.feature.Word2Vec`` estimator producing 50-dimensional vectors.
    """
    word2vec_params = dict(
        vectorSize=50,
        minCount=2,
        seed=123,
        inputCol=f'question{col}_finished',
        outputCol=f'q{col}_text_vec',
        windowSize=5,
        maxSentenceLength=250,
    )
    return spark_ft.Word2Vec(**word2vec_params)

# One Word2Vec estimator per question column.
text2vec1 = text2vec(1)
text2vec2 = text2vec(2)

# assembler = spark_ft.VectorAssembler(inputCols=['q1_text_vec', 'q2_text_vec'], outputCol='features')

# NOTE(review): ClassifierDLApproach is a Spark NLP annotator and presumably expects
# sentence-embedding *annotation* columns, whereas the Word2Vec outputs are plain
# Spark ML vector columns -- confirm this stage can actually be fitted on them.
# NOTE(review): no 'features' column is produced by this pipeline, yet a later cell
# selects it; that cell relies on the VectorAssembler variant defined further down.
classsifierdl = ClassifierDLApproach()\
  .setInputCols(['q1_text_vec', 'q2_text_vec'])\
  .setOutputCol("preds")\
  .setLabelColumn("label")\
  .setMaxEpochs(20)\
  .setEnableOutputLogs(True)

# Fit both Word2Vec models and the classifier on the preprocessed data.
feature_pipeline = Pipeline(stages=[text2vec1, text2vec2, classsifierdl])
feature_model = feature_pipeline.fit(processed_df)

In [None]:
!cat ~/annotator_logs/ClassifierDLApproach_d4a8d8ae15c4.log

In [17]:
# Apply the fitted feature pipeline and mark the result for caching so later
# actions do not recompute it.
featurized_df = feature_model.transform(processed_df).persist()

In [19]:
# Keep only the model inputs and the target.
# NOTE(review): 'features' is created by the VectorAssembler pipeline defined in a
# later cell, so this cell only works after that cell has been executed
# (out-of-order execution) -- it will fail on Restart & Run All.
featurized_clean_df = featurized_df.select('features', 'label')
featurized_clean_df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.3750478071825...|  0.0|
|[-0.1569854863919...|  0.0|
|[-0.2768606580793...|  0.0|
|[0.10770221551259...|  0.0|
|[0.17948742415755...|  0.0|
|[0.00731686313520...|  1.0|
|[-0.1786983711645...|  0.0|
|[-0.1171621502144...|  1.0|
|[-0.0333446813747...|  0.0|
|[-0.1219908632338...|  0.0|
|[-0.1354868883666...|  0.0|
|[-0.7762437500059...|  1.0|
|[-0.3195876628160...|  1.0|
|[0.02170692756772...|  1.0|
|[-0.0161319741358...|  0.0|
|[-0.0455943652325...|  1.0|
|[0.17729056905955...|  1.0|
|[-0.0264204561710...|  0.0|
|[-0.3555452488362...|  1.0|
|[-0.5042165249586...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [None]:
# Feature pipeline variant without the classifier: two Word2Vec stages followed by
# a VectorAssembler that concatenates both question vectors into 'features'.
# This rebinds text2vec1/text2vec2/feature_pipeline/feature_model from the earlier cell.
text2vec1 = text2vec(1)
text2vec2 = text2vec(2)
assembler = spark_ft.VectorAssembler(inputCols=['q1_text_vec', 'q2_text_vec'], outputCol='features')
feature_pipeline = Pipeline(stages=[text2vec1, text2vec2, assembler])
feature_model = feature_pipeline.fit(processed_df)

In [None]:
# Recompute the featurized frame with the assembler pipeline (rebinds featurized_df)
# and mark it for caching.
featurized_df = feature_model.transform(processed_df).persist()

In [40]:
# Random 80/20 train/test split; the fixed seed keeps the split repeatable for the
# same input DataFrame.
train_data, test_data = featurized_df.randomSplit([.8, .2], seed=1234)

In [41]:
train_data.show()

+---+----+----+---------------------+--------------------+------------+----------------------+--------------------+-----+--------------------+--------------------+--------------------+
| id|qid1|qid2|            question1|           question2|is_duplicate|    question1_finished|  question2_finished|label|         q1_text_vec|         q2_text_vec|            features|
+---+----+----+---------------------+--------------------+------------+----------------------+--------------------+-----+--------------------+--------------------+--------------------+
|  3|   7|   8| Why am I mentally...|Find the remainde...|           0|  [mentally, lonely...|[wind, remainder,...|  0.0|[0.10770221551259...|[0.08770019956864...|[0.10770221551259...|
|  4|   9|  10| Which one dissolv...|Which fish would ...|           0|  [one, dissolve, w...|[fish, survive, s...|  0.0|[0.17948742415755...|[0.21307177469134...|[0.17948742415755...|
|  5|  11|  12| Astrology: I am a...|I'm a triple Capr...|           1|  [a

In [38]:
from keras.layers import Layer, RNN

# Custom Keras layer implementing the attention mechanism (with trainable weights) for the hierarchical attention model.
# The implementation is based on the word and sentence attention layers described in Yang et al. and on the Keras custom-layer example.
class Attention_Layer(Layer):
    """Attention pooling layer with trainable weights.

    For an input ``h`` the layer computes ``u = tanh(h . W + b)``, scores each
    position along axis 1 with ``u . u_ctx``, turns the scores into weights with
    a softmax over axis 1 and returns the weighted sum of ``h`` over that axis.
    The input is expected to be rank 3, e.g. (batch, steps, features) -- the
    output shape is ``(input_shape[0], input_shape[-1])``.

    Args:
        output_dim: Size of the hidden attention projection (columns of ``W``).
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``,
            ``trainable``, ...).  Accepting them is also what allows the layer
            to be re-created from its config.
    """

    def __init__(self, output_dim, **kwargs):
        super(Attention_Layer, self).__init__(**kwargs)
        self.output_dim = output_dim

    def build(self, input_shape):
        """Create the trainable projection (W, b) and context vector (u)."""
        self.W = self.add_weight(name='W',
                                      shape=(input_shape[-1], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.b = self.add_weight(name='b',
                                      shape=(self.output_dim,),
                                      initializer='uniform',
                                      trainable=True)
        self.u = self.add_weight(name='u',
                                      shape=(self.output_dim,1),
                                      initializer='uniform',
                                      trainable=True)
        super(Attention_Layer, self).build(input_shape)  # Be sure to call this at the end

    def call(self, h_it):
        """Return the attention-weighted sum of `h_it` over axis 1."""
        u_it = K.tanh(K.bias_add(K.dot(h_it, self.W), self.b))
        att_weights = K.dot(u_it, self.u)
        # Shift by the per-sample maximum before exponentiating: the softmax is
        # unchanged mathematically, but exp() can no longer overflow to inf/NaN.
        att_weights = att_weights - K.max(att_weights, axis=1, keepdims=True)
        exp_weights = K.exp(att_weights)
        sum_weights = K.sum(exp_weights, axis=1, keepdims=True)
        alpha_it = exp_weights/sum_weights
        return K.sum(h_it*alpha_it, axis=1)

    def compute_output_shape(self, input_shape):
        """Axis 1 is summed away; batch and feature dimensions are kept."""
        return (input_shape[0], input_shape[-1])

    def get_config(self):
        """Include `output_dim` so a saved model can rebuild this layer."""
        config = super(Attention_Layer, self).get_config()
        config['output_dim'] = self.output_dim
        return config

In [39]:
def build_hier_model(vectors, optimizer, learn_rate, dropout_rate1, dropout_rate2, max_length=50, num_hidden=200, num_classes=1, 
                projected_dim=200):
    """Build and compile the hierarchical attention model for a question pair.

    The model input has shape ``(2, max_length)`` (int32): two token-id
    sequences per example.  Each sequence goes through a shared word encoder
    (embedding -> bidirectional GRU -> attention); the two resulting vectors go
    through a second bidirectional GRU and attention layer, followed by a
    sigmoid Dense layer.

    Args:
        vectors: Passed to ``create_embedding`` to build the embedding layer.
        optimizer: ``'sgd'``, ``'adam'`` or ``'rmsprop'``; any other value
            selects Nadam.
        learn_rate: Learning rate given to the optimizer.
        dropout_rate1: Dropout of the word-level GRU.
        dropout_rate2: Dropout of the question-level GRU.
        max_length: Token length of each question.
        num_hidden: GRU units and attention projection size.
        num_classes: Units of the final sigmoid layer.
        projected_dim: Passed to ``create_embedding``.

    Returns:
        The compiled Keras model (binary cross-entropy loss, accuracy metric).
    """

    def _make_optimizer():
        # Unknown optimizer names fall through to Nadam.
        if optimizer == 'sgd':
            return SGD(lr=learn_rate)
        if optimizer == 'adam':
            return Adam(lr=learn_rate)
        if optimizer == 'rmsprop':
            return RMSprop(lr=learn_rate)
        return Nadam(lr=learn_rate)

    K.clear_session()

    # Pair input: one row of token ids per question.
    pair_input = layers.Input(shape=(2, max_length), dtype='int32')

    # Shared (projected) embedding layer.
    embedding = create_embedding(vectors, max_length, projected_dim)

    # Word level: encode one question and pool it with attention.
    tokens_input = layers.Input(shape=(max_length,), dtype='int32')
    word_gru = layers.Bidirectional(
        layers.GRU(num_hidden, dropout=dropout_rate1, return_sequences=True))
    word_states = word_gru(embedding(tokens_input))
    word_summary = Attention_Layer(num_hidden)(word_states)
    word_encoder = Model(tokens_input, word_summary)

    # Question level: apply the word encoder to both questions, then encode
    # and pool the resulting pair of vectors.
    question_vectors = layers.TimeDistributed(word_encoder)(pair_input)
    question_gru = layers.Bidirectional(
        layers.GRU(num_hidden, dropout=dropout_rate2, return_sequences=True))
    question_states = question_gru(question_vectors)
    pair_summary = Attention_Layer(num_hidden)(question_states)

    # Classification head.
    prediction = layers.Dense(num_classes, activation='sigmoid', use_bias=True)(pair_summary)

    model = Model(pair_input, prediction)
    model.compile(optimizer=_make_optimizer(),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

In [44]:
def build_data(df, chunks=10):
    """Collect token lists and labels from a Spark DataFrame onto the driver.

    The frame is split into `chunks` random, equally weighted parts that are
    collected one at a time.  Progress is printed every 100000 rows.

    Args:
        df: DataFrame with ``question1_finished``, ``question2_finished`` and
            ``label`` columns.
        chunks: Number of random splits to collect sequentially.

    Returns:
        ``(x_1, x_2, y)``: two Python lists with the per-row values of the two
        question columns and a NumPy array of labels, all in collection order.
    """
    total_rows = df.count()
    parts = df.randomSplit(weights=[1/chunks] * chunks)

    def _collected_rows():
        # Collect lazily so only one part is materialised per step.
        for part in parts:
            yield from part.collect()

    first_questions, second_questions, labels = [], [], []
    for seen, record in enumerate(_collected_rows()):
        if seen % 100000 == 0:
            print('row {} / {} ({:.1f} %)'.format(seen, total_rows, 100 * seen / total_rows))
        labels.append(record['label'])
        first_questions.append(record['question1_finished'])
        second_questions.append(record['question2_finished'])

    return first_questions, second_questions, np.array(labels)

In [45]:
# Pull the training tokens and labels onto the driver as Python lists / a NumPy array.
q1, q2, yt = build_data(train_data)

row 0 / 323543 (0.0 %)
row 100000 / 323543 (30.9 %)
row 200000 / 323543 (61.8 %)
row 300000 / 323543 (92.7 %)


In [None]:
def build_modelBase(num_units, dropout_rate=0.2, activation='relu'):
    """Create a two-layer feed-forward block: (Dense -> Dropout) twice.

    Args:
        num_units: Units of each Dense layer.
        dropout_rate: Rate of each Dropout layer.
        activation: Activation of each Dense layer.

    Returns:
        An unbuilt ``Sequential`` model with four layers.
    """
    stack = []
    for _ in range(2):
        stack.append(layers.Dense(num_units, activation=activation, use_bias=True))
        stack.append(layers.Dropout(dropout_rate))
    return models.Sequential(stack)

def normalizer(axis):
    """Return a function that applies a softmax along `axis`.

    Args:
        axis: Tensor axis over which the weights are normalised to sum to 1.

    Returns:
        A callable mapping a tensor of raw attention scores to normalised
        weights of the same shape.
    """
    def _normalize(att_weights):
        # Subtract the maximum along the axis first: the result is identical
        # mathematically, but exp() cannot overflow and produce inf/NaN.
        shifted = att_weights - K.max(att_weights, axis=axis, keepdims=True)
        exp_weights = K.exp(shifted)
        sum_weights = K.sum(exp_weights, axis=axis, keepdims=True)
        return exp_weights/sum_weights
    return _normalize

# Collapse a tensor along axis 1 by summation.
def sum_word(x):
    """Sum `x` over axis 1 and return the reduced tensor."""
    word_axis = 1
    return K.sum(x, axis=word_axis)

def build_model(questions, max_length=250, num_hidden=200, dropout_rate=0.2, learn_rate=0.0001, num_classes=1):
    """Build and compile a decomposable-attention style model for a question pair.

    Pipeline: encode both inputs -> pairwise attention scores -> soft-align each
    question with the other -> compare -> sum over positions -> classify.

    Args:
        questions: Unused; kept so existing calls keep working.
        max_length: Unused; the input shape is fixed to ``(768, 1)``.
        num_hidden: Units of every feed-forward block.
        dropout_rate: Dropout rate of every feed-forward block.
        learn_rate: Learning rate of the RMSprop optimizer.
        num_classes: Units of the final sigmoid layer.

    Returns:
        The compiled Keras model taking ``[input_a, input_b]``.
    """
    # clear Keras session to free up GPU memory
    K.clear_session()

    # input_a -> question1
    # input_b -> question2
    input_a = layers.Input(shape=(768,1))
    input_b = layers.Input(shape=(768,1))

    ## step 1: encode each question with its own feed-forward block
    Q1 = build_modelBase(num_hidden, dropout_rate=dropout_rate)
    Q2 = build_modelBase(num_hidden, dropout_rate=dropout_rate)

    a = Q1(input_a)
    b = Q2(input_b)

    ## step 2: attend
    # Raw pairwise scores between every position of a and every position of b:
    # contracting the feature axis gives a (batch, len_a, len_b) matrix, which
    # is what the two normalisations below operate on.
    att = layers.dot([a, b], axes=-1)

    # normalize the attention weights (over a's positions / over b's positions)
    norm_weights_a = layers.Lambda(normalizer(1))(att)
    norm_weights_b = layers.Lambda(normalizer(2))(att)

    # compute version of question a that is soft-aligned with every word of b
    alpha = layers.dot([norm_weights_a, a], axes=1)
    # compute version of question b that is soft-aligned with every word of a;
    # norm_weights_b sums to 1 over its last axis, so that axis is contracted
    # with b's position axis.
    beta  = layers.dot([norm_weights_b, b], axes=(2, 1))

    ## step 3: compare each position with its soft-aligned counterpart
    comp1 = layers.concatenate([a, beta])
    comp2 = layers.concatenate([b, alpha])

    G = build_modelBase(num_hidden, dropout_rate=dropout_rate)
    v1 = layers.TimeDistributed(G)(comp1)
    v2 = layers.TimeDistributed(G)(comp2)

    # and reduce the vectors computed above to a single vector per question
    v1_sum = layers.Lambda(sum_word)(v1)
    v2_sum = layers.Lambda(sum_word)(v2)

    concat = layers.concatenate([v1_sum, v2_sum])

    ## step 4: predict
    H = build_modelBase(num_hidden, dropout_rate=dropout_rate)
    out = H(concat)
    out = layers.Dense(num_classes, activation='sigmoid', use_bias=True)(out)

    model = Model([input_a, input_b], out)

    # optimizer for gradient descent
    opt = RMSprop(lr=learn_rate)

    model.compile(optimizer=opt,
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

In [155]:
from sklearn.metrics import classification_report, accuracy_score

# Score the sample and bring labels, questions and predictions into pandas.
# NOTE(review): `nlp_model_bert` and `df_limited` are not defined in the visible part
# of this notebook -- this cell depends on cells that are missing or were run earlier.
df = nlp_model_bert.transform(df_limited).select('label', 'question1', 'question2', 'class.result').toPandas()

In [164]:
# Per-class precision/recall and overall accuracy of the predictions.
# NOTE(review): the report below lists labels 2.0-10.0 with one row each and no
# predictions for class 1.0; the single-row labels presumably come from malformed
# input rows (e.g. a quoted question split across columns) -- verify the source data.
print(classification_report(df.label, df.result))
print(accuracy_score(df.label, df.result))

              precision    recall  f1-score   support

         0.0       0.62      1.00      0.77      1250
         1.0       0.00      0.00      0.00       741
         2.0       0.00      0.00      0.00         1
         3.0       0.00      0.00      0.00         1
         4.0       0.00      0.00      0.00         1
         5.0       0.00      0.00      0.00         1
         6.0       0.00      0.00      0.00         1
         7.0       0.00      0.00      0.00         1
         8.0       0.00      0.00      0.00         1
         9.0       0.00      0.00      0.00         1
        10.0       0.00      0.00      0.00         1

    accuracy                           0.62      2000
   macro avg       0.06      0.09      0.07      2000
weighted avg       0.39      0.62      0.48      2000

0.625


  _warn_prf(average, modifier, msg_start, len(result))


In [169]:
df[df.label == 10]

Unnamed: 0,label,question1,question2,result
813,10.0,"""Is there a """"blind trust"""" provision for Amer...","and how is it enforced?""",0.0
