## References

HW2 Starter Code

https://spark.apache.org/docs/latest/ml-features.html

https://spark.apache.org/docs/latest/ml-pipeline.html

https://spark.apache.org/docs/latest/ml-classification-regression.html

https://stackoverflow.com/questions/38839924/how-to-combine-n-grams-into-one-vocabulary-in-spark

https://nlp.johnsnowlabs.com/docs/en/transformers#bertsentenceembeddings

https://nlp.johnsnowlabs.com/docs/en/annotators

https://nlp.johnsnowlabs.com/docs/en/training


In [19]:
import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

In [52]:
path_name = 'gs://eecs6893-hw2-cluster/final-project/training/sentiment140.csv'
df = pd.read_csv(path_name)

In [57]:
import re 

df = df.iloc[0:120000, :]

def clean_text(text):
    text = text.strip("b'").strip('b"')
    text = text.lower()
    text = text.replace("'", "")
    text = re.sub('@(\w+)', '', text)
    text = re.sub('\W+', ' ', text)
    text = text.replace('amp', 'and').strip()
    return text

df['CleanText'] = df['Text'].map(clean_text)
df.loc[df['Label'] == 0, 'Label'] = 0
df.loc[df['Label'] == 4, 'Label'] = 1
df.head(10)

Unnamed: 0.1,Unnamed: 0,Text,Label,CleanText
0,0,"b""i'm 10x cooler than all of you! """,4,im 10x cooler than all of you
1,1,b'O.kk? Thats weird I cant stop following peop...,0,o kk thats weird i cant stop following people ...
2,2,b'what a beautiful day not to got to my first ...,4,what a beautiful day not to got to my first class
3,3,"b"".@HildyGottlieb &amp; I was just saying to M...",4,and i was just saying to mahaal yesterday ever...
4,4,b'kinda sad and confused why do guys do this?',0,kinda sad and confused why do guys do this
5,5,b'@Real_DavidCook YES &amp; YES ',4,yes and yes
6,6,"b""@GDGOfficial But it's another beautiful day ...",4,but its another beautiful day here in europe y...
7,7,b'Working through hundreds of assignments ',0,working through hundreds of assignments
8,8,b'driving with the moonroof and windows open i...,0,driving with the moonroof and windows open is ...
9,9,"b""@scott_mills Gutted! I worked for the fringe...",0,gutted i worked for the fringe last year wont ...


In [58]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(df, test_size=0.2, random_state=123)

In [59]:
train.count()

Unnamed: 0    96000
Text          96000
Label         96000
CleanText     96000
dtype: int64

In [43]:
test.count()

Unnamed: 0    24000
Text          24000
Label         24000
CleanText     24000
dtype: int64

In [44]:
train['Label'].value_counts()

4    48135
0    47865
Name: Label, dtype: int64

In [45]:
test['Label'].value_counts()

4    12033
0    11967
Name: Label, dtype: int64

In [60]:
path_name = 'gs://eecs6893-hw2-cluster/final-project/training/sentiment140.csv'

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory","16G")\
    .config("spark.driver.maxResultSize", "8G") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.2")\
    .getOrCreate()

train_data = spark.createDataFrame(train)
test_data = spark.createDataFrame(test)
train_data.show(10)

+----------+--------------------+-----+--------------------+
|Unnamed: 0|                Text|Label|           CleanText|
+----------+--------------------+-----+--------------------+
|     68758|b'Today is a good...|    4| today is a good day|
|     51808|b'@AundreaFimbres...|    4|hi tell your mom ...|
|     75098|b'Tweet tweet. Cl...|    4|tweet tweet cla a...|
|    102652|b'@ilovecpstyle a...|    4|arizona has a gre...|
|     95952|b'@Gatchy Ewwwwww...|    4|ewwwwww and yes y...|
|    118442|b"@alisonswartz I...|    0|im right there wi...|
|     98087|b'More shell scri...|    0|more shell scripting|
|     77142|b'Finaly got some...|    4|finaly got some d...|
|     13722|b'pink lemonade h...|    0|pink lemonade hat...|
|     57572|b'@number58 Sadly...|    0|sadly those are o...|
+----------+--------------------+-----+--------------------+
only showing top 10 rows



21/11/19 04:02:05 WARN org.apache.spark.scheduler.TaskSetManager: Stage 94 contains a task of very large size (7413 KiB). The maximum recommended task size is 1000 KiB.


