In [1]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-2.4.6'
spark_version = 'spark-2.4.7'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Fetched 252 kB in 3s (97.7 kB/

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("8020Models").getOrCreate()

In [3]:
# Read in data from Github
from pyspark import SparkFiles
url ="https://raw.githubusercontent.com/James-Ashley/sentiment-analysis-dashboard/main/sentiment_classification/preprocessed_headlines.json"
spark.sparkContext.addFile(url)
df_git = spark.read.json(SparkFiles.get("preprocessed_headlines.json"))

In [4]:
df_git.show()

+--------------------+--------------------+--------------+-----------+--------------+-------------+--------------+--------------------+---------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|   cleaned_headlines|compound_score|    keyword|negative_score|neutral_score|positive_score|           published|sentiment_human|  source|       text_complete|        text_excerpt|               tfidf|               title|              tokens|         tokens_lems|                 url|
+--------------------+--------------------+--------------+-----------+--------------+-------------+--------------+--------------------+---------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|           Dip Patel|could deport pare...|        0.1027|immigration|      

### Feature Transformations


In [5]:
df_git_clean = df_git.select('sentiment_human', 'tokens_lems').withColumnRenamed('sentiment_human', 'label')

#df_git_clean.show()

In [6]:
from pyspark.sql.functions import when

df_git_clean = df_git_clean.withColumn("label", \
              when(df_git_clean["label"] == -1, 2).otherwise(df_git_clean["label"]))

#df_git_clean.show()

In [7]:
from pyspark.ml.feature import HashingTF, IDF, StringIndexer
# Create all the features for the data set

# hashing
hashingTF = HashingTF(inputCol="tokens_lems", outputCol='hash_token')
# idf
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors - this assemble all columns you want to use as features
clean_up = VectorAssembler(inputCols=['idf_token'], outputCol='features')

In [9]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[hashingTF, idf, clean_up])

In [10]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(df_git_clean)
cleaned = cleaner.transform(df_git_clean)
#cleaned.show()

In [11]:
# Show label and resulting features
cleaned_final = cleaned.select(['label', 'features'])
cleaned_final.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    2|(262144,[9514,813...|
|    0|(262144,[2437,638...|
|    0|(262144,[13981,31...|
|    0|(262144,[15664,31...|
|    2|(262144,[6355,491...|
|    2|(262144,[54330,59...|
|    1|(262144,[7612,138...|
|    2|(262144,[9514,329...|
|    2|(262144,[13471,34...|
|    0|(262144,[34389,49...|
|    1|(262144,[26616,37...|
|    1|(262144,[37834,99...|
|    0|(262144,[46332,62...|
|    0|(262144,[7612,518...|
|    2|(262144,[39964,72...|
|    0|(262144,[36449,45...|
|    2|(262144,[4920,173...|
|    0|(262144,[23456,29...|
|    0|(262144,[7612,556...|
|    1|(262144,[7612,251...|
+-----+--------------------+
only showing top 20 rows



In [12]:
# Break data down into a training set and a testing set (train with 70%, test with 30%)
training, testing = cleaned_final.randomSplit([0.8, 0.2])

# Perform a stratified split to preserve class distribution
# Source: https://stackoverflow.com/questions/47637760/stratified-sampling-with-pyspark

# split dataframes between 0s, 1s, and 2s
# zeros = cleaned_final.filter(cleaned_final["label"]==0)
# ones = cleaned_final.filter(cleaned_final["label"]==1)
# twos = cleaned_final.filter(cleaned_final["label"]==2)

# # split datasets into training and testing

# train0, test0 = zeros.randomSplit([0.8,0.2], seed=1234)
# train1, test1 = ones.randomSplit([0.8,0.2], seed=1234)
# train2, test2 = twos.randomSplit([0.8,0.2], seed=1234)
# # stack datasets back together
# training = train0.union(train1).union(train2)
# testing = test0.union(test1).union(test2)

In [13]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [14]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
#test_results.show(5)

In [15]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting sentiment was: %f" % acc)

Accuracy of model at predicting sentiment was: 0.564089


In [16]:
from pyspark.ml.classification import LogisticRegression, OneVsRest

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(training)

# score the model on test data.
predictions = ovrModel.transform(testing)
#predictions.show(5)

In [17]:
# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Accuracy of model at predicting sentiment was: %f" % accuracy)

Accuracy of model at predicting sentiment was: 0.636513
