# [Scalable Data Science](http://www.math.canterbury.ac.nz/~r.sainudiin/courses/ScalableDataScience/)


### prepared by [Akinwande Atanda](https://nz.linkedin.com/in/akinwande-atanda)

*supported by* [![](https://raw.githubusercontent.com/raazesh-sainudiin/scalable-data-science/master/images/databricks_logoTM_200px.png)](https://databricks.com/)
and 
[![](https://raw.githubusercontent.com/raazesh-sainudiin/scalable-data-science/master/images/AWS_logoTM_200px.png)](https://www.awseducate.com/microsite/CommunitiesEngageHome)


## [Tweet Analytics](https://github.com/aaa121/Spark-Tweet-Streaming-Presentation-May-2016)

## Creating a Pipeline in a Loop and Productionizing It with Historical Tweets

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Binarizer, Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
df = sqlContext.table("pos_neg_category")  # labelled reviews used to train and evaluate the classifier

In [5]:
df.dtypes

In [6]:
lrARValidate = []
lrARTest = []
fittedModels = {}
param = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]  # elasticNetParam values: 0.0 = pure L2, 1.0 = pure L1
# Split once, with a fixed seed, so every elasticNetParam value is compared on the same data
(trainingData, validateData, testData) = df.randomSplit([0.6, 0.3, 0.1], seed=42)
# Stages that do not depend on p are defined once, outside the loop
binarizer = Binarizer(inputCol = "category", outputCol = "label", threshold = 0.5)  # category > 0.5 -> label 1.0 (positive)
tok = Tokenizer(inputCol = "review", outputCol = "word")  # reviews may be full sentences; Tokenizer lowercases and splits them on whitespace
hashTF = HashingTF(inputCol = tok.getOutputCol(), numFeatures = 5000, outputCol = "features")
# "precision" is the Spark 1.x name for overall accuracy; on Spark 2.0+ use metricName="accuracy"
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precision")
for p in param:
  lr = LogisticRegression(maxIter = 10, regParam = 0.01, elasticNetParam = p)
  pipeline = Pipeline(stages = [binarizer, tok, hashTF, lr])
  model = pipeline.fit(trainingData)
  accuracyValidateSet = evaluator.evaluate(model.transform(validateData))
  accuracyTestSet = evaluator.evaluate(model.transform(testData))
  lrARValidate.append((p, accuracyValidateSet))
  lrARTest.append((p, accuracyTestSet))
  fittedModels[p] = model
# Keep the model with the highest validation accuracy; later cells use `model`
bestP = max(lrARValidate, key=lambda t: t[1])[0]
model = fittedModels[bestP]
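To make the first three pipeline stages concrete, here is a plain-Python sketch of what Binarizer, Tokenizer and HashingTF do to a single row. It is an illustration under stated assumptions, not Spark's implementation: the helper names are hypothetical, and `zlib.crc32` stands in for the hash function HashingTF actually uses, so the bucket indices will not match Spark's.

```python
import zlib

def binarize(category, threshold=0.5):
    """Like Binarizer: values strictly greater than the threshold become 1.0, others 0.0."""
    return 1.0 if category > threshold else 0.0

def tokenize(text):
    """Like Tokenizer: lowercase the text, then split it on whitespace."""
    return text.lower().split()

def hashing_tf(tokens, num_features=5000):
    """Hashing trick: map each token to a bucket index and count occurrences.

    zlib.crc32 is a stand-in hash (an assumption); Spark's HashingTF uses its
    own hash, so indices differ from Spark's even though the idea is the same.
    """
    counts = {}
    for token in tokens:
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] = counts.get(index, 0.0) + 1.0
    return counts  # sparse vector represented as {index: term frequency}

tokens = tokenize("Great rally great crowd")
features = hashing_tf(tokens)
print(tokens)                   # ['great', 'rally', 'great', 'crowd']
print(binarize(0.8))            # 1.0
print(sum(features.values()))   # 4.0
```

Because different words can land in the same bucket, a larger `numFeatures` reduces collisions at the cost of wider (but still sparse) feature vectors.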

In [7]:
#display(pipeline)

In [8]:
lrARValidate

In [9]:
lrARTest
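Each tuple in these lists pairs an elasticNetParam value p with an evaluation score. Following the Spark MLlib documentation for linear methods, regParam ($\lambda$, fixed at 0.01 here) and elasticNetParam ($\alpha$, the swept value p) combine into the regularization term below, which is added to the logistic loss: $\alpha = 0$ gives a pure L2 (ridge) penalty and $\alpha = 1$ a pure L1 (lasso) penalty that can drive hashed-feature weights $\mathbf{w}$ exactly to zero.

```latex
R(\mathbf{w}) = \lambda \left( \alpha \lVert \mathbf{w} \rVert_1 + \frac{1 - \alpha}{2} \lVert \mathbf{w} \rVert_2^2 \right),
\qquad \lambda = 0.01, \quad \alpha = p \in \{0.0, 0.1, \dots, 1.0\}
```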

In [10]:
print("Logistic Regression Classifier Accuracy Rate for Validation Dataset= ", lrARValidate)

In [11]:
print("Logistic Regression Classifier Accuracy Rate for Test Dataset= ", lrARTest)
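The value of p should be chosen from the validation scores alone, so that the test scores remain an unbiased estimate of performance on unseen tweets. A minimal sketch of that selection over (p, score) tuples; the helper `select_best` is hypothetical and the scores are made up for illustration, not results from this notebook.

```python
def select_best(results):
    """Return the (param, score) pair with the highest score.

    max() keeps the first maximal element, so ties go to the smaller
    elasticNetParam when the list is in ascending order of p.
    """
    return max(results, key=lambda pair: pair[1])

# Hypothetical validation scores, for illustration only
validation = [(0.0, 0.71), (0.5, 0.74), (1.0, 0.69)]
best_p, best_score = select_best(validation)
print(best_p, best_score)  # 0.5 0.74
```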

## Productionizing with Historical Tweets

**Read the saved tweets in Parquet format**

In [14]:
trumpTweet = sqlContext.read.parquet("dbfs:/mnt/s3Data/TrumpSentiment.parquet")

**Register the DataFrame as a temporary table**

In [16]:
trumpTweet.registerTempTable('TrumpTweetTable')

In [17]:
tT=sqlContext.read.table('TrumpTweetTable')

**List the data type of each column in the table**

In [19]:
trumpTweet.dtypes

In [20]:
# sqlContext.sql("SELECT COUNT(*) FROM TrumpTweetTable")

**Cast the category column to FLOAT and sort the tweets by date**

In [22]:
trumpTweetByDate = sqlContext.sql("SELECT date, review, CAST(category AS FLOAT) AS category FROM TrumpTweetTable ORDER BY date ASC").cache()  # cache() must be called; assign the result so it can be reused

**Randomly split the DataFrame into three subsets (about 10%, 50% and 40%)**

In [24]:
(trump1, trump2, trump3) = trumpTweet.randomSplit([0.1, 0.5, 0.4])
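`randomSplit` normalizes the weights and assigns each row to one subset at random, so the subset sizes are only approximately 10%, 50% and 40% and change between runs unless a seed is passed. The plain-Python sketch below illustrates that idea by bucketing a uniform random draw against cumulative weights; the helper `random_split` is hypothetical and is not how Spark implements the split internally (Spark samples within each partition).

```python
import random

def random_split(rows, weights, seed=42):
    """Assign each row to exactly one split by comparing a uniform draw
    with the cumulative normalized weights."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off in the last bound
    return splits

parts = random_split(range(10000), [0.1, 0.5, 0.4])
print([len(p) for p in parts])  # roughly [1000, 5000, 4000]
```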

**Optionally, use the fitted model to predict the sentiment of the 10% sample (trump1) only**

In [26]:
#  tweetModel=model.transform(trump1)

**Use the fitted model to predict whether each historical tweet is positive or negative**

In [28]:
tweetModel = model.transform(trumpTweet)

**Determine the accuracy rate of the predicted sentiment**

In [30]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precision")

In [31]:
accuracytweetSet = evaluator.evaluate(tweetModel)

In [32]:
accuracytweetSet
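On Spark 1.x, `metricName="precision"` makes MulticlassClassificationEvaluator report overall (micro-averaged) precision, which for single-label classification equals accuracy: the fraction of tweets whose predicted class matches the label produced by the Binarizer. A plain-Python sketch of that ratio, with a hypothetical helper `accuracy` and made-up labels:

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted class equals the true label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / float(len(labels))

labels      = [1.0, 0.0, 1.0, 1.0, 0.0]
predictions = [1.0, 0.0, 0.0, 1.0, 1.0]
print(accuracy(labels, predictions))  # 0.6
```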

In [33]:
# display(tweetModel.select("prediction", "review", "probability"))

**Display the predicted category, the tweet and the class probabilities [P(negative), P(positive)]**

In [35]:
tweetModel.select("prediction", "review", "probability").show(100)
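Each entry of the probability column is a two-element vector: the estimated probability that the tweet is negative (class 0.0), then the probability that it is positive (class 1.0). For binary logistic regression the positive-class probability is the sigmoid of the margin, and with the default threshold of 0.5 the prediction is 1.0 when that probability exceeds the threshold. The sketch reproduces this arithmetic in plain Python; the helpers, weights, intercept and feature indices are all hypothetical.

```python
import math

def predict_proba(weights, intercept, features):
    """Binary logistic regression on a sparse vector: return [P(class 0), P(class 1)]."""
    margin = intercept + sum(weights.get(i, 0.0) * v for i, v in features.items())
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return [1.0 - p1, p1]

def predict(probability, threshold=0.5):
    """Predict 1.0 (positive) when P(class 1) is greater than the threshold."""
    return 1.0 if probability[1] > threshold else 0.0

# Hypothetical weights and term frequencies keyed by hashed feature index
weights = {17: 1.2, 42: -0.8}
features = {17: 2.0, 42: 1.0}
prob = predict_proba(weights, intercept=-0.1, features=features)
print(prob, predict(prob))  # margin is 1.5, so P(positive) is about 0.82 and the prediction is 1.0
```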

**Save the sentiment category of the historical tweets for additional ETL**

In [37]:
trumpSentiment=tweetModel.select("prediction", "review", "probability")

In [38]:
trumpSentiment.write.parquet("dbfs:/mnt/s3Data/trumpSen.parquet")  # explicit Parquet format, matching the read below

In [39]:
trumpSentiment.show(50)

In [40]:
display(dbutils.fs.ls("dbfs:/mnt/s3Data"))

In [41]:
trumpSen= sqlContext.read.parquet("dbfs:/mnt/s3Data/trumpSen.parquet")

In [42]:
trumpSen.registerTempTable('trumpSenTable')

In [43]:
%sql SELECT COUNT(*) as TweetCount FROM trumpSenTable

In [44]:
%sql SELECT * FROM trumpSenTable WHERE prediction ==1 LIMIT 5

**Count and plot the share of tweets about Trump that are positive or negative**

In [46]:
%sql SELECT if(prediction == 1, "positive", "negative") as Sentiment, count(*) as TweetCount FROM trumpSenTable GROUP BY prediction ORDER BY prediction
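The query above returns raw counts per sentiment; turning them into percentages means dividing each count by the total number of tweets. A plain-Python sketch of the same `if(prediction == 1, ...)` mapping followed by that division, using a hypothetical helper `sentiment_shares` and made-up predictions:

```python
from collections import Counter

def sentiment_shares(predictions):
    """Map 1.0 to "positive" and anything else to "negative", then
    return {sentiment: (count, percentage of all tweets)}."""
    counts = Counter("positive" if p == 1.0 else "negative" for p in predictions)
    total = sum(counts.values())
    return {label: (n, 100.0 * n / total) for label, n in counts.items()}

shares = sentiment_shares([1.0, 0.0, 0.0, 1.0, 0.0])
print(shares["positive"], shares["negative"])  # (2, 40.0) (3, 60.0)
```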
