# Part 2: Streaming Twitter Sentiment Prediction

The code below is a pure Python app. It connects to Twitter's streaming API, receives a small sample of live tweets, assembles them into batches, and writes each batch as a JSON file to a directory, where PySpark's Structured Streaming can consume them.

> In the past, I wrote the tweets to a socket in real time, where Structured Streaming could consume them. That approach has much lower latency, but the Databricks environment does not seem to support sockets well.

- We use the [Tweepy](https://docs.tweepy.org/en/stable/) package to interface with Twitter.
- Twitter has REST APIs (for one-off requests) and a [streaming API](https://docs.tweepy.org/en/stable/stream.html), which lets you maintain a live connection and keep receiving new tweets.
- We choose to receive only COVID-related tweets in English.
- After receiving a tweet, we record its timestamp and text and discard the other information.
- We pack the tweets into batches (the batch size is set by `tweets_per_file` in the code) and write each batch to a JSON file in a local directory.
- We also print the tweets on screen.
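
The batching step can be sketched on its own, without any Twitter connection. This is a minimal illustration under assumptions: the `BatchWriter` class, its method names, and the batch size of 3 (chosen for brevity) are hypothetical and are not part of the lab code.

```python
import json
import os
import tempfile
import time


class BatchWriter:
    """Hypothetical helper: buffer records and flush full batches to JSON-lines files."""

    def __init__(self, directory, batch_size):
        self.directory = directory
        self.batch_size = batch_size
        self.buffer = []
        self.files_written = 0

    def add(self, record):
        # Buffer the record and flush as soon as a full batch is available.
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.buffer:
            return None
        # Timestamp plus a running index keeps names unique within one second.
        name = f"{time.strftime('%Y%m%d-%H%M%S')}-{self.files_written:04d}.txt"
        path = os.path.join(self.directory, name)
        with open(path, "w", encoding="utf8") as f:
            for row in self.buffer:
                f.write(json.dumps(row) + "\n")  # one JSON object per line
        self.files_written += 1
        self.buffer = []
        return path


out_dir = tempfile.mkdtemp()
writer = BatchWriter(out_dir, batch_size=3)
for i in range(7):
    writer.add({"time": "2022-11-05T12:36:37Z", "text": f"sample tweet {i}"})
written_files = sorted(os.listdir(out_dir))
print(written_files)  # two full batches on disk; one record is still buffered
```

Writing one JSON object per line lets a JSON-lines reader parse each file independently of the others.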

## Step 1: Obtain Twitter API Credentials
To use any of this, we need to set up a Developer API account with Twitter and create an application to get credentials.

- Make sure you have a Twitter account.
- Set up a Developer API account with Twitter.
- Create an application at [https://apps.twitter.com/](https://apps.twitter.com/) to get the following credentials:
    + Consumer Key 
    + Consumer Secret 
    + Access Token
    + Access Token Secret

These credentials are entered into your `tweetread.ipynb` so that it can connect to Twitter's streaming service and receive tweets.
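
One way to avoid pasting secrets directly into a notebook is to read them from environment variables. The sketch below is only an illustration: the variable names (`TWITTER_CONSUMER_KEY` and so on) and the `load_credentials` helper are assumptions, not something the lab prescribes.

```python
import os

# Hypothetical variable names; use whatever naming your environment provides.
REQUIRED_KEYS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_SECRET",
)


def load_credentials(env=os.environ):
    """Return the four credentials as a dict, or raise if any is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError(f"missing credentials: {', '.join(missing)}")
    return {key: env[key] for key in REQUIRED_KEYS}


# Demonstration with a fake environment so the snippet runs anywhere.
fake_env = {key: f"dummy-{i}" for i, key in enumerate(REQUIRED_KEYS)}
creds = load_credentials(fake_env)
print(sorted(creds))
```

Failing early with the list of missing names is usually easier to debug than an authentication error from the API later on.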

## Step 2: Install `Tweepy`

In [0]:
%%bash

rm -f tweets.zip
rm -rf tweets
wget -nv http://idsdl.csom.umn.edu/c/share/msba6330/tweets.zip
unzip tweets.zip -d tweets


Archive:  tweets.zip
  inflating: tweets/20221105-123837.txt  
  inflating: tweets/20221105-123835.txt  
  inflating: tweets/20221105-123833.txt  
  inflating: tweets/20221105-123831.txt  
  inflating: tweets/20221105-123828.txt  
  inflating: tweets/20221105-123825.txt  
  inflating: tweets/20221105-123822.txt  
  inflating: tweets/20221105-123821.txt  
  inflating: tweets/20221105-123819.txt  
  inflating: tweets/20221105-123816.txt  
  inflating: tweets/20221105-123814.txt  
  inflating: tweets/20221105-123813.txt  
  inflating: tweets/20221105-123811.txt  
  inflating: tweets/20221105-123809.txt  
  inflating: tweets/20221105-123806.txt  
  inflating: tweets/20221105-123804.txt  
  inflating: tweets/20221105-123803.txt  
  inflating: tweets/20221105-123801.txt  
  inflating: tweets/20221105-123759.txt  
  inflating: tweets/20221105-123756.txt  
  inflating: tweets/20221105-123754.txt  
  inflating: tweets/20221105-123752.txt  
  inflating: tweets/20221105-123749.txt  
  inflating: 

In [0]:
Because we are using stored tweets instead of real-time tweets, please make the following modifications to the lab10-3-tweet-scoring notebook:

1. Set `maxFilesPerTrigger` to 1 (so that the files are consumed more slowly).
 .option("maxFilesPerTrigger",1)

2. In the final visualization SQL query, drop the where clause (which limits the window to the current time). The revised query should look like:

select sum(if(prediction=1,1,0)) as positive, sum(if(prediction=0,1,0)) as negative, window(time,"30 seconds") from scored_tweets 
group by window(time,"30 seconds");
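
To make the query's logic concrete, here is a minimal pure-Python sketch (not Spark code) of the same aggregation: rows are grouped into 30-second windows aligned to the epoch, and positive and negative predictions are counted per window. The sample rows and the helper names `window_start` and `count_by_window` are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 30


def window_start(ts):
    """Floor a timestamp such as 2022-11-05T12:36:37Z to the start of its 30-second window."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    epoch = int(dt.timestamp())
    floored = epoch - epoch % WINDOW_SECONDS
    return datetime.fromtimestamp(floored, tz=timezone.utc).strftime("%H:%M:%S")


def count_by_window(rows):
    counts = defaultdict(lambda: {"positive": 0, "negative": 0})
    for row in rows:
        bucket = counts[window_start(row["time"])]  # every row creates its window
        if row["prediction"] == 1:
            bucket["positive"] += 1
        elif row["prediction"] == 0:
            bucket["negative"] += 1
    return dict(counts)


# Made-up scored tweets for illustration only.
rows = [
    {"time": "2022-11-05T12:36:37Z", "prediction": 1},
    {"time": "2022-11-05T12:36:40Z", "prediction": 0},
    {"time": "2022-11-05T12:36:59Z", "prediction": 1},
    {"time": "2022-11-05T12:37:01Z", "prediction": 0},
]
result = count_by_window(rows)
print(result)
```

The first three rows fall into the window starting at 12:36:30 and the last one into the window starting at 12:37:00. Without the where clause, every window in the stored data is reported, not only the current one.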

## Step 3: Develop the TweetRead Program

In the following, we develop an app that connects to the Twitter [Streaming API](https://docs.tweepy.org/en/stable/stream.html) and periodically writes tweets with their timestamps to a local directory.

- In the `on_data` event handler of `TweetsListener`, we will
  - load the data into a JSON object
  - extract `created_at` and `text` from the JSON object and save them in a dictionary `{'time':, 'text':}`
  - append the dictionary to a list `buffer`
  - at the same time, print the tweet on screen
  - maintain a counter of the dictionaries in the buffer. When the count reaches `tweets_per_file`, we write the buffer to a file in the given directory and then reset the buffer and the counter.
  
- In the `sendData(directory)` function, we will
  - create a `TweetsListener` instance, supplying it with the Twitter API credentials
  - save `directory` to the listener's `directory` property
  - call the listener's [`filter` API](https://docs.tweepy.org/en/stable/stream.html#tweepy.Stream.filter) to start listening to tweets on a particular topic (`covid`) and language (`en`)
  
- In the main logic, we will
  - create a directory `/databricks/driver/tweets`
  - call `sendData(...)`

The fields of the tweet JSON object are described in the [Twitter API v1 data dictionary](https://developer.twitter.com/en/docs/twitter-api/v1/data-dictionary/overview).
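
The extraction step can be tried offline. The sketch below assumes `created_at` has the v1.1 form `Sat Nov 05 12:36:37 +0000 2022`; the sample message and the helper names `parse_created_at` and `to_record` are made up for illustration.

```python
import json
from datetime import datetime, timezone


def parse_created_at(created_at):
    """Convert 'Sat Nov 05 12:36:37 +0000 2022' into '2022-11-05T12:36:37Z'."""
    parsed = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
    # Normalize to UTC before appending the literal 'Z' suffix.
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def to_record(raw):
    """Keep only the timestamp and the text of a raw tweet message."""
    msg = json.loads(raw)
    return {"time": parse_created_at(msg["created_at"]), "text": msg["text"]}


# A made-up message with only the fields this lab uses, plus one that is discarded.
raw = json.dumps(
    {"created_at": "Sat Nov 05 12:36:37 +0000 2022", "text": "stay safe", "lang": "en"}
)
record = to_record(raw)
print(record)
```

Note that the format string must account for the seconds field (`%H:%M:%S`); a pattern that omits it fails to parse the timestamp.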

In [0]:
import tweepy
import io
import json
import time
import os

## On Databricks, install tweepy via Cluster > Libraries > Install New > PyPI (enter tweepy). For demonstration purposes, you may also use !pip install tweepy, which installs it only on the driver node (not on all cluster nodes).

# todo: Set up your credentials
# consumer_key=''
# consumer_secret=''
# access_token =''
# access_secret=''

class TweetsListener(tweepy.Stream):
  counter = 0 # for data counter.
  tweets_per_file = 10 # how many tweets per file? configure based on your needs
  buffer = []
  directory = None

  # on_data is an event that gets triggered each time there is new data (tweet) coming in.
  def on_data(self, data):
      try:
          # load the data into a json object, append it to the buffer, and increase the counter by 1.
          msg = json.loads(data)
          # created_at looks like "Wed Oct 10 20:19:24 +0000 2018"; convert it to an ISO-style timestamp.
          created = time.strptime(msg['created_at'], "%a %b %d %H:%M:%S +0000 %Y")
          tweet = {"time": time.strftime('%Y-%m-%dT%H:%M:%SZ', created), "text": msg['text']}
          # print the tweet.
          print(f"{tweet['time']} - {tweet['text']}")
          self.buffer.append(tweet)
          self.counter = self.counter + 1
          if self.counter >= self.tweets_per_file:
              try:
                  # name the file after the current time and dump the buffer into it, one json object per line
                  timestr = time.strftime("%Y%m%d-%H%M%S")
                  with io.open(self.directory + "/" + timestr + '.txt', 'w', encoding = 'utf8') as f:
                      for row in self.buffer:
                          f.write(json.dumps(row))
                          f.write("\n")
                  # reset the counter and the buffer
                  self.counter = 0
                  self.buffer = []
              except Exception as e:
                  print("error opening file:%s" % str(e))
      except Exception as e:
          # if there is any error in processing the data, we print it on screen.
          print("Error on_data: %s" % str(e))
      return True
  
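  # on_request_error gets triggered when the API responds with a non-200 HTTP status code
  # (this is the Tweepy 4.x Stream name; the older StreamListener class used on_error).
  def on_request_error(self, status_code):
      print(status_code)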

def sendData(directory):  
  # create a TweetsListener, configure its output directory, and start listening.
  twitter_stream = TweetsListener(consumer_key, consumer_secret, access_token, access_secret)
  twitter_stream.directory = directory
  twitter_stream.filter(track = ['covid'], languages = ['en'])

# create the output directory; exist_ok avoids an error if it already exists
os.makedirs("/databricks/driver/tweets", exist_ok=True)

sendData("/databricks/driver/tweets")


---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
<command-2112301132522002> in <module>
----> 1 import tweepy
      2 import io
      3 import json
      4 import time
      5 import os

/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py in import_patch(name, globals, locals, fromlist, level)
    165             # Import the desired module. If you’re seeing this while debugging a failed import,
    166             # l

## Note that this app runs forever; make sure to cancel it when you are done using it.

In [0]:
%%bash
ls -l tweets
cat tweets/20221105-123637.txt

total 2084
-rw-r--r-- 1 root root 1715 Nov  5 12:36 20221105-123637.txt
-rw-r--r-- 1 root root 1687 Nov  5 12:36 20221105-123640.txt
-rw-r--r-- 1 root root 1938 Nov  5 12:36 20221105-123643.txt
-rw-r--r-- 1 root root 1846 Nov  5 12:36 20221105-123645.txt
-rw-r--r-- 1 root root 1728 Nov  5 12:36 20221105-123648.txt
-rw-r--r-- 1 root root 1932 Nov  5 12:36 20221105-123649.txt
-rw-r--r-- 1 root root 1601 Nov  5 12:36 20221105-123651.txt
-rw-r--r-- 1 root root 1844 Nov  5 12:36 20221105-123653.txt
-rw-r--r-- 1 root root 1560 Nov  5 12:36 20221105-123654.txt
-rw-r--r-- 1 root root 1707 Nov  5 12:36 20221105-123657.txt
-rw-r--r-- 1 root root 1612 Nov  5 12:37 20221105-123701.txt
-rw-r--r-- 1 root root 1821 Nov  5 12:37 20221105-123703.txt
-rw-r--r-- 1 root root 1638 Nov  5 12:37 20221105-123704.txt
-rw-r--r-- 1 root root 1773 Nov  5 12:37 20221105-123706.txt
-rw-r--r-- 1 root root 1736 Nov  5 12:37 20221105-123707.txt
-rw-r--r-- 1 root root 1825 Nov  5 12:37 20221105-123709.txt
-rw-r--r-- 1 
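
To inspect the files from Python rather than with `cat`, a small reader like the following can help. It is a sketch under the assumption that each file holds one JSON object per line with `time` and `text` keys, as written by the app above; the `load_tweets` helper and the sample records are made up for illustration.

```python
import glob
import json
import os
import tempfile


def load_tweets(directory):
    """Read every .txt file in the directory, one JSON object per line."""
    tweets = []
    for path in sorted(glob.glob(os.path.join(directory, "*.txt"))):
        with open(path, encoding="utf8") as f:
            for line in f:
                line = line.strip()
                if line:
                    tweets.append(json.loads(line))
    return tweets


# Build a throwaway directory with two sample files so the snippet runs anywhere.
demo_dir = tempfile.mkdtemp()
samples = {
    "20221105-123637.txt": [{"time": "2022-11-05T12:36:35Z", "text": "first"}],
    "20221105-123640.txt": [
        {"time": "2022-11-05T12:36:38Z", "text": "second"},
        {"time": "2022-11-05T12:36:39Z", "text": "third"},
    ],
}
for name, rows in samples.items():
    with open(os.path.join(demo_dir, name), "w", encoding="utf8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

loaded = load_tweets(demo_dir)
print(len(loaded), loaded[0]["text"])
```

Pointing `load_tweets` at the real `tweets` directory should return the records in file-name order, which matches the order in which they were written.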

In [0]:
#de liu's email
Second, because of the challenges in getting your Twitter Developer API access approved (Twitter now has a lengthier and more demanding process for obtaining Elevated Access, which is needed to capture the Twitter stream), please use the following alternative (second-best) way of doing Lab 10-3. At a high level, we will use stored tweets as input to your streaming tweet-scoring app.

1. Instead of running the Lab10-3-tweetread app, please open a notebook and run the following commands to download and unzip about 500 tweet files (20 minutes' worth).

%%bash

rm -f tweets.zip
rm -rf tweets
wget -nv http://idsdl.csom.umn.edu/c/share/msba6330/tweets.zip
unzip tweets.zip -d tweets

Because we are using stored tweets instead of real-time tweets, please make the following modifications to the lab10-3-tweet-scoring notebook:

1. Set `maxFilesPerTrigger` to 1 (so that the files are consumed more slowly).

 .option("maxFilesPerTrigger",1)

2. In the final visualization SQL query, drop the where clause (which limits the window to the current time). The revised query should look like:

select sum(if(prediction=1,1,0)) as positive, sum(if(prediction=0,1,0)) as negative, window(time,"30 seconds") from scored_tweets 
group by window(time,"30 seconds");

You can turn in the revised lab files instead of the original ones.