# Import module and create sparksession

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import* # function pour calculer somme etc
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
# sparksession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Sentiment Analysis Spark').config("spark.some.config.option", "some-value").getOrCreate()

# Read datafile into SparkDataframe

In [8]:
# read ccsv file into dataframe with autatically inferredschema
create_csv = spark.read.csv(r'C:\v_data\tweets.csv', inferSchema =True, header =True)
create_csv.show(truncate=False, n=3)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows



# Selected the related data

In [16]:

data = create_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data.show(truncate = False,n=5)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



# divide data into training and testing


In [19]:
#divide data, 70% for training, 30% for testing
divideData= data.randomSplit([0.7, 0.3])
trainingData= divideData[0]#index 0 = data training
testingData =divideData[1]  #index 1 = data testing
train_rows = trainingData.count()
test_rows= testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)


Training data rows: 1306 ; Testing data rows: 626


# Prepare training Data

In [None]:
#Separate "SentimentText" into individual words using tokenizer

In [20]:
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenizedTrain= tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False,n =3)#Show top 5 rows and full column contents (PySpark)


+-------------------------+-----+------------------------------+
|SentimentText            |label|SentimentWords                |
+-------------------------+-----+------------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |
|I adore cheese #toptastic|1    |[i, adore, cheese, #toptastic]|
+-------------------------+-----+------------------------------+
only showing top 3 rows



In [None]:
# removing stopwords(unimportant words to be features)

In [23]:
swr= StopWordsRemover(inputCol = tokenizer.getOutputCol(), outputCol="MeaningFullWords")
swrRemovedTrain= swr.transform(tokenizedTrain)
swrRemovedTrain.show(truncate=False, n= 3)

+-------------------------+-----+------------------------------+---------------------------+
|SentimentText            |label|SentimentWords                |MeaningFullWords           |
+-------------------------+-----+------------------------------+---------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |[adore, cheese, #bestever] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |[adore, cheese, #loveit]   |
|I adore cheese #toptastic|1    |[i, adore, cheese, #toptastic]|[adore, cheese, #toptastic]|
+-------------------------+-----+------------------------------+---------------------------+
only showing top 3 rows



In [27]:
#Converting words feature into numerical feature.
#In Spark 2.2.1,it is implemented in HashingTF funtion using Austin Appleby's MurmurHash 3 algorithm
hashTF= HashingTF(inputCol=swr.getOutputCol(), outputCol='features')
numericTrainingData= hashTF.transform(swrRemovedTrain).select('label', 'MeaningFullWords', 'features')
numericTrainingData.show(truncate=False, n =3 )
#hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
#numericTrainData = hashTF.transform(SwRemovedTrain).select(
#    'label', 'MeaningfulWords', 'features')
#numericTrainData.show(truncate=False, n=3)

+-----+---------------------------+-------------------------------------------+
|label|MeaningFullWords           |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #loveit]   |(262144,[1689,100089,254974],[1.0,1.0,1.0])|
|1    |[adore, cheese, #toptastic]|(262144,[1689,42010,100089],[1.0,1.0,1.0]) |
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows



# train our classifier model using traindata with  model logistic regression

In [28]:
lr= LogisticRegression(labelCol='label', featuresCol='features', maxIter=10, regParam=0.01)
model= lr.fit(numericTrainingData)
print("training is OK")

Exception ignored in: <function JavaWrapper.__del__ at 0x00000170C9A0C040>
Traceback (most recent call last):
  File "C:\Users\INSA\anaconda3\envs\INSA\lib\site-packages\pyspark\ml\wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StopWordsRemover' object has no attribute '_java_obj'


training is OK


# prepare testing Data

In [29]:
tokenizedTest = tokenizer.transform(testingData)
tokenizedTest.show(truncate=False, n=5)

+----------------------------------+-----+----------------------------------------+
|SentimentText                     |label|SentimentWords                          |
+----------------------------------+-----+----------------------------------------+
|I adore cheese #brilliant         |1    |[i, adore, cheese, #brilliant]          |
|I adore cheese #favorite          |1    |[i, adore, cheese, #favorite]           |
|I adore cheese #thumbs-up         |1    |[i, adore, cheese, #thumbs-up]          |
|I adore classical music #favorite |1    |[i, adore, classical, music, #favorite] |
|I adore classical music #thumbs-up|1    |[i, adore, classical, music, #thumbs-up]|
+----------------------------------+-----+----------------------------------------+
only showing top 5 rows



In [30]:
swrRemovedTest= swr.transform(tokenizedTest) # unimportant words


In [31]:
numericTestData= hashTF.transform(swrRemovedTest).select('label', 'MeaningFullWords', 'features') # wordsintonumeric
numericTestData.show(truncate=False, n=4)

+-----+------------------------------------+--------------------------------------------------------+
|label|MeaningFullWords                    |features                                                |
+-----+------------------------------------+--------------------------------------------------------+
|1    |[adore, cheese, #brilliant]         |(262144,[1689,45361,100089],[1.0,1.0,1.0])              |
|1    |[adore, cheese, #favorite]          |(262144,[1689,100089,108624],[1.0,1.0,1.0])             |
|1    |[adore, cheese, #thumbs-up]         |(262144,[1689,88825,100089],[1.0,1.0,1.0])              |
|1    |[adore, classical, music, #favorite]|(262144,[100089,102383,108624,131250],[1.0,1.0,1.0,1.0])|
+-----+------------------------------------+--------------------------------------------------------+
only showing top 4 rows



# Predict testing data and calculate the accuracy model

In [43]:
import pyspark.ml
##val denseVector = r.getAs[org.apache.spark.ml.linalg.SparseVector]("features").toDense
#org.apache.spark.mllib.linalg.Vectors.fromML(denseVector)
#import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
#import org.apache.spark.mllib.linalg.Vectors

In [48]:
! pip install spark-1.4.1-bin-hadoop2.6/bin/spark-submit --driver-memory 5g --packages com.databricks:spark-csv_2.10:1.1.0 fbi_spark.py


Usage:   
  pip install [options] <requirement specifier> [package-index-options] ...
  pip install [options] -r <requirements file> [package-index-options] ...
  pip install [options] [-e] <vcs project url> ...
  pip install [options] [-e] <local project path> ...
  pip install [options] <archive url/path> ...

no such option: --driver-memory


In [None]:
prediction = model.predict(numericTestData)
predictionFinal = prediction.select('label', 'MeaningFullWords', 'features')
predictionFinal.show(truncate=False, n=5)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)