### Part 2 
### Classifying a stream of tweets using the trained model

In this section of the project, we use our trained classifier to classify a stream of tweets.

The steps are:

i. Pull data from Twitter using the Twitter API.  
ii. Preprocess the data and extract features so that our model can classify it.  
iii. Classify the data using our model.  
iv. Save the tweets and their predicted polarity to a SQL database.

A detailed explanation of each code block is given above the block.

<b>Loading the required packages

In [9]:
# tweepy is a Python library for accessing the Twitter API
import tweepy
from tweepy import API, Cursor, OAuthHandler, Stream
from tweepy.streaming import StreamListener


import numpy as np
import pandas as pd
import socket

<b>We use findspark to locate the local Spark installation and set the environment variables it needs (such as SPARK_HOME) so that pyspark can be imported.

In [10]:
import findspark
# Raw string: in a normal string, '\U' starts a Unicode escape and raises a SyntaxError
findspark.init(r'C:\Users\samir\Desktop\spark\spark-2.4.7-bin-hadoop2.7')

<b>Importing the PySpark libraries

In [11]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import LogisticRegression

### Accessing tweets

To access a stream of tweets through the Twitter API, we first need to authenticate with a Twitter developer account. Four values serve this purpose: the access token, access token secret, consumer key and consumer secret, all of which are unique to each account. They are stored in variables as shown below.

In [12]:
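# Replace the placeholders with your own Twitter developer credentials
access_token = "<ACCESS_TOKEN>"
access_token_secret = "<ACCESS_TOKEN_SECRET>"
consumer_key = "<CONSUMER_KEY>"
consumer_secret = "<CONSUMER_SECRET>"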
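Hard-coding the four values in the notebook makes them easy to leak when the notebook is shared. A minimal alternative, sketched below, reads them from environment variables instead. The variable names (TWITTER_ACCESS_TOKEN and so on) and the helper load_twitter_credentials are hypothetical choices, not part of the original project.

```python
import os

# Hypothetical environment variable names; adjust them to your own setup
CREDENTIAL_VARS = {
    "access_token": "TWITTER_ACCESS_TOKEN",
    "access_token_secret": "TWITTER_ACCESS_TOKEN_SECRET",
    "consumer_key": "TWITTER_CONSUMER_KEY",
    "consumer_secret": "TWITTER_CONSUMER_SECRET",
}


def load_twitter_credentials(environ=os.environ):
    """Return the four credentials as a dict, failing fast if any variable is missing or empty."""
    missing = [var for var in CREDENTIAL_VARS.values() if not environ.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {name: environ[var] for name, var in CREDENTIAL_VARS.items()}
```

The returned dict can then be passed to OAuthHandler and set_access_token in place of the hard-coded variables, for example `creds["consumer_key"]`.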

<b>Creating a Spark session with the app name tweetsentimentanalysis.


In [13]:
spark = SparkSession.builder.appName('tweetsentimentanalysis').getOrCreate()
sc = spark.sparkContext

<b>We use tweepy's OAuthHandler class to set the credentials used in API calls to Twitter.

In [14]:
auth = tweepy.OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_token_secret)
api = tweepy.API(auth)

<b>Once authentication is done, we fetch the 15 most recent tweets from the authenticated account's home timeline (tweets and retweets from that account and the accounts it follows) and print their text. The raw text contains everything a tweet can include, such as links, emojis, mentions and hashtags.

In [29]:
public_tweet = api.home_timeline(count=15)
for tweet in public_tweet:
    print(tweet.text)

RT @strongblacklead: Welcome to the beginning of another week, y’all! 😂 https://t.co/0ZTMHMrULX
Tune in as the First Lady and I discuss the American Families Plan at Tidewater Community College. https://t.co/cUdNDi2DqY
Happening Now: The President and First Lady deliver remarks at Tidewater Community College. https://t.co/JG6hAJIFPz
The Marvel family is saddened to hear of the passing of artist John Paul Leon. His bold and iconic style made him a… https://t.co/M8VKMGo1NM
It turns out that the people like the For The People Act. 

https://t.co/UaH9dqdmNz
RT @Grace4NY: Across the country, Asian Americans are doing their part to protect and uplift our communities. Read about 6 of them here: ht…
Captain Marvel x @StephenCurry30 
Doctor Strange x @Money23Green 
Black Panther x @22wiggins 
Captain America x… https://t.co/msSVjOGHn8
Captain Marvel x @StephenCurry30 
Doctor Strange x @Money23Green 
Black Panther x @22wiggins 
Captain America x… https://t.co/crBkbJQhAq
RT @nasahqphoto: Former S
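Several tweets above end in "…" because, by default, the v1.1 timeline endpoints return text truncated to 140 characters. Tweepy 3.x accepts `tweet_mode="extended"` (for example `api.home_timeline(count=15, tweet_mode="extended")`), in which case the untruncated text is in `full_text` instead of `text`; for retweets, the complete original text is found under `retweeted_status`. The helper below is a sketch (the name full_tweet_text is hypothetical) that picks the most complete field available. It is exercised with stand-in objects rather than live API results.

```python
from types import SimpleNamespace


def full_tweet_text(status):
    """Return the most complete text available on a tweepy Status-like object."""
    # Retweets: use the retweeted tweet's own text, which is not cut off by the "RT @user: " prefix
    original = getattr(status, "retweeted_status", None)
    if original is not None:
        return full_tweet_text(original)
    # Extended mode populates full_text; compatibility mode only populates text
    return getattr(status, "full_text", None) or status.text


tweet = SimpleNamespace(text="Short text… https://t.co/x", full_text="Short text that was not cut off")
print(full_tweet_text(tweet))  # prints: Short text that was not cut off
```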

<b>We create a DataFrame with the columns below, chosen to match the tweet and user fields returned by the Twitter API call.

In [30]:
df = pd.DataFrame(columns = ['Tweets', 'User', 'User_statuses_count', 
                             'user_followers', 'User_location', 'User_verified',
                             'fav_count', 'rt_count', 'tweet_date'])

<b>We loop over the fetched tweets and save each one as a row of the DataFrame df created above.

In [39]:
# enumerate() supplies the row index; stop after at most 20 tweets
for i, tweet in enumerate(public_tweet):
    if i == 20:
        break
    df.loc[i, 'Tweets'] = tweet.text
    df.loc[i, 'User'] = tweet.user.name
    df.loc[i, 'User_statuses_count'] = tweet.user.statuses_count
    df.loc[i, 'user_followers'] = tweet.user.followers_count
    df.loc[i, 'User_location'] = tweet.user.location
    df.loc[i, 'User_verified'] = tweet.user.verified
    df.loc[i, 'fav_count'] = tweet.favorite_count
    df.loc[i, 'rt_count'] = tweet.retweet_count
    df.loc[i, 'tweet_date'] = tweet.created_at
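Because df starts empty and is filled cell by cell with `df.loc`, pandas leaves every column as object dtype (see the `df.info()` output below). A common alternative, sketched here under the assumption that each tweet is a tweepy Status object, is to collect one tuple per tweet and build the DataFrame in a single call, which lets pandas infer integer, boolean and datetime dtypes. The names COLUMNS, tweet_to_row and tweets_to_frame are hypothetical, and a stand-in object replaces real API results.

```python
from datetime import datetime
from itertools import islice
from types import SimpleNamespace

import pandas as pd

COLUMNS = ["Tweets", "User", "User_statuses_count", "user_followers", "User_location",
           "User_verified", "fav_count", "rt_count", "tweet_date"]


def tweet_to_row(t):
    """Extract the same fields as the loop above, in COLUMNS order."""
    return (t.text, t.user.name, t.user.statuses_count, t.user.followers_count,
            t.user.location, t.user.verified, t.favorite_count, t.retweet_count,
            t.created_at)


def tweets_to_frame(tweets, limit=20):
    """Build the tweet DataFrame in one step from at most `limit` Status-like objects."""
    rows = [tweet_to_row(t) for t in islice(tweets, limit)]
    return pd.DataFrame(rows, columns=COLUMNS)


# Stand-in for one Status object returned by api.home_timeline()
user = SimpleNamespace(name="Example", statuses_count=10, followers_count=5,
                       location="", verified=False)
tweet = SimpleNamespace(text="hello world", user=user, favorite_count=1,
                        retweet_count=0, created_at=datetime(2021, 5, 3, 17, 36, 42))
frame = tweets_to_frame([tweet])
print(frame.dtypes)
```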

<b>The code below displays the first five rows of the tweets stored in df.

In [40]:
df.head()

| | Tweets | User | User_statuses_count | user_followers | User_location | User_verified | fav_count | rt_count | tweet_date |
|---|---|---|---|---|---|---|---|---|---|
| 0 | RT @strongblacklead: Welcome to the beginning ... | Netflix | 38633 | 11692568 | California, USA | True | 0 | 30 | 2021-05-03 17:36:42 |
| 1 | Tune in as the First Lady and I discuss the Am... | President Biden | 645 | 10552082 | | True | 2137 | 384 | 2021-05-03 17:18:58 |
| 2 | Happening Now: The President and First Lady de... | The White House | 916 | 5216884 | United States of America | True | 1671 | 347 | 2021-05-03 17:16:36 |
| 3 | The Marvel family is saddened to hear of the p... | Marvel Entertainment | 57962 | 11167843 | New York, NY | True | 2268 | 285 | 2021-05-03 17:13:45 |
| 4 | It turns out that the people like the For The ... | Hillary Clinton | 12674 | 30985420 | New York, NY | True | 1733 | 354 | 2021-05-03 16:55:39 |


<b>We use the command below to list the DataFrame's columns, non-null counts, dtypes and memory usage.

In [42]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 15 entries, 0 to 14
Data columns (total 9 columns):
Tweets                 15 non-null object
User                   15 non-null object
User_statuses_count    15 non-null object
user_followers         15 non-null object
User_location          15 non-null object
User_verified          15 non-null object
fav_count              15 non-null object
rt_count               15 non-null object
tweet_date             15 non-null object
dtypes: object(9)
memory usage: 1.8+ KB


<b>We convert the pandas DataFrame to a Spark DataFrame through an SQLContext so that the Spark feature transformers and the trained Spark model can be applied to predict polarity.

In [43]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
spark_dff = sqlContext.createDataFrame(df)


### Preprocessing the data
As seen above, the Twitter API returns a lot of information, such as the user, the user's location and the tweet date. To classify a tweet, however, we only need its text. We therefore take the text column and process it further to remove stop words and other unwanted words. Finally, we extract a set of meaningful words that the classifier can use.

The code below splits each tweet into words (Spark's Tokenizer also converts the text to lowercase) and stores the words as a list in the SeWords column. Five tweets are displayed as a sample.

In [44]:
tokenizer = Tokenizer(inputCol="Tweets", outputCol="SeWords")
tokenizedTweet = tokenizer.transform(spark_dff)
tokenizedTweet.show(truncate=False, n=5)

(Output truncated in this export. The table shows the first five rows with the original columns, from Tweets to tweet_date, plus the new SeWords column.)
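Spark's Tokenizer lowercases each string and splits it on single whitespace characters (the Scala implementation is essentially `text.toLowerCase.split("\\s")`), so punctuation stays attached to words (as in "week,") and consecutive whitespace, such as the blank line in the "For The People Act" tweet, produces empty-string tokens. The pure-Python function below is a sketch that mimics this behaviour for inspection outside Spark; spark_like_tokenize is a hypothetical name. If empty tokens are unwanted, RegexTokenizer (whose minTokenLength defaults to 1) is the usual alternative.

```python
import re

# The same character class as Java's \s: space, \t, \n, \x0B, \f, \r
_JAVA_WHITESPACE = re.compile(r"[ \t\n\x0b\f\r]")


def spark_like_tokenize(text):
    """Lowercase, split on every single whitespace character, mimic Java's String.split."""
    tokens = _JAVA_WHITESPACE.split(text.lower())
    if len(tokens) == 1:
        # No whitespace at all: Java returns the whole (lowercased) string, even if empty
        return tokens
    # Java's String.split discards trailing empty strings; Python's re.split keeps them
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


print(spark_like_tokenize("Welcome to the beginning of another week, y’all!"))
```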

Next, we remove stop words such as 'i', 'the' and 'you', which do not help classify the tweet. The remaining words are stored in the MeaningfulWords column.

In [45]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTweet = swr.transform(tokenizedTweet)
SwRemovedTweet.show(truncate=False, n=5)

(Output truncated in this export. The table shows the first five rows with the previous columns, including SeWords, plus the new MeaningfulWords column.)
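StopWordsRemover drops every token that exactly matches an entry in its stop-word list. By default it uses Spark's English list (available via `StopWordsRemover.loadDefaultStopWords("english")`) and compares case-insensitively (`caseSensitive=False`). A pure-Python sketch of the same filtering step follows; the tiny STOP_WORDS set and the name remove_stop_words are hypothetical stand-ins for Spark's much longer list and its transformer. Because matching is exact, a token with punctuation attached (for example "the," rather than "the") does not match and is kept.

```python
# Hypothetical, deliberately tiny stop-word list; Spark's default English list is much longer
STOP_WORDS = {"i", "the", "you", "to", "of", "in", "as", "and", "a"}


def remove_stop_words(tokens, stop_words=STOP_WORDS, case_sensitive=False):
    """Keep only tokens that are not stop words, using exact (optionally case-insensitive) matching."""
    if case_sensitive:
        return [t for t in tokens if t not in stop_words]
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]


print(remove_stop_words(["welcome", "to", "the", "beginning", "of", "another", "week,"]))
# prints: ['welcome', 'beginning', 'another', 'week,']
```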

The code below turns the meaningful words into numerical features with HashingTF, which maps each word to a vector index using Austin Appleby's MurmurHash 3 algorithm and counts how often each index occurs. The first three rows of the output are displayed.

In [46]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTweetData = hashTF.transform(SwRemovedTweet).select(
    'MeaningfulWords', 'features')
numericTweetData.show(truncate=False, n=3)

+--------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|MeaningfulWords                                                                                                     |features                                                                                                                          |
+--------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|[rt, @strongblacklead:, welcome, beginning, another, week,, y’all!, 😂, https://t.co/0ztmhmrulx]                    |(262144,[7558,18552,81213,83805,96725,109818,190108,193347,248305],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                         |
|
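HashingTF implements the "hashing trick": each term is hashed, the hash is reduced modulo numFeatures (2^18 = 262,144 by default, matching the vector size in the output above), and the vector entry at that index counts how often the term occurs. The sketch below reproduces the idea in plain Python but, as an explicit substitution, uses `zlib.crc32` instead of MurmurHash 3, so its indices will not match Spark's. The name hashing_tf is hypothetical, and the returned dict is a sparse vector mapping index to count.

```python
import zlib
from collections import Counter

NUM_FEATURES = 1 << 18  # 262144, HashingTF's default numFeatures


def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Map tokens to a sparse term-frequency vector {index: count} via the hashing trick."""
    counts = Counter()
    for token in tokens:
        # Substituted hash: CRC32 of the UTF-8 bytes (Spark uses MurmurHash 3 instead)
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)


vector = hashing_tf(["people", "like", "people", "act."])
print(sum(vector.values()))  # prints 4, one count per token
```

Distinct words can collide on the same index; a large numFeatures keeps such collisions rare, at the cost of a longer (but sparse) vector.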

### Running the model
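The model built in Part 1 is used to classify the tweets. We first load the saved logistic regression model from the comodel directory as loadedModel, then apply it to the feature vectors computed above (the features column), and it predicts the polarity of each tweet. The first five rows of MeaningfulWords and the predicted class are displayed below. Because the model consumes the features column, the HashingTF settings used here (the default 262,144 features) must match those used when the model was trained in Part 1; vectors of a different size would not fit the model's coefficients.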

In [48]:
from pyspark.ml.classification import LogisticRegressionModel

# Load the logistic regression model saved in Part 1 under the "comodel" directory
loadedModel = LogisticRegressionModel.load("comodel")

In [49]:
predictionagain = loadedModel.transform(numericTweetData)
predictionFinalagain = predictionagain.select('MeaningfulWords', 'prediction')
predictionFinalagain.show(n=5, truncate = False)

+----------------------------------------------------------------------------------------------------------------------------+----------+
|MeaningfulWords                                                                                                             |prediction|
+----------------------------------------------------------------------------------------------------------------------------+----------+
|[rt, @strongblacklead:, welcome, beginning, another, week,, y’all!, 😂, https://t.co/0ztmhmrulx]                            |4.0       |
|[tune, first, lady, discuss, american, families, plan, tidewater, community, college., https://t.co/cudndi2dqy]             |4.0       |
|[happening, now:, president, first, lady, deliver, remarks, tidewater, community, college., https://t.co/jg6hajifpz]        |0.0       |
|[marvel, family, saddened, hear, passing, artist, john, paul, leon., bold, iconic, style, made, a…, https://t.co/m8vkmgo1nm]|0.0       |
|[turns, people, like, people, act.
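The model emits numeric classes (0.0 and 4.0 in the sample above). Their meaning comes from the labels in Part 1's training data, which this part does not restate. Assuming they follow the Sentiment140 convention (0 = negative, 2 = neutral, 4 = positive), a small lookup can turn predictions into readable labels before they are stored. Both the mapping and the helper name label_predictions are assumptions, not part of the original project.

```python
import pandas as pd

# ASSUMPTION: Sentiment140-style labels (0 = negative, 2 = neutral, 4 = positive);
# check this against the labels actually used to train the Part 1 model.
POLARITY_LABELS = {0.0: "negative", 2.0: "neutral", 4.0: "positive"}


def label_predictions(predictions):
    """Map numeric predictions to text labels, marking unexpected values as 'unknown'."""
    return pd.Series(predictions).map(lambda p: POLARITY_LABELS.get(p, "unknown"))


print(label_predictions([4.0, 4.0, 0.0, 0.0]).tolist())
# prints: ['positive', 'positive', 'negative', 'negative']
```

Applied to the pandas DataFrame created in the next step, this could add a column such as `pandasDF['Sentiment'] = label_predictions(pandasDF['prediction'])`, keeping the original index.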

<b>The Spark DataFrame is converted to a pandas DataFrame. Its MeaningfulWords column holds a list of words per row (dtype object), so we convert each list to a plain string to make it easier to save to the database.

In [50]:
pandasDF = predictionFinalagain.toPandas()
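# astype(str) stores each list as its Python string form and keeps non-ASCII characters such as emojis;
# the original '|S' dtype produced ASCII-only byte strings instead
pandasDF['MeaningfulWords'] = pandasDF['MeaningfulWords'].astype(str)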
pandasDF.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 2 columns):
MeaningfulWords    15 non-null object
prediction         15 non-null float64
dtypes: float64(1), object(1)
memory usage: 312.0+ bytes
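Converting each list with `str()`, as `astype(str)` does, keeps the brackets and quotes of the list's representation (for example "['tune', 'first', 'lady']"). If a plain space-separated string is preferred in the database, joining the tokens is a simple alternative. The sketch below compares both on a stand-in DataFrame shaped like pandasDF; it assumes each MeaningfulWords entry is a list (or other iterable) of strings, and the names sample, as_repr and as_text are hypothetical.

```python
import pandas as pd

# Stand-in for pandasDF: one list of tokens per row plus the predicted class
sample = pd.DataFrame({
    "MeaningfulWords": [["tune", "first", "lady"], ["marvel", "family", "saddened"]],
    "prediction": [4.0, 0.0],
})

as_repr = sample["MeaningfulWords"].astype(str)     # list representation, brackets included
as_text = sample["MeaningfulWords"].apply(" ".join)  # words joined by single spaces
print(as_repr[0])  # prints: ['tune', 'first', 'lady']
print(as_text[0])  # prints: tune first lady
```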


### Part 3
### Storing the output in a database

The output is stored in a SQL Server database. We use the pyodbc package to connect to the database and run the transaction, and a for loop iterates over the rows of the DataFrame, inserting them into the table one row at a time. The table has two columns: Tweets, which holds the extracted meaningful words, and Polarity, which holds the predicted class value.

In [65]:
import pyodbc


In [66]:
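# Connect to the local SQL Server instance using Windows authentication
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=LAPTOP-SAMU,1433;DATABASE=TwitterDataDB;Trusted_Connection=yes;')
cursor = conn.cursor()
# Create the table only if it does not exist yet, so the cell can be re-run
cursor.execute("IF OBJECT_ID('dbo.TwitterData', 'U') IS NULL "
               "CREATE TABLE dbo.TwitterData (Tweets text, Polarity float)")
# Insert each prediction using ? parameter markers
for index, row in pandasDF.iterrows():
    cursor.execute("INSERT INTO TwitterDataDB.dbo.TwitterData(Tweets, Polarity) values(?,?)",
                   row.MeaningfulWords, row.prediction)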


In [67]:
conn.commit()
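pyodbc follows the Python DB-API with qmark (`?`) parameter markers, so the row-by-row insert loop can also be written as a single `executemany` call over a list of tuples. The sketch below demonstrates that pattern with the standard-library sqlite3 module and an in-memory database so that it runs without SQL Server; with pyodbc, the same `executemany` call applies to the cursor returned by `pyodbc.connect`, and setting `cursor.fast_executemany = True` can speed up large batches with Microsoft's ODBC drivers. The table mirrors TwitterData, while the sample rows and the names demo_conn and demo_cursor are made up.

```python
import sqlite3

# Made-up rows shaped like (MeaningfulWords, prediction) from pandasDF
rows = [
    ("tune first lady discuss", 4.0),
    ("marvel family saddened", 0.0),
]

demo_conn = sqlite3.connect(":memory:")
demo_cursor = demo_conn.cursor()
demo_cursor.execute("CREATE TABLE IF NOT EXISTS TwitterData (Tweets TEXT, Polarity REAL)")
# One call inserts every row; the ? markers keep values out of the SQL string
demo_cursor.executemany("INSERT INTO TwitterData (Tweets, Polarity) VALUES (?, ?)", rows)
demo_conn.commit()

stored = demo_cursor.execute("SELECT Tweets, Polarity FROM TwitterData ORDER BY rowid").fetchall()
print(stored)
demo_conn.close()
```

With the real data, the rows could be built as `list(pandasDF[['MeaningfulWords', 'prediction']].itertuples(index=False, name=None))`, provided MeaningfulWords already holds strings.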

The screenshot below shows the output stored in the database.

![sql_output_big_data.JPG](sql_output_big_data.JPG "sql_output_big_data")
