In [119]:
import os

spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to cloud.r-project.org] [Conn0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machin

In [120]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [125]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://raw.githubusercontent.com/choushuiguo325/disaster_tweets_ML/master/Resources/train_test.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("train_test.csv"), sep=",", header=True)
# Show DataFrame
df.show()

+---+-------+--------+--------------------+------+--------+
| id|keyword|location|                text|target|   class|
+---+-------+--------+--------------------+------+--------+
|  1|   null|    null|Our Deeds are the...|     1|positive|
|  4|   null|    null|Forest fire near ...|     1|positive|
|  5|   null|    null|All residents ask...|     1|positive|
|  6|   null|    null|13,000 people rec...|     1|positive|
|  7|   null|    null|Just got sent thi...|     1|positive|
|  8|   null|    null|#RockyFire Update...|     1|positive|
| 10|   null|    null|#flood #disaster ...|     1|positive|
| 13|   null|    null|I'm on top of the...|     1|positive|
| 14|   null|    null|There's an emerge...|     1|positive|
| 15|   null|    null|I'm afraid that t...|     1|positive|
| 16|   null|    null|Three people died...|     1|positive|
| 17|   null|    null|Haha South Tampa ...|     1|positive|
| 18|   null|    null|#raining #floodin...|     1|positive|
| 19|   null|    null|#Flood in Bago My.

In [139]:
import re
from pyspark.sql.functions import length, regexp_replace, udf
from pyspark.sql.types import IntegerType

# Remove the punctuations in text
puncRep = udf(lambda x: re.sub(r'[^\w\s]', '',x) )
df = df.withColumn('cleaned',puncRep('text'))

# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['cleaned']))
data_df = data_df.withColumn("target", data_df["target"].cast(IntegerType()))
data_df = data_df.withColumnRenamed("target", "label")
data_df.show()

+---+-------+--------+--------------------+-----+--------+--------------------+------+
| id|keyword|location|                text|label|   class|             cleaned|length|
+---+-------+--------+--------------------+-----+--------+--------------------+------+
|  1|   null|    null|Our Deeds are the...|    1|positive|Our Deeds are the...|    68|
|  4|   null|    null|Forest fire near ...|    1|positive|Forest fire near ...|    37|
|  5|   null|    null|All residents ask...|    1|positive|All residents ask...|   130|
|  6|   null|    null|13,000 people rec...|    1|positive|13000 people rece...|    63|
|  7|   null|    null|Just got sent thi...|    1|positive|Just got sent thi...|    86|
|  8|   null|    null|#RockyFire Update...|    1|positive|RockyFire Update ...|   103|
| 10|   null|    null|#flood #disaster ...|    1|positive|flood disaster He...|    92|
| 13|   null|    null|I'm on top of the...|    1|positive|Im on top of the ...|    55|
| 14|   null|    null|There's an emerge...|

In [140]:
# data_df_modified = data_df[['class','cleaned','length']]
data_df_modified = data_df[['label','cleaned','length']]
data_df_modified.show()

+-----+--------------------+------+
|label|             cleaned|length|
+-----+--------------------+------+
|    1|Our Deeds are the...|    68|
|    1|Forest fire near ...|    37|
|    1|All residents ask...|   130|
|    1|13000 people rece...|    63|
|    1|Just got sent thi...|    86|
|    1|RockyFire Update ...|   103|
|    1|flood disaster He...|    92|
|    1|Im on top of the ...|    55|
|    1|Theres an emergen...|    78|
|    1|Im afraid that th...|    48|
|    1|Three people died...|    43|
|    1|Haha South Tampa ...|   127|
|    1|raining flooding ...|    69|
|    1|Flood in Bago Mya...|    37|
|    1|Damage to school ...|    55|
|    0|        Whats up man|    12|
|    0|       I love fruits|    13|
|    0|    Summer is lovely|    16|
|    0|   My car is so fast|    17|
|    0|What a goooooooaa...|    22|
+-----+--------------------+------+
only showing top 20 rows



In [None]:
# df = spark.createDataFrame([
#     (1,"positive", "####Our Deeds are the Reason of this #earthquake May ALLAH Forgive us all"),
#     (2,"negative", "!Forest fire near La Ronge Sask. Canada"),
#     (3,"negative", "All residents asked to 'shelter in place' are being notified by officers. No other evacuation or shelter in place orders are expected"),
#     (4,"positive", "??13,000 people receive #wildfires evacuation orders in California ."),
#     (5,"positive", "!Mango juice with crushed ice&gt;&gt;&gt;&gt;??"),
#     (6,"negative", "#RockyFire Update => California Hwy. 20 closed in both directions due to Lake County fire - #CAfire #wildfires")
# ],["id","class", "text"])

# from pyspark.sql.functions import length
# data_df = df.withColumn('text',)
# # Create a length column to be used as a future feature 
# data_df = df.withColumn('length', length(df['text']))
# data_df.show()

# data_df_modified = data_df[['class','text','length']]
# data_df_modified.show()

In [None]:
# data_df_modified = data_df[['class','text','length']]
# data_df_modified.show()

### Feature Transformations


In [131]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
# pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="cleaned", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [132]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [141]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
# data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

In [142]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df_modified)
cleaned = cleaner.transform(data_df_modified)
cleaned.show()

