# Connect to API

Adapted from: https://www.analyticsvidhya.com/blog/2020/08/analysing-streaming-tweets-with-python-and-postgresql/

In [11]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tweepy
import json 
import datetime
import time
import seaborn as sns

import psycopg2
%load_ext autoreload
%autoreload 2

In [5]:
from auth_pg import *
from auth_ap import *

In [6]:
# Twitter authentication:
#   1. build the OAuth handler from the application's consumer key/secret,
#   2. attach the access token and its secret,
#   3. wrap the handler in the API client that later cells use.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

In [7]:
# Fetch the home timeline through the API client and keep it in `public_tweets`
public_tweets = api.home_timeline()
# Echo the text of every tweet that came back, one per line
for tweet in public_tweets:
    print(tweet.text)

What's your Tilted Taxis strategy? 🚕
The United States of America will NOT be cutting funding to @starsandstripes magazine under my watch. It will conti… https://t.co/r4D5itmn1Y
Before you die you will on average walk 110,000 miles and the earth is 25,000 miles at the equator. So you could ha… https://t.co/GqEOtNZMWj
THANK YOU! #MAGA https://t.co/1vMZMmm6KZ
We didn’t just bring back the pretzel bun, we brought you the Pretzel Bacon Pub Cheeseburger. And now Postmates wil… https://t.co/Ka1DVyAELq
A GREAT HONOR, THANK YOU! https://t.co/SYM0wk5cqn
https://t.co/bA09LqxT15
https://t.co/Pk5mnHHll6
Another great day for peace with Middle East – Muslim-majority Kosovo and Israel have agreed to normalize ties and… https://t.co/cm0JuEHhV2
Congratulations to @predsednikrs Vucic for announcing the move of Serbia’s Embassy to Jerusalem by July. It is a brave and historic move!
https://t.co/EtEMZ4rYXs
...to bringing prosperity and peace to the Balkans and the world. I am proud to have assisted these

In [68]:
class MyStreamListener(tweepy.StreamListener):
    """Stream listener that stores English, non-retweet statuses in the database.

    Once ``time_limit`` seconds have passed since the listener was created,
    ``on_status`` returns ``False`` to ask tweepy to stop the stream. The
    limit is only evaluated when a status arrives.
    """

    def __init__(self, time_limit=30):
        """Remember the creation time and the run-time budget.

        Args:
            time_limit: Number of seconds after which ``on_status`` returns
                ``False``.
        """
        self.start_time = time.time()
        self.limit = time_limit
        super().__init__()

    def on_connect(self):
        """Report that the stream connection has been established."""
        print("Connected to Twitter API.")

    def on_status(self, status):
        """Persist a qualifying status, then enforce the time limit.

        A status qualifies when it is not a retweet (it has no
        ``retweeted_status`` attribute) and its language is ``"en"``.

        Returns:
            ``False`` when the time limit has been exceeded, otherwise ``None``.
        """
        # Filter before doing any extraction so that statuses which will be
        # discarded anyway cost nothing and cannot break the stream.
        is_retweet = hasattr(status, "retweeted_status")
        if not is_retweet and status.lang == "en":
            self._store_status(status)

        if (time.time() - self.start_time) > self.limit:
            print(time.time(), self.start_time, self.limit)
            return False

    def _store_status(self, status):
        """Flatten ``status`` into the tuples ``dbConnect`` expects and insert them."""
        author = status.user
        user_info = (author.id, author.name,
                     author.followers_count, author.friends_count)

        # Truncated statuses carry their complete text and entities under
        # ``extended_tweet``; otherwise the top-level fields are used.
        if status.truncated:
            extended = status.extended_tweet
            tweet = extended['full_text']
            raw_hashtags = extended['entities']['hashtags']
        else:
            tweet = status.text
            raw_hashtags = status.entities['hashtags']

        tweet_info = (status.id, tweet, status.retweet_count,
                      status.favorite_count, status.source, status.created_at)

        dbConnect(user_info, tweet_info, read_hashtags(raw_hashtags))

    def on_error(self, status_code):
        """Stop streaming on HTTP 420; any other status code yields ``None``."""
        if status_code == 420:
            # Returning False from on_error disconnects the stream
            return False