In [62]:
train_data.printSchema()

root
 |-- Unnamed: 0: long (nullable = true)
 |-- Text: string (nullable = true)
 |-- Label: long (nullable = true)
 |-- CleanText: string (nullable = true)



In [63]:
from pyspark.sql.types import IntegerType ,BooleanType, DateType, DoubleType

train_data = train_data.withColumn('IntLabel', train_data['Label'].cast(IntegerType()))
test_data = test_data.withColumn('IntLabel', test_data['Label'].cast(IntegerType()))

In [64]:
from pyspark.sql.functions import *
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import CountVectorizer
 
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
 
from pyspark.ml.classification import LogisticRegression, NaiveBayes, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [19]:
def get_ngrams(input_col='Text', output_col='features', n=3):
        
    tokenizer = [Tokenizer(inputCol=input_col, outputCol="words")]
    
    stop_words = [StopWordsRemover(inputCol='words', outputCol='filterWords')]
        
    ngrams = [
        NGram(n=i, inputCol="filterWords", outputCol="%s_ngrams" % str(i))
        for i in range(1, n+1)
    ]
 
    cv = [CountVectorizer(vocabSize=100e3, inputCol="%s_ngrams" % str(i),
            outputCol="%s_tf" % str(i))
        for i in range(1, n+1)]
    
    idf = [IDF(inputCol="%s_tf" % str(i), outputCol="%s_tfidf" % str(i), minDocFreq=100) for i in range(1, n+1)]
 
    assembler = [VectorAssembler(
        inputCols=["%s_tfidf" % str(i) for i in range(1, n+1)],
        outputCol=output_col)]
    
    stages = tokenizer + stop_words + ngrams + cv + idf + assembler
    
    return stages


In [35]:
def get_ml_model(model_type, label_col='IntLabel', features_col='features'):
    
    if model_type == 'logistic':
        model = LogisticRegression(labelCol=label_col, featuresCol=features_col, maxIter=10, regParam=0.01, elasticNetParam=0.8)
    if model_type == 'gbt':
        model = GBTClassifier(labelCol=label_col, featuresCol=features_col, maxIter=10)
    if model_type == 'svm':
        model = LinearSVC(labelCol=label_col, featuresCol=features_col, maxIter=10, regParam=0.01)
    if model_type == 'nb':
        model = NaiveBayes(labelCol=label_col, featuresCol=features_col, smoothing=1.0)
    
    label_index = StringIndexer(inputCol="Label", outputCol="StringLabel")
    stages = [model]
    
    return stages


In [26]:
def train_baseline_model(train_data, model_type, n):
    
    feature_pipeline = get_ngrams(n=n)
    model_pipeline = get_ml_model(model_type)
    total_pipeline = Pipeline(stages=feature_pipeline + model_pipeline)
    total_pipeline = total_pipeline.fit(train_data)
    
    return total_pipeline

In [27]:
def test_baseline_model(model, test_data):

    predictions = model.transform(test_data)
    preds = predictions.select('IntLabel', "prediction")
    preds = preds.withColumn('PredLabel', preds['prediction'].cast(DoubleType()))
    evaluator = MulticlassClassificationEvaluator(labelCol="IntLabel", predictionCol="PredLabel", metricName="accuracy")
    test_acc = evaluator.evaluate(preds)
    
    return test_acc

In [56]:
import time
import datetime as dt

baseline_models = ['logistic', 'gbt', 'svm']
n_grams = [1]
results = []

spark.sparkContext.setLogLevel("ERROR")

for baseline_model in baseline_models:
    
    for n_gram in n_grams:
    
        model = train_baseline_model(train_data, baseline_model, n_gram)
        t1 = time.time()
        ts = dt.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        path_name = 'gs://eecs6893-hw2-cluster/final-project/training/'
        model_name = baseline_model.upper() + '-NGRAM=' + str(n_gram) + '-' + ts
        model.save(path_name + model_name)
        print(model_name)
        t2 = time.time()
        te = (t2-t1)
        test_acc = test_baseline_model(model, test_data)
        result = (baseline_model.upper(), n_gram, te, test_acc)
        results.append(result)
        print("Test Set Accuracy = " + str(test_acc))

results_df = pd.DataFrame(results, columns=['Model', 'Ngrams', 'Train Time', 'Test Accuracy'])
results_df.to_csv(path_name + 'perf_results.csv')

