## Step 1: Write a streaming application

- Load the saved data transformation pipeline as `fitted_target_pipeline`.
- Load the saved pipeline model as `pipelineModel`.

As part of the stream processing:
- Load streaming data from the directory `file:/databricks/driver/tmp/`.
- Use `fitted_target_pipeline` to transform the streaming DataFrame into the format expected for prediction.
- Use `pipelineModel` to score the transformed streaming DataFrame.
- Drop the unwanted intermediate columns from the output, keeping only `text`, `time`, and `prediction`.
- Write the output to a `memory` sink named `scored_tweets` in append mode.
- Trigger output every 2 seconds.

In [0]:
!ls /databricks/driver/tmp/

review1.txt    review172.txt  review245.txt  review318.txt  review391.txt
review10.txt   review173.txt  review246.txt  review319.txt  review392.txt
review100.txt  review174.txt  review247.txt  review32.txt   review393.txt
review101.txt  review175.txt  review248.txt  review320.txt  review394.txt
review102.txt  review176.txt  review249.txt  review321.txt  review395.txt
review103.txt  review177.txt  review25.txt   review322.txt  review396.txt
review104.txt  review178.txt  review250.txt  review323.txt  review397.txt
review105.txt  review179.txt  review251.txt  review324.txt  review398.txt
review106.txt  review18.txt   review252.txt  review325.txt  review399.txt
review107.txt  review180.txt  review253.txt  review326.txt  review4.txt
review108.txt  review181.txt  review254.txt  review327.txt  review40.txt
review109.txt  review182.txt  review255.txt  review328.txt  review400.txt
review11.txt   review183.txt  review256.txt  review329.txt  review401.txt
review110.txt  review184.txt

In [0]:
from pyspark.sql.functions import to_timestamp, substring, col
from pyspark.ml import PipelineModel

In [0]:
# Read the streaming data with the full CSV schema, one file per micro-batch.
schema = "target integer, id long, date string, flag string, user string, text string"
lines = (
    spark.readStream
    .option("maxFilesPerTrigger", 1)
    .csv("file:/databricks/driver/tmp/", sep=",", schema=schema)
    .drop("id", "flag", "user")
    # substring is 1-based: skip the first 4 characters of `date`, parse the next 24.
    .withColumn("time", to_timestamp(substring(col("date"), 5, 24), "MMM dd HH:mm:ss zzz yyyy"))
    .drop("date")
)

# Load the fitted data transformation pipeline and apply it to the stream.
fitted_target_pipeline = PipelineModel.load("/FileStore/fitted_target_pipeline/")
transformed_stream = fitted_target_pipeline.transform(lines).drop("target_double", "target")
transformed_stream.display()


text,time,label
"@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer. You shoulda got David Carr of Third Day to do it. ;D",2009-04-07T05:19:45.000+0000,0.0
is upset that he can't update his Facebook by texting it... and might cry as a result School today also. Blah!,2009-04-07T05:19:49.000+0000,0.0
@Kenichan I dived many times for the ball. Managed to save 50% The rest go out of bounds,2009-04-07T05:19:53.000+0000,0.0
my whole body feels itchy and like its on fire,2009-04-07T05:19:57.000+0000,0.0
"@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there.",2009-04-07T05:19:57.000+0000,0.0
@Kwesidei not the whole crew,2009-04-07T05:20:00.000+0000,0.0
Need a hug,2009-04-07T05:20:03.000+0000,0.0
"@LOLTrish hey long time no see! Yes.. Rains a bit ,only a bit LOL , I'm fine thanks , how's you ?",2009-04-07T05:20:03.000+0000,0.0
@Tatiana_K nope they didn't have it,2009-04-07T05:20:05.000+0000,0.0
@twittera que me muera ?,2009-04-07T05:20:09.000+0000,0.0
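
To make the `time` column above easier to follow, here is a minimal plain-Python sketch of what `substring(col("date"), 5, 24)` followed by `to_timestamp` does to a single value. The raw string `"Mon Apr 06 22:19:45 PDT 2009"` is an assumed sample (the raw `date` column is not shown in this notebook), and the fixed UTC-7 offset for `PDT` is likewise an assumption made only for this illustration; Spark resolves the zone name itself.

```python
from datetime import datetime, timedelta, timezone

# Assumed sample of the raw `date` column (not shown in the notebook output).
raw = "Mon Apr 06 22:19:45 PDT 2009"

# Spark's substring(col, 5, 24) is 1-based: start at character 5, take 24 characters.
# In 0-based Python slicing that is raw[4:28], which drops the leading day-of-week.
trimmed = raw[4:4 + 24]

# Split into the fields matched by the pattern "MMM dd HH:mm:ss zzz yyyy".
month, day, clock, zone, year = trimmed.split(" ")

# Assumption for this sketch only: treat PDT as a fixed UTC-7 offset.
PDT = timezone(timedelta(hours=-7), "PDT")
local = datetime.strptime(f"{month} {day} {clock} {year}", "%b %d %H:%M:%S %Y")
utc = local.replace(tzinfo=PDT).astimezone(timezone.utc)

print(trimmed)
print(utc.isoformat())
```

Under these assumptions the result lines up with the first `time` value displayed above, which is shown in UTC.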


In [0]:
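# Load the saved prediction pipeline model.
modelPath = "/FileStore/twitter_nbpipeline"
pipelineModel = PipelineModel.load(modelPath)

# Score the transformed stream.
scored_tweets = pipelineModel.transform(transformed_stream)

# Drop `label` and the intermediate columns so only text, time, and prediction remain,
# then append the results to the in-memory table `scored_tweets` every 2 seconds.
query = (
    scored_tweets.drop("label", "rawPrediction", "probability", "features", "words", "words_filtered")
    .writeStream
    .format("memory")
    .queryName("scored_tweets")
    .outputMode("append")
    .trigger(processingTime="2 seconds")
    .start()
)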

## Step 2: View the stream results

- Query the number of rows in the `scored_tweets` table.
- Visualize the count of positive and negative tweets in 30-second windows.

In [0]:
%sql
select COUNT(*) from scored_tweets;

count(1)
309


In [0]:
%sql

select window(time,"30 seconds"), sum(if(prediction=1,1,0)) as positive, sum(if(prediction=0,1,0)) as negative from scored_tweets
where time > (select max(time) from scored_tweets) - INTERVAL 1 minutes
group by window(time,"30 seconds")

window,positive,negative
"List(2009-04-07T05:40:00.000+0000, 2009-04-07T05:40:30.000+0000)",0,3
"List(2009-04-07T05:41:00.000+0000, 2009-04-07T05:41:30.000+0000)",0,8
"List(2009-04-07T05:40:30.000+0000, 2009-04-07T05:41:00.000+0000)",1,6

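The windowed query above groups rows into fixed 30-second buckets and counts predictions per bucket. As a rough illustration of that logic outside Spark, the plain-Python sketch below assigns each timestamp to a window that includes its start and excludes its end, then tallies positive and negative predictions. The helper names `tumbling_window` and `count_by_window` and the sample rows are hypothetical, and the sketch ignores the `where` filter on the last minute.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

def tumbling_window(ts, seconds=30):
    """Return the (start, end) of the fixed-size window that contains ts."""
    epoch = int(ts.timestamp())
    start = datetime.fromtimestamp(epoch - epoch % seconds, tz=timezone.utc)
    return start, start + timedelta(seconds=seconds)

def count_by_window(rows, seconds=30):
    """Count positive (1) and negative (0) predictions per window."""
    counts = defaultdict(lambda: {"positive": 0, "negative": 0})
    for ts, prediction in rows:
        key = tumbling_window(ts, seconds)
        if prediction == 1:
            counts[key]["positive"] += 1
        elif prediction == 0:
            counts[key]["negative"] += 1
    return dict(counts)

# Hypothetical (time, prediction) rows, not taken from the stream.
utc = timezone.utc
rows = [
    (datetime(2009, 4, 7, 5, 40, 31, tzinfo=utc), 1),
    (datetime(2009, 4, 7, 5, 40, 45, tzinfo=utc), 0),
    (datetime(2009, 4, 7, 5, 41, 2, tzinfo=utc), 0),
]

for (start, end), c in sorted(count_by_window(rows).items()):
    print(start.isoformat(), end.isoformat(), c["positive"], c["negative"])
```

Because the memory sink keeps growing while the stream runs, rerunning the SQL cell will generally show different windows and counts each time.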