In [74]:
# Insert Tweet data into database
def dbConnect(user_info, tweet_info, hashtags):
    """Insert one tweet, its author and its hashtags into the ``twitterDB`` database.

    The three inserts run in a single transaction: it is committed when all
    of them succeed and rolled back if any of them raises. The connection is
    closed in both cases.

    Args:
        user_info: ``(user_id, username, followers_count, friends_count)``.
        tweet_info: ``(tweet_id, tweet, retweet_count, favorite_count,
            source_device, created_at)``.
        hashtags: Iterable of hashtag values; one ``TwitterEntity`` row is
            inserted per element.

    Raises:
        Whatever psycopg2 raises while connecting or executing; the error is
        re-raised after the rollback and close.
    """
    user_id, username, followers_count, friends_count = user_info
    tweet_id, tweet, retweet_count, favorite_count, source_device, created_at = tweet_info

    # Users already present are skipped via ON CONFLICT
    insert_user = '''INSERT INTO TwitterUser (user_id, user_name, followers_count, friends_count) 
                VALUES (%s,%s,%s,%s) ON CONFLICT
                 (User_Id) DO NOTHING;'''
    insert_tweet = '''INSERT INTO TwitterTweet (tweet_id, user_id, tweet, retweet_count, favourite_count, source_device, created_at) 
                VALUES (%s,%s,%s,%s,%s,%s,%s);'''
    insert_entity = '''INSERT INTO TwitterEntity (tweet_id, hashtag) VALUES (%s,%s);'''

    database = 'twitterDB'
    # host / port / user / password are module-level names, not parameters
    conn = psycopg2.connect(host=host,
                            database=database,
                            port=port,
                            user=user,
                            password=password)
    try:
        # `with conn` commits on success and rolls back on error, but it does
        # not close the connection; that is what the `finally` is for.
        with conn, conn.cursor() as cur:
            cur.execute(insert_user,
                        (user_id, username, followers_count, friends_count))
            cur.execute(insert_tweet,
                        (tweet_id, user_id, tweet, retweet_count,
                         favorite_count, source_device, created_at))
            for hashtag in hashtags:
                cur.execute(insert_entity, (tweet_id, hashtag))
    finally:
        conn.close()

In [63]:
# Extract hashtags
def read_hashtags(tag_list):
    """Return the ``'text'`` value of each hashtag entity in ``tag_list``, in order."""
    return [entity['text'] for entity in tag_list]

In [77]:
# Connecting to the Database
def DbConnect(query, params=None):
    """Execute ``query`` and return all rows of its result.

    A new connection is opened for the call and is always closed afterwards,
    including when executing or fetching raises. Nothing is committed.

    Args:
        query: SQL text to execute; it must produce a result set because the
            rows are fetched with ``fetchall``.
        params: Optional values bound to ``%s`` placeholders in ``query``.
            Defaults to ``None`` (no parameters).

    Returns:
        The list of rows returned by ``fetchall``.
    """
    # host / database / port / user / password are module-level names, so
    # `database` must have been assigned before this function is called.
    conn = psycopg2.connect(host=host,
                            database=database,
                            port=port,
                            user=user,
                            password=password)
    try:
        with conn.cursor() as curr:
            curr.execute(query, params)
            return curr.fetchall()
    finally:
        conn.close()

## Create table