21/11/18 19:11:25 WARN org.apache.spark.scheduler.TaskSetManager: Stage 278 contains a task of very large size (6130 KiB). The maximum recommended task size is 1000 KiB.
21/11/18 19:11:30 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1253.6 KiB
21/11/18 19:11:30 WARN org.apache.spark.scheduler.TaskSetManager: Stage 282 contains a task of very large size (6130 KiB). The maximum recommended task size is 1000 KiB.
21/11/18 19:11:41 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 8.2 MiB
21/11/18 19:11:41 WARN org.apache.spark.scheduler.TaskSetManager: Stage 283 contains a task of very large size (6130 KiB). The maximum recommended task size is 1000 KiB.
21/11/18 19:11:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 8.2 MiB
21/11/18 19:11:45 WARN org.apache.spark.scheduler.TaskSetManager: Stage 284 contains a task of very large size (6130 KiB). The maximum recommended t

LOGISTIC-NGRAM=1-2021-11-18-19-11-57


21/11/18 19:12:13 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 8.2 MiB
21/11/18 19:12:13 WARN org.apache.spark.scheduler.TaskSetManager: Stage 316 contains a task of very large size (1578 KiB). The maximum recommended task size is 1000 KiB.

Test Set Accuracy = 0.68305


                                                                                

In [57]:
results_df.head(1)

Unnamed: 0,Model,Ngrams,Train Time,Test Accuracy
0,LOGISTIC,1,15.276312,0.68305


In [62]:
weights = model.stages[-1].coefficients.toArray()
vocab = np.array(model.stages[3].vocabulary)
negative_indices = np.argsort(weights)[:10]
negative_weights = np.sort(weights)[:10]
negative_words = vocab[negative_indices]
df1 = pd.DataFrame(zip(negative_words, negative_weights), columns=['Word', 'Weight'])
df1

Unnamed: 0,Word,Weight
0,sad,-0.433188
1,miss,-0.351202
2,hate,-0.257663
3,sorry,-0.244846
4,wish,-0.240993
5,poor,-0.234136
6,sick,-0.226312
7,sucks,-0.211375
8,hurts,-0.196153
9,lost,-0.189007


In [66]:
negative_indices = np.argsort(weights)[-12:]
negative_weights = np.sort(weights)[-12:][::-1]
negative_words = vocab[negative_indices][::-1]
df2 = pd.DataFrame(zip(negative_words, negative_weights), columns=['Word', 'Weight'])
df2

Unnamed: 0,Word,Weight
0,thanks,0.321913
1,',0.304056
2,love,0.2504
3,thank,0.242933
4,welcome,0.191254
5,good,0.190473
6,great,0.1855
7,happy,0.169721
8,awesome,0.15525
9,nice,0.135478


In [23]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import time


def train_model(train_data, embedding_model, batch_size, num_epochs, learning_rate, dropout_rate=0.00, spell_check=False):

    documentAssembler = DocumentAssembler().setInputCol("CleanText").setOutputCol("document")
    sentence = SentenceDetector().setInputCols(["document"]).setOutputCol("sentence")

    if spell_check:
        tokenizer = RecursiveTokenizer() \
                    .setInputCols(["document"]) \
                    .setOutputCol("token") \
                    .setPrefixes(["\"", "“", "(", "[", "\n", "."]) \
                    .setSuffixes(["\"", "”", ".", ",", "?", ")", "]", "!", ";", ":", "'s", "’s"])

        spellModel = ContextSpellCheckerModel \
                    .pretrained() \
                    .setInputCols("token") \
                    .setOutputCol("checked") \
    else:
        tokenizer = Tokenizer().setInputCols(["sentence"]).setOutputCol("token")

    if embedding_model == 'roberta':
        embeddings = RoBertaSentenceEmbeddings.pretrained()
    elif embedding_model == 'xlm_roberta':
        embeddings = XlmRoBertaSentenceEmbeddings.pretrained()
    elif embedding_model == 'bert':
        embeddings = BertSentenceEmbeddings.pretrained()
    elif embedding_model == 'elmo':
        embeddings = ElmoEmbeddings.pretrained()
    elif embedding_model == 'use':
        embeddings = UniversalSentenceEncoder.pretrained()
    elif embedding_model == 'distilbert':
        embeddings = DistilBertEmbeddings.pretrained()

    embeddings = embeddings.setInputCols(["document"]).setOutputCol("sentence_embeddings")

    classifier = ClassifierDLApproach() \
      .setInputCols("sentence_embeddings") \
      .setOutputCol("prediction") \
      .setLabelColumn("IntLabel") \
      .setBatchSize(batch_size) \
      .setMaxEpochs(num_epochs) \
      .setLr(learning_rate) \
      .setDropout(dropout_rate) \
      .setVerbose(0) \
      .setEnableOutputLogs(True)

    model_pipeline = Pipeline().setStages([
      documentAssembler,
      embeddings,
      classifier
    ])

    model = model_pipeline.fit(train_data)

    return model

