In [1]:
#Installed required configuration
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

!pip install -q findspark
!pip install boto3
!pip install re
!pip install spark-nlp==3.0.0

[31mERROR: Could not find a version that satisfies the requirement re (from versions: none)[0m
[31mERROR: No matching distribution found for re[0m


In [2]:
#Create spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("local[*]").getOrCreate()

In [3]:
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [4]:
#Import the dataset

sqlContext = SQLContext(spark)
pdf = pd.read_excel('Corona_NLP_train.xlsx')
dataset = sqlContext.createDataFrame(pdf.astype(str))
dataset = dataset.select('OriginalTweet','Sentiment')

In [5]:
#Create NLP Pipline

#Stages
tokenizer_originalTweet = Tokenizer(inputCol='OriginalTweet', outputCol='originalTweet_tokens')
stopwords_remover_originalTweet = StopWordsRemover(inputCol='originalTweet_tokens', outputCol='filtered_originalTweet')
vectorizer_originalTweet = CountVectorizer(inputCol="filtered_originalTweet",outputCol="raw_features_originalTweet")
idf_originalTweet = IDF(inputCol='raw_features_originalTweet', outputCol='vectorized_features_originalTweet')

In [6]:
#Label Encoding
labelEncoder = StringIndexer(inputCol='Sentiment', outputCol='label').fit(dataset)
dataset = labelEncoder.transform(dataset)

In [7]:
#Split dataset
(train_df, test_df) = dataset.randomSplit((0.7,0.3),seed=42)

In [9]:
#Building pipeline
lr = LogisticRegression(featuresCol='vectorized_features_originalTweet', labelCol='label')
pipeline = Pipeline(stages=[tokenizer_originalTweet,stopwords_remover_originalTweet,vectorizer_originalTweet,idf_originalTweet,lr])
lr_model = pipeline.fit(train_df)

In [10]:
#Prediction test data
predictions = lr_model.transform(test_df)
predictions.show()

+--------------------+------------------+-----+--------------------+----------------------+--------------------------+---------------------------------+--------------------+--------------------+----------+
|       OriginalTweet|         Sentiment|label|originalTweet_tokens|filtered_originalTweet|raw_features_originalTweet|vectorized_features_originalTweet|       rawPrediction|         probability|prediction|
+--------------------+------------------+-----+--------------------+----------------------+--------------------------+---------------------------------+--------------------+--------------------+----------+
|  A revised rail ...|          Positive|  0.0|[, , a, revised, ...|  [, , revised, rai...|      (93690,[0,5,46,88...|             (93690,[0,5,46,88...|[193.343478629558...|[1.0,3.0089464313...|       0.0|
|  Consumer Alert ...|Extremely Negative|  4.0|[, , consumer, al...|  [, , consumer, al...|      (93690,[0,10,48,1...|             (93690,[0,10,48,1...|[-47.424499702330...|[3.

In [13]:
# Import evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

Accuracy = 0.407693
Test Error = 0.592307 