In [53]:
# Table creation
# Every statement uses IF NOT EXISTS so the setup cells can be re-run against
# a database that already contains the tables without failing. An existing
# table is left exactly as it is, even if its columns differ from those below.
commands = (# Table 1: one row per tweet author
            '''CREATE TABLE IF NOT EXISTS TwitterUser(User_Id BIGINT PRIMARY KEY,
                                        User_Name TEXT,
                                        Description TEXT,
                                        Followers_Count INT,
                                        Friends_Count INT
                                        );''',
            # Table 2: one row per tweet, linked to its author
            '''CREATE TABLE IF NOT EXISTS TwitterTweet(Tweet_Id BIGINT PRIMARY KEY,
                                         User_Id BIGINT,
                                         Tweet TEXT,
                                         Retweet_Count INT,
                                         Favourite_Count INT,
                                         Source_Device TEXT,
                                         Created_At TEXT,
                                         CONSTRAINT fk_user
                                             FOREIGN KEY(User_Id)
                                                 REFERENCES TwitterUser(User_Id));''',
            # Table 3: one row per (tweet, hashtag) pair.
            # The constraint keeps its original name `fk_user` although it
            # references TwitterTweet.
            '''CREATE TABLE IF NOT EXISTS TwitterEntity(Id SERIAL PRIMARY KEY,
                                         Tweet_Id BIGINT,
                                         Hashtag TEXT,
                                         CONSTRAINT fk_user
                                             FOREIGN KEY(Tweet_Id)
                                                 REFERENCES TwitterTweet(Tweet_Id));''')

In [56]:
# Open a connection to the `twitterDB` database on the configured server
database = 'twitterDB'
conn = psycopg2.connect(
    host=host,
    database=database,
    port=port,
    user=user,
    password=password,
)

In [57]:
# Run every table-creation statement inside one transaction.
# `with conn` commits when the block succeeds and rolls back if a statement
# raises; it does not close the connection, so that is done in `finally`.
# The cursor is closed when its `with` block exits.
try:
    with conn, conn.cursor() as cur:
        for command in commands:
            cur.execute(command)
finally:
    # Close communication with server, whether or not the statements succeeded
    conn.close()

## Obtain tweets

In [75]:
# Stream tweets that match the '#covid' track term through a MyStreamListener
# created with its default time limit
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(
    auth=api.auth,
    listener=myStreamListener,
    tweet_mode="extended",
)

myStream.filter(track=['#covid'])

Connected to Twitter API.
1599262779.4033823 1599262748.0481124 30


In [38]:
# Load the stored tweets into a DataFrame
data_tweet = DbConnect("SELECT User_Id, Tweet_Id, Tweet FROM TwitterTweet;")

# Build the frame in a single constructor call: each fetched row is a
# (User_Id, Tweet_Id, Tweet) tuple, so the rows map directly onto the columns.
# This avoids growing the frame one row at a time and lets pandas infer the
# column dtypes instead of leaving every column as `object`.
df_tweet = pd.DataFrame(data_tweet, columns=['User_Id', 'Tweet_Id', 'Clean_Tweet'])

df_tweet.head()

Unnamed: 0,User_Id,Tweet_Id,Clean_Tweet
0,3223007252,1301998394773303296,The Franchise is the first player to use the C...
1,735671673064542209,1301998395486277635,@vasilionaire Maybe a month for covid?
2,1016711616296050690,1301998396316712962,"@PeterAr18228396 And when, hopefully, many peo..."
3,31062480,1301998399550623745,@ASlavitt My sis and brother in law both have ...
4,18635333,1301998399663886336,From Purdue....I didn't look at the numbers so...


In [78]:
# Fetch every column of every row in TwitterUser.
# Note: this rebinds `data_tweet`, which was bound to tweet rows earlier.
data_tweet = DbConnect("SELECT * FROM TwitterUser;")

In [79]:
# Show the current contents of `data_tweet` as this cell's output
data_tweet

[(537818953, 'Info Musket - News and Information', 10718, 8860),
 (1247195529710166016,
  "💯🌊You can't fix STUPID, but you can VOTE it out!",
  3498,
  4718),
 (913049358882373632, 'Innovative Scott', 2416, 4697),
 (168425731, 'Agenparl', 6066, 2291),
 (822897086614093828, 'VeganLover28', 328, 29)]

In [80]:
# NOTE(review): `conn` is already closed in the table-creation cell, so this
# second close looks redundant — confirm before removing it.
conn.close()