In [24]:
import pyspark.ml.classification as mlc
import time
import datetime as dt
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

embedding_models = ['bert', 'roberta', 'use']
batch_size = 256
num_epochs = 50
learning_rate = 5e-4
dropout_rate = 0.50
results = []

for embedding_model in embedding_models:
    print(embedding_model.upper())
    t1 = time.time()
    model = train_model(train_data, embedding_model, batch_size, num_epochs, learning_rate, dropout_rate=0.00)
    ts = dt.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    path_name = 'gs://eecs6893-hw2-cluster/final-project/training/'
    model_name = embedding_model.upper() + '-' + str(batch_size) + '-' + str(num_epochs) + '-' + ts
    print(model_name)
    model.save(path_name + model_name)
    t2 = time.time()
    te = (t2-t1)
    predictions = model.transform(test_data)
    preds = predictions.select('IntLabel', "prediction.result")
    preds = preds.withColumn('PredLabel', preds['result'].getItem(0).cast(DoubleType()))
    evaluator = MulticlassClassificationEvaluator(labelCol="IntLabel", predictionCol="PredLabel", metricName="accuracy")
    test_acc = evaluator.evaluate(preds)
    result = (embedding_model, te, test_acc)
    results.append(result)
    print("Test Set Accuracy = " + str(test_acc))


results_df = pd.DataFrame(results, columns=['Embedding', 'Train Time', 'Test Accuracy'])
results_df.head(12)

USE
tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ | ]tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ | ]Download done! Loading the resource.


[Stage 1:>                                                          (0 + 1) / 1]

[ / ]

                                                                                

[ \ ]

2021-11-19 02:47:55.827480: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-11-19 02:47:56.184702: I external/org_tensorflow/tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2299995000 Hz


[OK!]


21/11/19 02:48:18 WARN org.apache.spark.scheduler.TaskSetManager: Stage 2 contains a task of very large size (7413 KiB). The maximum recommended task size is 1000 KiB.
2021-11-19 02:48:22.028564: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:32] Reading SavedModel from: /tmp/dca1aafe3304_classifier_dl2172160671251170991
2021-11-19 02:48:22.118688: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:55] Reading meta graph with tags { serve }
2021-11-19 02:48:22.118757: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:93] Reading SavedModel debug info (if present) from: /tmp/dca1aafe3304_classifier_dl2172160671251170991
2021-11-19 02:48:22.633352: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:206] Restoring SavedModel bundle.
2021-11-19 02:48:23.646846: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:190] Running initialization op on SavedModel bundle at path: /tmp/dca1aafe3304_classifier_dl2172160671251170991
2021-11-1

Py4JJavaError: An error occurred while calling o227.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 9) (final-project2-w-1.us-east1-d.c.eecs6893-325818.internal executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container from a bad node: container_1637258899703_0004_01_000005 on host: final-project2-w-1.us-east1-d.c.eecs6893-325818.internal. Exit status: 143. Diagnostics: [2021-11-19 02:52:34.076]Container killed on request. Exit code is 143
[2021-11-19 02:52:34.076]Container exited with a non-zero exit code 143. 
[2021-11-19 02:52:34.079]Killed by external signal
.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2259)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2208)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2207)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2446)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
	at com.johnsnowlabs.ml.tensorflow.ClassifierDatasetEncoder.calculateEmbeddingsDim(ClassifierDatasetEncoder.scala:186)
	at com.johnsnowlabs.nlp.annotators.classifier.dl.ClassifierDLApproach.train(ClassifierDLApproach.scala:368)
	at com.johnsnowlabs.nlp.annotators.classifier.dl.ClassifierDLApproach.train(ClassifierDLApproach.scala:114)
	at com.johnsnowlabs.nlp.AnnotatorApproach._fit(AnnotatorApproach.scala:68)
	at com.johnsnowlabs.nlp.AnnotatorApproach.fit(AnnotatorApproach.scala:74)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