+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|             cleaned|length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    1|Our Deeds are the...|    68|[our, deeds, are,...|[deeds, reason, e...|(262144,[24370,68...|(262144,[24370,68...|(262145,[24370,68...|
|    1|Forest fire near ...|    37|[forest, fire, ne...|[forest, fire, ne...|(262144,[55310,91...|(262144,[55310,91...|(262145,[55310,91...|
|    1|All residents ask...|   130|[all, residents, ...|[residents, asked...|(262144,[38983,41...|(262144,[38983,41...|(262145,[38983,41...|
|    1|13000 people rece...|    63|[13000, people, r...|[13000, people, r...|(262144,[38983,39...|(262144,[38983,39...|(262145,[38983,39...|
|    1|Just g

In [143]:
data_df_modified.show()

+-----+--------------------+------+
|label|             cleaned|length|
+-----+--------------------+------+
|    1|Our Deeds are the...|    68|
|    1|Forest fire near ...|    37|
|    1|All residents ask...|   130|
|    1|13000 people rece...|    63|
|    1|Just got sent thi...|    86|
|    1|RockyFire Update ...|   103|
|    1|flood disaster He...|    92|
|    1|Im on top of the ...|    55|
|    1|Theres an emergen...|    78|
|    1|Im afraid that th...|    48|
|    1|Three people died...|    43|
|    1|Haha South Tampa ...|   127|
|    1|raining flooding ...|    69|
|    1|Flood in Bago Mya...|    37|
|    1|Damage to school ...|    55|
|    0|        Whats up man|    12|
|    0|       I love fruits|    13|
|    0|    Summer is lovely|    16|
|    0|   My car is so fast|    17|
|    0|What a goooooooaa...|    22|
+-----+--------------------+------+
only showing top 20 rows



In [None]:
# cleaned.rdd.map(lambda x: (x.stop_tokens))

In [144]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|(262145,[24370,68...|
|    1|(262145,[55310,91...|
|    1|(262145,[38983,41...|
|    1|(262145,[38983,39...|
|    1|(262145,[39441,52...|
|    1|(262145,[7588,394...|
|    1|(262145,[1797,163...|
|    1|(262145,[4106,853...|
|    1|(262145,[38983,70...|
|    1|(262145,[12409,31...|
|    1|(262145,[59791,96...|
|    1|(262145,[33053,59...|
|    1|(262145,[19153,24...|
|    1|(262145,[16319,77...|
|    1|(262145,[12826,27...|
|    0|(262145,[48531,10...|
|    0|(262145,[186480,2...|
|    0|(262145,[167401,1...|
|    0|(262145,[71578,11...|
|    0|(262145,[195910,2...|
+-----+--------------------+
only showing top 20 rows



In [154]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [171]:
training.show()
training.filter(training['label']==1).count()

+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|             cleaned|length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    0|   Do you like pasta|    17|[do, you, like, p...|       [like, pasta]|(262144,[208258,2...|(262144,[208258,2...|(262145,[208258,2...|
|    0|            LOOOOOOL|     8|          [looooool]|          [looooool]|(262144,[12268],[...|(262144,[12268],[...|(262145,[12268,26...|
|    0|     London is cool |    15|  [london, is, cool]|      [london, cool]|(262144,[223619,2...|(262144,[223619,2...|(262145,[223619,2...|
|    0|  Love my girlfriend|    18|[love, my, girlfr...|  [love, girlfriend]|(262144,[121169,1...|(262144,[121169,1...|(262145,[121169,1...|
|    0|      

11

In [155]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|             cleaned|length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|             Cooool |     7|            [cooool]|            [cooool]|(262144,[43292],[...|(262144,[43292],[...|(262145,[43292,26...|[-86.182207144436...|[1.08979306084503...|       1.0|
|    0|Crying out for mo...|    33|[crying, out, for...|[crying, set, abl...|(262144,[151657,2...|(262144,[151657,2...|(262145,[151657,2...|[-328.32404539489...|[1.68322802418822...|       1.0|
|    0|       I love fruits|  

In [169]:
print("test size",test_results.count())
test_results.show()

test size 12
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|             cleaned|length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|             Cooool |     7|            [cooool]|            [cooool]|(262144,[43292],[...|(262144,[43292],[...|(262145,[43292,26...|[-86.182207144436...|[1.08979306084503...|       1.0|
|    0|Crying out for mo...|    33|[crying, out, for...|[crying, set, abl...|(262144,[151657,2...|(262144,[151657,2...|(262145,[151657,2...|[-328.32404539489...|[1.68322802418822...|       1.0|
|    0|       I l

In [181]:
right_prediction = test_results.filter(test_results['prediction'] == test_results['label']).count()
tp = test_results.filter((test_results['prediction']== 1) & (test_results['label']== 1)).count()
tn = test_results.filter((test_results['prediction']== 0) & (test_results['label']== 0)).count()
fp = test_results.filter((test_results['prediction']== 1) & (test_results['label']== 0)).count()
fn = test_results.filter((test_results['prediction']== 0) & (test_results['label']== 1)).count()

confusionMatrix = spark.createDataFrame([
    (1,"predicton: P", tp,fp),
    (2,"predicton: N", fn,tn)
], ["id","xixix","actual: P", "actual: N"])

confusionMatrix.show()

+---+------------+---------+---------+
| id|       xixix|actual: P|actual: N|
+---+------------+---------+---------+
|  1|predicton: P|        6|        6|
|  2|predicton: N|        0|        0|
+---+------------+---------+---------+



In [157]:

# # Use the Class Evaluator for a cleaner description
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# acc_eval = MulticlassClassificationEvaluator()
# acc = acc_eval.evaluate(test_results)
# print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.333333


In [159]:
accuracy_eval = MulticlassClassificationEvaluator(metricName = 'accuracy')
print("accuracy",accuracy_eval.evaluate(test_results))

accuracy 0.5


In [168]:
# Compute raw scores on the test set
from pyspark.mllib.evaluation import BinaryClassificationMetrics
predictionAndLabels = test_results.rdd.map(lambda lp: (lp.prediction, lp.label))

# Instantiate metrics object
metrics = BinaryClassificationMetrics(predictionAndLabels)
# # Area under precision-recall curve
# print("Area under PR = %s" % metrics.areaUnderPR)

# # Area under ROC curve
# print("Area under ROC = %s" % metrics.areaUnderROC)

precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

AttributeError: ignored