# Streaming Tweets into Database

In [ ]:
import mysql.connector
from mysql.connector import Error
import tweepy
from textblob import TextBlob
import credentials

In [ ]:
TRACK_WORDS = ['covid19']
TABLE_NAME = "covid19"
TABLE_ATTRIBUTES = "id_str VARCHAR(255), created_at DATETIME, text VARCHAR(1000), \
            polarity DECIMAL(5,4), subjectivity DECIMAL(5,4), user_created_at VARCHAR(255), user_id_str VARCHAR(255), user_screen_name VARCHAR(255), user_location VARCHAR(255), \
            user_description VARCHAR(255), user_followers_count INT, longitude DOUBLE, latitude DOUBLE, \
            retweet_count INT, favorite_count INT"

In [ ]:
try:
    connection = mysql.connector.connect(host=credentials.HOST,
                                         database=credentials.DATABASE,
                                         user=credentials.USER,
                                         password=credentials.PASSWORD)
    if connection.is_connected():
        cursor = connection.cursor()
        cursor.execute("""SELECT COUNT(*) FROM information_schema.tables WHERE table_name= '{0}'""".format(TABLE_NAME))
        if cursor.fetchone()[0] != 1:
            cursor.execute("CREATE TABLE {} ({})".format(TABLE_NAME, TABLE_ATTRIBUTES))
            connection.commit()
        cursor.close()
        
except Error as e:
    print("Error connecting:", e)
# finally:
#     if (connection.is_connected()):
#         cursor.close()
#         connection.close()
#         print("Connection closed")

In [ ]:
def deEmojify(text):
    if text:
        return text.encode('ascii', 'ignore').decode('ascii')
    else:
        return None

In [ ]:
# Override tweepy's default StreamLister to add logic to on_status and on_error
class CustomStreamListener(tweepy.StreamListener):
    
    def on_status(self, status):
        '''Extract tweet (aka status) content.'''
        if (status.retweeted) or ('RT @' in status.text):
            return True
        id_str = status.id_str
        created_at = status.created_at
        try:
            text = deEmojify(status.extended_tweet["full_text"])
        except AttributeError:
            text = deEmojify(status.text)
        sentiment = TextBlob(text).sentiment
        polarity = sentiment.polarity
        subjectivity = sentiment.subjectivity
        
        user_created_at = status.user.created_at
        user_location = deEmojify(status.user.location)
        user_description = deEmojify(status.user.description)
        user_followers_count = status.user.followers_count
        user_id_str = status.user.id_str
        user = api.get_user(user_id_str)
        user_screen_name = user.screen_name
        
        latitude = None
        longitude = None
        if status.coordinates:
            longitude = status.coordinates["coordinates"][0]
            latitude = status.coordinates["coordinates"][1]
    
        retweet_count = status.retweet_count
        favorite_count = status.favorite_count
        
        print("\nUSER ", user_id_str, user_screen_name)
        print("Tweet length: ", len(text))
        print("Polarity: ", polarity)
        print("Subjectivity: ", subjectivity)
        print("Tweet content: ", status.text)
        print("Lat: {}, Long: {}".format(latitude, longitude))
        
        if connection.is_connected():
            cursor = connection.cursor()
            sql = """INSERT INTO {} (id_str,created_at,text,polarity,
                subjectivity, user_created_at, user_id_str, user_screen_name, user_location,
                user_description, user_followers_count, longitude,
                latitude, retweet_count, favorite_count) VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""".format(TABLE_NAME)
            values = (id_str, created_at, text, polarity, subjectivity,
                user_created_at, user_id_str, user_screen_name, user_location, user_description,
                user_followers_count, longitude, latitude, retweet_count, favorite_count)
            cursor.execute(sql, values)
            connection.commit()
            cursor.close()
        else:
            print("Not connected to database")
            return False
    
    def on_error(self, status_code):
        '''Handle rate limiting.'''
        if status_code == 420:
            # Disconnect
            return False

In [ ]:
auth = tweepy.OAuthHandler(credentials.API_KEY, credentials.API_SECRET_KEY)
auth.set_access_token(credentials.ACCESS_TOKEN, credentials.ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)

Access a tweet by ID:

https://twitter.com/ShaliniAlok/status/1246499367202066432

* Subjectivity: float [0.0, 1.0] - where 0.0 is very objective and 1.0 is very subjective
* Polarity: float [-1.0, 1.0] - where -1.0 is very negative and 1.0 is very positive

In [None]:
customStreamListener = CustomStreamListener()
stream = tweepy.Stream(auth=api.auth, listener=customStreamListener)
stream.filter(languages=["en"], track=TRACK_WORDS)
connection.close()


USER  3031934368 MarissaEsque
Tweet length:  256
Polarity:  0.19357142857142856
Subjectivity:  0.4992857142857143
Tweet content:  In a virtual meeting with @SenSchumer, his staff, and a bunch of other progressive advocates right now, talking abo… https://t.co/FnJI0jvh1g
Lat: None, Long: None

USER  867755682556248064 FemicideWatch
Tweet length:  281
Polarity:  0.0
Subjectivity:  0.1
Tweet content:  Following our awareness on the impacts of #VAWG&amp;COVID19: In Mexico, since the quarantine was announced, calls to de… https://t.co/7Tw5RaAUN0
Lat: None, Long: None

USER  101244025 aogulmus
Tweet length:  235
Polarity:  0.69
Subjectivity:  0.95
Tweet content:  We as humans haven’t been very kind to this earth. Pollution, deforestation, slaughtering animals... This is kind o… https://t.co/oSx8IQjexe
Lat: None, Long: None

USER  315429133 vjacqueline66
Tweet length:  150
Polarity:  -0.16666666666666666
Subjectivity:  0.43333333333333335
Tweet content:  Oh oh he is triggers the #maga people

In [ ]:
if (connection.is_connected()):
    connection.close()