In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
#os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
                                                                               Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to security.ubun                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
0% [1 InRelease gpgv 242 kB] [Connecting to security.ubuntu.com (91.189.91.39)]                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Twitter_Ukraine").getOrCreate()
# import textBlob
from textblob import TextBlob

In [3]:
# Read in data from S3 Buckets **** IF RUNNING FROM COLAB, WE MIGHT HAVE TO USE AN AWS BUCKET OR SIMILAR ****
from pyspark import SparkFiles
url ="https://bootcamp-team-project.s3.us-west-2.amazonaws.com/mock_data.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("mock_data.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+-------------+---------+---------+-----------+-----------+------------+--------------+--------------------+
|     location|following|followers|totaltweets|    tweetid|retweetcount|favorite_count|                text|
+-------------+---------+---------+-----------+-----------+------------+--------------+--------------------+
|United States|     1336|     1308|      48763|1.50135E+18|          36|             0|breaking  over   ...|
|United States|    10327|    10345|     112078|1.50135E+18|          53|             0|breaking  a cruis...|
|United States|    13941|    13332|     112046|1.50135E+18|           0|             0|my heart goes out...|
|          USA|     3590|     4124|     300831|1.50135E+18|           1|             0| russia  ukraine ...|
|United States|     7898|     7221|      88903|1.50135E+18|          64|             0|one very concerni...|
|          USA|     3361|     1265|      26248|1.50135E+18|          59|             0|ukrainian army  n...|
|United States|    

In [4]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [5]:
# create a copy and write sentiment polarity using TextBlob
from pyspark.sql.functions import udf
data_df = df['following','followers','totaltweets','retweetcount','favorite_count','text']
sentiment = udf(lambda x: TextBlob(x).sentiment[0])
spark.udf.register("sentiment", sentiment)
data_df = data_df.withColumn('sentiment',sentiment('text').cast('double'))
data_df.show()

+---------+---------+-----------+------------+--------------+--------------------+--------------------+
|following|followers|totaltweets|retweetcount|favorite_count|                text|           sentiment|
+---------+---------+-----------+------------+--------------+--------------------+--------------------+
|     1336|     1308|      48763|          36|             0|breaking  over   ...|                -0.1|
|    10327|    10345|     112078|          53|             0|breaking  a cruis...|                 0.0|
|    13941|    13332|     112046|           0|             0|my heart goes out...|                 0.5|
|     3590|     4124|     300831|           1|             0| russia  ukraine ...| -0.0851851851851852|
|     7898|     7221|      88903|          64|             0|one very concerni...|-0.03333333333333334|
|     3361|     1265|      26248|          59|             0|ukrainian army  n...|              0.1375|
|      947|      386|      25040|           1|             0| pu

In [14]:

# Binning the sentiment polarities
from pyspark.sql.types import StringType
def sentiment_bins(value):
   # try:
      if value > 0.6:
        return "ExtremePositive"
      if value > 0.3:
        return "Positive"
      if value < -0.6:
        return "ExtremeNegative"
      if value < -0.3:
        return "Negative"
      else:
        return "Neutral"
 #   except:
 #       return None

partial_func = udf(lambda x: sentiment_bins(x))
data_df = data_df.withColumn("sentiment_bins", partial_func(data_df.sentiment))

data_df.show()


+---------+---------+-----------+------------+--------------+--------------------+--------------------+--------------+
|following|followers|totaltweets|retweetcount|favorite_count|                text|           sentiment|sentiment_bins|
+---------+---------+-----------+------------+--------------+--------------------+--------------------+--------------+
|     1336|     1308|      48763|          36|             0|breaking  over   ...|                -0.1|       Neutral|
|    10327|    10345|     112078|          53|             0|breaking  a cruis...|                 0.0|       Neutral|
|    13941|    13332|     112046|           0|             0|my heart goes out...|                 0.5|      Positive|
|     3590|     4124|     300831|           1|             0| russia  ukraine ...| -0.0851851851851852|       Neutral|
|     7898|     7221|      88903|          64|             0|one very concerni...|-0.03333333333333334|       Neutral|
|     3361|     1265|      26248|          59|  

In [15]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='sentiment_bins',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'sentiment'], outputCol='features')

In [26]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [27]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [19]:
# Show labels and resulting features
cleaned.select(["label", "features"]).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[27544,63...|
|  0.0|(262145,[16757,27...|
|  1.0|(262145,[25964,76...|
|  0.0|(262145,[1415,173...|
|  0.0|(262145,[7796,115...|
|  0.0|(262145,[38698,64...|
|  0.0|(262145,[10446,11...|
|  1.0|(262145,[28497,52...|
|  0.0|(262145,[432,3657...|
|  0.0|(262145,[28253,38...|
|  0.0|(262145,[68693,82...|
|  0.0|(262145,[64859,77...|
|  0.0|(262145,[12710,82...|
|  0.0|(262145,[82967,15...|
|  0.0|(262145,[51444,82...|
|  1.0|(262145,[47646,57...|
|  0.0|(262145,[12409,47...|
|  0.0|(262145,[16337,63...|
|  0.0|(262145,[63045,82...|
|  0.0|(262145,[3775,147...|
+-----+--------------------+
only showing top 20 rows



In [28]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [29]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

Py4JJavaError: ignored

In [None]:
# Transform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
acc_eval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print('Accuracy of model at predicting sentiment was: %f' % acc)