## Streaming Tweet Sentiment Prediction

### Streaming Application
- Load the saved pipeline model as `pipelineModel` for use in the streaming job.
- Load streaming data from the directory `file:/databricks/driver/tweets`.
- Use the pipeline model to transform the streaming DataFrame.
- Drop the unwanted intermediate columns from the output, keeping only `text`, `time`, and `prediction`.
- Write the output to a `memory` sink (`scored_tweets`) using append mode.
- Trigger output at 2-second intervals.

In [0]:
%%bash
rm -f tweets.zip
rm -rf tweets
wget -nv http://idsdl.csom.umn.edu/c/share/msba6330/tweets.zip
unzip tweets.zip -d tweets

Archive:  tweets.zip
  inflating: tweets/20221105-123837.txt  
  inflating: tweets/20221105-123835.txt  
  inflating: tweets/20221105-123833.txt  
  inflating: tweets/20221105-123831.txt  
  inflating: tweets/20221105-123828.txt  
  inflating: tweets/20221105-123825.txt  
  inflating: tweets/20221105-123822.txt  
  inflating: tweets/20221105-123821.txt  
  inflating: tweets/20221105-123819.txt  
  inflating: tweets/20221105-123816.txt  
  inflating: tweets/20221105-123814.txt  
  inflating: tweets/20221105-123813.txt  
  inflating: tweets/20221105-123811.txt  
  inflating: tweets/20221105-123809.txt  
  inflating: tweets/20221105-123806.txt  
  inflating: tweets/20221105-123804.txt  
  inflating: tweets/20221105-123803.txt  
  inflating: tweets/20221105-123801.txt  
  inflating: tweets/20221105-123759.txt  
  inflating: tweets/20221105-123756.txt  
  inflating: tweets/20221105-123754.txt  
  inflating: tweets/20221105-123752.txt  
  inflating: tweets/20221105-123749.txt  
  inflating: 

In [0]:
from pyspark.ml import PipelineModel
 
modelPath = '/FileStore/twitter_nbpipelne'
inputPath = 'file:/databricks/driver/tweets'
 
pipelineModel = PipelineModel.load(modelPath)
 
# input source
streamingInputDF = spark.readStream.schema('time timestamp, text string').json(inputPath)
 
# processing logic
scored_tweets = pipelineModel.transform(streamingInputDF)
 
# sink
query = scored_tweets.drop('rawPrediction', 'probability', 'features', 'words', 'words_filtered') \
  .writeStream \
  .trigger(processingTime = '2 seconds') \
  .format('memory') \
  .queryName('scored_tweets') \
  .outputMode('append') \
  .start()
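
The memory sink fills asynchronously, so it can help to check the state of the query before reading `scored_tweets`. The following is a minimal sketch, assuming `query` is the handle returned by `.start()` above and that its `status` and `lastProgress` attributes behave like dictionaries, as they do in PySpark. `summarize_query` is a hypothetical helper name, not part of Spark.

```python
def summarize_query(query):
    """Summarize a StreamingQuery-like object as a plain dict.

    Only the `isActive`, `status`, and `lastProgress` attributes are read.
    `lastProgress` is None until the first micro-batch has completed,
    so it is replaced by an empty dict in that case.
    """
    progress = query.lastProgress or {}
    status = query.status or {}
    return {
        "active": query.isActive,
        "message": status.get("message"),
        "last_batch_id": progress.get("batchId"),
        "last_batch_rows": progress.get("numInputRows"),
    }
```

In the notebook this would be called as `summarize_query(query)`. When the results are no longer needed, `query.stop()` ends the stream.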

### View the stream results

- Query the number of rows in the `scored_tweets` table.
- Get the counts of positive and negative tweets in 30-second windows.

In [0]:
%sql
select count(*) from scored_tweets;

count(1)
5210


In [0]:
%sql
select sum(if(prediction=1,1,0)) as positive, sum(if(prediction=0,1,0)) as negative, window(time,"30 seconds") from scored_tweets 
group by window(time,"30 seconds")

positive,negative,window
62,61,"List(2022-11-05T17:54:00.000+0000, 2022-11-05T17:54:30.000+0000)"
51,80,"List(2022-11-05T17:47:30.000+0000, 2022-11-05T17:48:00.000+0000)"
34,26,"List(2022-11-05T17:55:30.000+0000, 2022-11-05T17:56:00.000+0000)"
74,83,"List(2022-11-05T17:42:00.000+0000, 2022-11-05T17:42:30.000+0000)"
72,68,"List(2022-11-05T17:46:00.000+0000, 2022-11-05T17:46:30.000+0000)"
62,71,"List(2022-11-05T17:53:30.000+0000, 2022-11-05T17:54:00.000+0000)"
73,77,"List(2022-11-05T17:48:00.000+0000, 2022-11-05T17:48:30.000+0000)"
67,80,"List(2022-11-05T17:38:30.000+0000, 2022-11-05T17:39:00.000+0000)"
58,66,"List(2022-11-05T17:44:00.000+0000, 2022-11-05T17:44:30.000+0000)"
63,78,"List(2022-11-05T17:49:30.000+0000, 2022-11-05T17:50:00.000+0000)"
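
To make the grouping explicit, here is a pure-Python sketch of what `window(time, "30 seconds")` does in the query above: each timestamp is assigned to a fixed, non-overlapping 30-second bucket aligned to the Unix epoch, and rows are counted per bucket. This only illustrates the bucketing logic and is not Spark's implementation. The function names and the sample rows are hypothetical, and the sketch assumes `prediction` is 1 for positive and 0 for negative, as in the SQL.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts, width=timedelta(seconds=30)):
    """Return the start of the tumbling window that contains `ts`."""
    # Distance from the epoch modulo the window width is the offset
    # of `ts` inside its window.
    return ts - ((ts - EPOCH) % width)


def count_by_window(rows, width=timedelta(seconds=30)):
    """Count positive and negative predictions per window.

    `rows` is an iterable of (time, prediction) pairs. The result maps
    each window start to a [positive, negative] pair of counts.
    """
    counts = {}
    for ts, prediction in rows:
        bucket = counts.setdefault(window_start(ts, width), [0, 0])
        if prediction == 1:
            bucket[0] += 1
        elif prediction == 0:
            bucket[1] += 1
    return counts


# Hypothetical rows, for illustration only (not taken from the dataset).
sample_rows = [
    (datetime(2022, 11, 5, 17, 54, 3, tzinfo=timezone.utc), 1.0),
    (datetime(2022, 11, 5, 17, 54, 29, tzinfo=timezone.utc), 0.0),
    (datetime(2022, 11, 5, 17, 54, 30, tzinfo=timezone.utc), 1.0),
]
sample_counts = count_by_window(sample_rows)
```

A timestamp that falls exactly on a boundary, such as 17:54:30, belongs to the window that starts there, because each window includes its start and excludes its end.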
