In [1]:
import pandas as pd

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Twitter_Sentiment_NLP").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/31 13:02:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/31 13:02:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/31 13:02:41 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://ilanp-bucket.s3.us-west-2.amazonaws.com/sentiment_analysis_10k.csv"
spark.sparkContext.addFile(url)
tweet_df = spark.read.csv(SparkFiles.get("sentiment_analysis_10k.csv"), sep=",", header=True, inferSchema=True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
tweet_df.show(5)

+--------+----------+--------------------+--------+--------------+--------------------+--------+
|polarity|        id|                date|   query|          user|                text|new_date|
+--------+----------+--------------------+--------+--------------+--------------------+--------+
|       0|2051199119|Fri Jun 05 21:04:...|NO_QUERY|    alicatpurr|My moonstone pend...|  6/5/09|
|       0|2051199378|Fri Jun 05 21:04:...|NO_QUERY|      joshwehe|Watching baseball...|  6/5/09|
|       0|2051200441|Fri Jun 05 21:05:...|NO_QUERY|    qwerkyqook|RIP cute black ma...|  6/5/09|
|       0|2051201409|Fri Jun 05 21:05:...|NO_QUERY|       Lizfig3|@pandafandanga we...|  6/5/09|
|       0|2051201881|Fri Jun 05 21:05:...|NO_QUERY|sweet_ctstrphe|lost my  voice  w...|  6/5/09|
+--------+----------+--------------------+--------+--------------+--------------------+--------+
only showing top 5 rows



In [5]:
list = [
    {"polarity": 1.0, "label" : 1.0, "text" : "I am so happy to be here today!"},
    {"polarity" : 0.0,"label" : 0.0, "text" : "Today is a terrible day."},
    {"polarity" : 1.0,"label" : 1.0, "text" : "I am so in love today!"}
]

In [6]:
tweet_df2 = spark.createDataFrame(list)
#tweet_df2 = tweet_df
tweet_df2.show()

                                                                                

+-----+--------+--------------------+
|label|polarity|                text|
+-----+--------+--------------------+
|  1.0|     1.0|I am so happy to ...|
|  0.0|     0.0|Today is a terrib...|
|  1.0|     1.0|I am so in love t...|
+-----+--------+--------------------+



In [10]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [11]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = tweet_df2.withColumn('length', length(tweet_df2['text']))
data_df.show()

+-----+--------+--------------------+------+
|label|polarity|                text|length|
+-----+--------+--------------------+------+
|  1.0|     1.0|I am so happy for...|    63|
|  0.0|     0.0|This sucks!  I do...|    39|
+-----+--------+--------------------+------+



In [12]:
# Create all the features to the data set
#pos_neg_to_num = StringIndexer(inputCol='polarity',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [14]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

In [15]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

                                                                                

In [16]:
cleaned.show()

22/05/31 12:59:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/31 12:59:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/31 12:59:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+-----+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|polarity|                text|length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+-----+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  1.0|     1.0|I am so happy for...|    63|[i, am, so, happy...|[happy, text!, , ...|(262144,[645,1239...|(262144,[645,1239...|(262145,[645,1239...|
|  0.0|     0.0|This sucks!  I do...|    39|[this, sucks!, , ...|[sucks!, , like, ...|(262144,[48874,18...|(262144,[48874,18...|(262145,[48874,18...|
+-----+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+



In [17]:
cleaned.select(['polarity','label','features']).show(truncate=False)

+--------+-----+-----------------------------------------------------------------------------------------------------------------------------+
|polarity|label|features                                                                                                                     |
+--------+-----+-----------------------------------------------------------------------------------------------------------------------------+
|1.0     |1.0  |(262145,[645,123940,162817,183411,262144],[0.4054651081081644,0.4054651081081644,0.4054651081081644,0.4054651081081644,63.0])|
|0.0     |0.0  |(262145,[48874,180672,208258,262144],[0.4054651081081644,0.4054651081081644,0.4054651081081644,39.0])                        |
+--------+-----+-----------------------------------------------------------------------------------------------------------------------------+



22/05/31 13:00:07 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/31 13:00:07 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/31 13:00:07 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [18]:
# Break data down into a training set and a testing set
#training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [19]:
#from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
# Create a Naive Bayes model and fit training data
#nb = NaiveBayes()
#predictor = nb.fit(training)

In [20]:
#nb_path = "./nb"
#nb.save(nb_path)

In [21]:
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
# Restore the saved NaiveBayes classifier
nb2 = NaiveBayes.load("./nb")
nb2.getSmoothing()

1.0

In [22]:
#predictor.save("./nb_model")

In [23]:
#Restored the trained predictor
predictor2 = NaiveBayesModel.load("./nb_model")

[Stage 23:>                                                         (0 + 1) / 1]                                                                                

In [25]:
test_results = predictor2.transform(cleaned)
test_results.show(20) 

+-----+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|polarity|                text|length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  1.0|     1.0|I am so happy for...|    63|[i, am, so, happy...|[happy, text!, , ...|(262144,[645,1239...|(262144,[645,1239...|(262145,[645,1239...|[-53.527404207656...|[0.57733184971333...|       0.0|
|  0.0|     0.0|This sucks!  I do...|    39|[this, sucks!, , ...|[sucks!, , like, ...|(262144,[48874,18...|(262144,[48874,18...|(262145,[48874,18...|[-33.700769564848...|[0.87790717642727.

22/05/31 13:00:38 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB
22/05/31 13:00:38 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB
22/05/31 13:00:38 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB


In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_eval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting Text Sentiment was: %f" % acc)


22/05/31 13:00:47 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/05/31 13:01:00 ERROR Executor: Exception in task 5.0 in stage 30.0 (TID 144)]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3942)
	at java.base/java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3747)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:1988)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1580)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.Obje

Py4JJavaError: An error occurred while calling o293.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 30.0 failed 1 times, most recent failure: Lost task 5.0 in stage 30.0 (TID 144) (ip-192-168-50-63.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3942)
	at java.base/java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3747)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:1988)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1580)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
	at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
	at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:151)
	at org.apache.spark.rdd.OrderedRDDFunctions.$anonfun$sortByKey$1(OrderedRDDFunctions.scala:64)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:63)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3$lzycompute(BinaryClassificationMetrics.scala:189)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3(BinaryClassificationMetrics.scala:178)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:180)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:180)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:272)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:103)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:123)
	at org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate(BinaryClassificationEvaluator.scala:102)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3942)
	at java.base/java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3747)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:1988)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1580)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2082)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2244)


In [None]:
df = test_results.select("text","label","prediction", "probability").toPandas()


Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-5"


In [None]:
df

In [None]:
from sklearn.metrics import confusion_matrix
# Generate the confusion matrix
cm = confusion_matrix(df["label"], df["prediction"])
cm_df = pd.DataFrame(
    cm, index=["Actual 0", "Actual 1"],
    columns=["Predicted 0", "Predicted 1"]
)

# Displaying results
display(cm_df)


Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "refresh progress"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RemoteBlock-temp-file-clean-thread"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-32"
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/ilan/opt/anaconda3/envs/mlenv2/lib/python3.7/site-packages/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ilan/opt/anaconda3/envs/mlenv2/lib/python3.7/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/ilan/opt/anaconda3/envs/mlenv2/lib/python3.7

In [None]:
# Store environmental variable
from getpass import getpass
password = getpass('Provide Password')

# Configure settings for RDS
mode = "overwrite"
jdbc_url="jdbc:postgresql://database-1.c3f2jo4rdylg.us-west-2.rds.amazonaws.com:5432/sentiment_analysis"
config = {"user":"postgres", 
          "password": password, 
          "driver":"org.postgresql.Driver"}

22/05/30 19:55:25 ERROR Utils: Uncaught exception in thread Executor task launch worker for task 0.0 in stage 10.0 (TID 31)
java.lang.NullPointerException
	at org.apache.spark.scheduler.Task.$anonfun$run$2(Task.scala:152)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
	at org.apache.spark.scheduler.Task.run(Task.scala:150)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
	at java.base/java.lang.Thread.run(Thread.java:844)
22/05/30 19:55:25 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.IllegalStateException: Block broadcast_18 not found
	at org.apache.spark.storage.BlockInfoManager.$

In [18]:
test_results["polarity",'text','new_date',"length","label", "token_text","features", "prediction"].show(truncate=False)

+--------+-----------------------------------------------------------------------------------------------------------------------------------------+--------+------+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|polarity|text                                      

In [19]:
for col in test_results.dtypes:
    print(col[0]+" , "+col[1])

polarity , int
id , int
date , string
query , string
user , string
text , string
new_date , string
length , int
label , double
token_text , array<string>
stop_tokens , array<string>
hash_token , vector
idf_token , vector
features , vector
rawPrediction , vector
probability , vector
prediction , double


In [20]:
from pyspark.sql.functions import col, concat_ws
test_results2 = test_results.withColumn("token_text", concat_ws(",",col("token_text")))
test_results3 = test_results2.withColumn("stop_tokens", concat_ws(",",col("stop_tokens")))
test_results3.show(truncate=False)

+--------+----------+----------------------------+--------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------+--------+------+-----+-----------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
# Write DataFrame to active_user table in RDS
test_results['polarity','text','new_date',"length","label", "token_text", "prediction"].write.jdbc(url=jdbc_url, table='test_results', mode=mode, properties=config)

In [10]:
import requests
import os
import json
import pandas as pd
from flask import session


# To set your enviornment variables in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'
with open('../myconfig.json','r') as fh:
    config = json.load(fh)
os.environ["BEARER_TOKEN"] = config["BEARER_TOKEN"]

bearer_token = os.environ.get("BEARER_TOKEN")


def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """

    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

def create_rules(hashtag_data):
        new_rules = '{"rules" : ['
        counter = 0
        for hashtag in hashtag_data['tw_trends']:
            counter = counter + 1
            if counter == 1:
                new_rules = new_rules + '{"value": "' + hashtag['hashtag'] + ' -is:retweet lang:en -has:links -has:media", "tag": "' + hashtag['hashtag'] + '"}'
            else:
                new_rules = new_rules + ',{"value": "' + hashtag['hashtag'] + ' -is:retweet lang:en -has:links -has:media", "tag": "' + hashtag['hashtag'] + '"}'
        new_rules = new_rules + ']}'        
        print(new_rules)
        return(new_rules)

def get_rules():
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print('get_rules() response:')    
    print(json.dumps(response.json()))
    return response.json()


def delete_all_rules(rules):
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print('delete_all_rules(rules) response:')    
    
    (json.dumps(response.json()))


def set_rules(rules):
    # You can adjust the rules if needed 
    # if the passed in rules is null, then 
    if rules is None:
        sample_rules = [
            {"value": "dog has:images", "tag": "dog pictures"},
            {"value": "cat has:images -grumpy", "tag": "cat pictures"},
        ]
    else:
        print('assigning specified rules')
        sample_rules = rules
    payload = {"add": sample_rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print('set_rules(delete) response:')      
    print(json.dumps(response.json()))


def get_stream(countOfTweets):
    countOfTweets = int(countOfTweets)
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,
    )
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    all_responses = []
    count = 0
    for response_line in response.iter_lines():
        if count >= countOfTweets:
            break;

        if response_line:
            # get the current tweet json
            json_response = json.loads(response_line)    
 
            # Keep count of the processed tweets
            count = count + 1 

            # initialize a dict to hold the current tweet
            tweet = {}

            # extract the data from the tweet and store it in our variable
            tweet['count'] = count
            tweet['id'] = json_response["data"]["id"]
            tweet["text"] = json_response["data"]["text"]
            tweet["tag"] = json_response["matching_rules"][0]["tag"]

            # Update Session with current tweet.
            #session['current_tweet'] = tweet 

            # add the tweet to our response list
            all_responses.append(tweet)

            #print(json.dumps(json_response, indent=4, sort_keys=True))
            print('tweets streamed: ' + str(count))
    #take the streamed responses, put it into a dataframe and print the dataframe        
    #df = pd.DataFrame(all_responses)
    #print(df)

    #return the streamed responses         
    return(all_responses)


            

def main():
    rules = get_rules()
    delete = delete_all_rules(rules)
    set = set_rules(delete)
    get_stream(set) 
    


#if __name__ == "__main__":
    main()



In [50]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.sql.functions import length
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel

# Create the spark session
spark = SparkSession.builder.appName("Twitter_Sentiment_NLP").getOrCreate()

#Pre-Load the classifier and the model
# Load the saved NaiveBayes Classifier
nb = NaiveBayes.load("../static/resources/nb")

#Restored the trained predictor (Trained on 1 mil tweets)
predictor = NaiveBayesModel.load("../static/resources/nb_model")

def eval_text_single(text, polarity = 1.0):
    list = [
    {"polarity": polarity, "text" : text}
    ]

    # The pipeline doesn't work as well when it it just one record in the list, so creating a fake list and adding the request to it.
    text_list = [{"text": "I am so happy for this text!  I can now have everything I want.", "polarity": 1.0},
             {"text": "This sucks!  I don't like this anymore.", "polarity": 0.0},
             {"text" : "This is a bad text.", "polarity" : 0.0},
             {"text": "I love you.", "polarity": 0.0},
             {"text": "Wow!  I can't believe how great this is.", "polarity": 0.0},
            ]
    
    # Add to the fake list
    text_list.append({"text" : text, "polarity" : polarity})
    
    tweet_df = spark.createDataFrame(text_list)

    # Create a length column to be used as a future feature
    data_df = tweet_df.withColumn('length', length(tweet_df['text']))

    # Create all the features to the data set
    tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
    stopRemove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
    hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
    idf = IDF(inputCol='hash_token', outputCol='idf_token')

    # Create feature vectors
    clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

    # Create and run a data processing Pipeline
    data_prep_pipeline = Pipeline(stages=[tokenizer, stopRemove, hashingTF, idf, clean_up])

    # Fit and transform the pipeline
    cleaner = data_prep_pipeline.fit(data_df)
    cleaned = cleaner.transform(data_df)

    # Load the saved NaiveBayes Classifier
    #nb = NaiveBayes.load("static/resources/nb")

    #Restored the trained predictor (Trained on 1 mil tweets)
    #predictor = NaiveBayesModel.load("static/resources/nb_model")

    #Predict the sentiment of the text using the restored predictor
    test_results = predictor.transform(cleaned)

    df = test_results.select("text","prediction", "probability").toPandas()

    positives = [prob[1] for prob in df['probability']]
    df['probability'] = positives
    
    #Prepare the results, show the first row 
    result = {}
    result["text"] = df["text"][5]
    result["prediction"] = df["prediction"][5]
    result["probability"] = df["probability"][5]

    if result["prediction"] == 1:
        result["prediction"] = "Positive"
    else: 
        result["prediction"] = "Negative"

    return(result)

def eval_text_list(text_list):

    tweet_df = spark.createDataFrame(text_list)

    # Create a length column to be used as a future feature
    data_df = tweet_df.withColumn('length', length(tweet_df['text']))

    # Create all the features to the data set
    tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
    stopRemove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
    hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
    idf = IDF(inputCol='hash_token', outputCol='idf_token')

    # Create feature vectors
    clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

    # Create and run a data processing Pipeline
    data_prep_pipeline = Pipeline(stages=[tokenizer, stopRemove, hashingTF, idf, clean_up])

    # Fit and transform the pipeline
    cleaner = data_prep_pipeline.fit(data_df)
    cleaned = cleaner.transform(data_df)

    # Load the saved NaiveBayes Classifier
    #nb = NaiveBayes.load("static/resources/nb")

    #Restored the trained predictor (Trained on 1 mil tweets)
    #predictor = NaiveBayesModel.load("static/resources/nb_model")

    #Predict the sentiment of the text using the restored predictor
    test_results = predictor.transform(cleaned)

    df = test_results.select("text", "tag", "prediction", "probability").toPandas()
    
    positive_score = [prob[1] for prob in df['probability']]
    df['probability'] = positive_score

    percents = ["{:.2%}".format(prob) for prob in df['probability']]
    df['percent'] = percents

    df.loc[df['prediction'] == 1.0, 'prediction'] = 'Positive'
    df.loc[df['prediction'] == 0.0, 'prediction'] = 'Negative'

    top_10 = df.sort_values(by=['probability'], ascending=False).head(10)
    bottom_10 = df.sort_values(by=['probability'], ascending=True).head(10)
    
    return(df, top_10, bottom_10)
    

In [12]:
from xml.etree.ElementTree import tostring
from flask import Flask, render_template, redirect, url_for, request, session
import json



#pull the rules from the textarea input box
rules = '{"rules" : [{"value": "Apple -is:retweet lang:en -has:links -has:media", "tag": "Apple"},{"value": "#WWDC22 -is:retweet lang:en -has:links -has:media", "tag": "#WWDC22"},{"value": "#LoveIsland -is:retweet lang:en -has:links -has:media", "tag": "#LoveIsland"},{"value": "Proud Boys -is:retweet lang:en -has:links -has:media", "tag": "Proud Boys"},{"value": "iOS 16 -is:retweet lang:en -has:links -has:media", "tag": "iOS 16"},{"value": "Aaron Donald -is:retweet lang:en -has:links -has:media", "tag": "Aaron Donald"},{"value": "Wilbur -is:retweet lang:en -has:links -has:media", "tag": "Wilbur"},{"value": "Giant Bomb -is:retweet lang:en -has:links -has:media", "tag": "Giant Bomb"},{"value": "Jocelyn Alo -is:retweet lang:en -has:links -has:media", "tag": "Jocelyn Alo"},{"value": "Michigan -is:retweet lang:en -has:links -has:media", "tag": "Michigan"}]}'

#pull the number of Tweets to request from the Twitter API
countOfTweets = 25
print(f'Count of Tweets: {str(countOfTweets)}')
if countOfTweets is None:
    countOfTweets = 10

print('rules: type: ' + rules)

#Perform the steps needed to receive the twitter stream

rules = json.loads(rules)
#get the previous rules
old_rules = get_rules()

#delete the previous rules
delete = delete_all_rules(old_rules)

#set the rules to be the new rules
set = set_rules(rules["rules"])



Count of Tweets: 25
rules: type: {"rules" : [{"value": "Apple -is:retweet lang:en -has:links -has:media", "tag": "Apple"},{"value": "#WWDC22 -is:retweet lang:en -has:links -has:media", "tag": "#WWDC22"},{"value": "#LoveIsland -is:retweet lang:en -has:links -has:media", "tag": "#LoveIsland"},{"value": "Proud Boys -is:retweet lang:en -has:links -has:media", "tag": "Proud Boys"},{"value": "iOS 16 -is:retweet lang:en -has:links -has:media", "tag": "iOS 16"},{"value": "Aaron Donald -is:retweet lang:en -has:links -has:media", "tag": "Aaron Donald"},{"value": "Wilbur -is:retweet lang:en -has:links -has:media", "tag": "Wilbur"},{"value": "Giant Bomb -is:retweet lang:en -has:links -has:media", "tag": "Giant Bomb"},{"value": "Jocelyn Alo -is:retweet lang:en -has:links -has:media", "tag": "Jocelyn Alo"},{"value": "Michigan -is:retweet lang:en -has:links -has:media", "tag": "Michigan"}]}
get_rules() response:
{"data": [{"id": "1533945829374889985", "value": "Giant Bomb -is:retweet lang:en -has:lin

In [13]:
#Start the twitter stream with the requested rule set
tweet_list = get_stream(countOfTweets) 



200
tweets streamed: 1
tweets streamed: 2
tweets streamed: 3
tweets streamed: 4
tweets streamed: 5
tweets streamed: 6
tweets streamed: 7
tweets streamed: 8
tweets streamed: 9
tweets streamed: 10
tweets streamed: 11
tweets streamed: 12
tweets streamed: 13
tweets streamed: 14
tweets streamed: 15
tweets streamed: 16
tweets streamed: 17
tweets streamed: 18
tweets streamed: 19
tweets streamed: 20
tweets streamed: 21
tweets streamed: 22
tweets streamed: 23
tweets streamed: 24
tweets streamed: 25


In [46]:
#Send the collected twitter feed to the machine learning model
eval_list, top_10, bottom_10 = eval_text_list(tweet_list)  



22/06/06 16:15:07 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB


In [47]:
#print the returned eval_list
top_10

Unnamed: 0,text,tag,prediction,probability,percent
1,@PalmerReport I look forward to Stone and Jone...,Proud Boys,Positive,1.0,100.00%
14,@VoidBurger I loved following Jeff on GB but y...,Giant Bomb,Positive,0.999999,100.00%
11,Since the head of the Proud boys was seen in t...,Proud Boys,Positive,0.999673,99.97%
15,"@duty2warn Biden: 81,282,916\nTrump:74,223,369",Michigan,Positive,0.988073,98.81%
4,👑Apple shares are trading higher in anticipati...,Apple,Positive,0.981373,98.14%
0,@thurrott Maybe they should make two keynotes....,Apple,Positive,0.853507,85.35%
16,iOS 16 is gonna be 🔥,iOS 16,Positive,0.599124,59.91%
24,@Ahmed_UTC2 @BrachatJan @jenni_moyer @NoContex...,Apple,Negative,0.392138,39.21%
21,@twostraws Where is info of SWIFTUI 4.0?,#WWDC22,Negative,0.250873,25.09%
9,The NCAA is a joke with that call against Mich...,Michigan,Negative,0.068226,6.82%


In [49]:
bottom_10

Unnamed: 0,text,tag,prediction,probability,percent
1,@PalmerReport I look forward to Stone and Jone...,Proud Boys,Positive,1.0,100.00%
14,@VoidBurger I loved following Jeff on GB but y...,Giant Bomb,Positive,0.999999,100.00%
11,Since the head of the Proud boys was seen in t...,Proud Boys,Positive,0.999673,99.97%
15,"@duty2warn Biden: 81,282,916\nTrump:74,223,369",Michigan,Positive,0.988073,98.81%
4,👑Apple shares are trading higher in anticipati...,Apple,Positive,0.981373,98.14%
0,@thurrott Maybe they should make two keynotes....,Apple,Positive,0.853507,85.35%
16,iOS 16 is gonna be 🔥,iOS 16,Positive,0.599124,59.91%
24,@Ahmed_UTC2 @BrachatJan @jenni_moyer @NoContex...,Apple,Negative,0.392138,39.21%
21,@twostraws Where is info of SWIFTUI 4.0?,#WWDC22,Negative,0.250873,25.09%
9,The NCAA is a joke with that call against Mich...,Michigan,Negative,0.068226,6.82%
