# Libraries

In [108]:
import pandas as pd
from collections import defaultdict
from datetime import datetime
from dateutil.relativedelta import relativedelta
import time
import os
import logging
import json
import csv

In [101]:
logging.basicConfig(filename='App_Stream_ServerStatus.log', encoding='utf-8', level=logging.ERROR, force=True)

In [102]:
pd.options.display.max_columns = None

# Load Crentials

In [4]:
from dotenv import dotenv_values

config = dotenv_values(".env")
CONSUMER_KEY = config['CONSUMER_KEY']
CONSUMER_SECRET = config['CONSUMER_SECRET']
BEARER_TOKEN = config['BEARER_TOKEN']

# Stream API

In [5]:
import tweepy
import sys

In [9]:
# StreamListener class inherits from tweepy.StreamListener and overrides on_status/on_error methods.
class Stream(tweepy.Stream):
    def on_status(self, status):
        print(status.id_str)
        # if "retweeted_status" attribute exists, flag this tweet as a retweet.
        is_retweet = hasattr(status, "retweeted_status")

        # check if text has been truncated
        if hasattr(status,"extended_tweet"):
            text = status.extended_tweet["full_text"]
        else:
            text = status.text

        # check if this is a quote tweet.
        is_quote = hasattr(status, "quoted_status")
        quoted_text = ""
        if is_quote:
            # check if quoted tweet's text has been truncated before recording it
            if hasattr(status.quoted_status,"extended_tweet"):
                quoted_text = status.quoted_status.extended_tweet["full_text"]
            else:
                quoted_text = status.quoted_status.text

        # remove characters that might cause problems with csv encoding
        remove_characters = [",","\n"]
        for c in remove_characters:
            text.replace(c," ")
            quoted_text.replace(c, " ")

        with open("out.csv", "a", encoding='utf-8') as f:
            f.write("%s,%s,%s,%s,%s,%s\n" % (status.created_at,status.user.screen_name,is_retweet,is_quote,text,quoted_text))

    def on_error(self, status_code):
        print("Encountered streaming error (", status_code, ")")
        sys.exit()

In [13]:
streaming_client = tweepy.StreamingClient(BEARER_TOKEN)
streaming_client.add_rules(tweepy.StreamRule("Tweepy"))

Response(data=[StreamRule(value='Tweepy', tag=None, id='1588313169058877440')], includes={}, errors=[], meta={'sent': '2022-11-03T23:32:23.632Z', 'summary': {'created': 1, 'not_created': 0, 'valid': 1, 'invalid': 0}})

In [6]:
def get_list_of_items(list_of_dict: list, key_name: str) -> list:
    if not list_of_dict: # If list_of_dict is None
        return []
    return [entity[key_name] for entity in list_of_dict]

In [7]:
def parse_entities(tweet_entities: dict, object_dict: dict, prefix: str = '') -> dict:
    entities_dict = defaultdict(list)

    # Retrieve Entities Objects
    for object_name, key_name in object_dict.items():
        column_name = f"{prefix}{object_name}_list"
        # print(object_name)
        entities_dict[column_name] = get_list_of_items(tweet_entities.get(object_name), key_name)
    return entities_dict
    # entities_dict['hashtags_list'] = get_list_of_items(tweet_entities['hashtags'], 'tag')
    # entities_dict['urls_list'] = get_list_of_items(tweet_entities['urls'], 'expanded_url')

In [8]:
def expand_dict_object(object_dict: dict, column_prefix: str,key_names_list: list = []) -> dict:
    """Return key,value pairs from dict. Returns selected keys in key_names_list or all if key_names_list is []

    Args:
        object_dict (dict)
        key_names_list (list)
    Returns:
        dict
    """
    result_dict = defaultdict(list)
    for key_name, value_name in object_dict.items():
        column_name = f"{column_prefix}_{key_name}"
        if not key_names_list:
            result_dict[column_name] = value_name
        elif key_name in key_names_list:
            result_dict[column_name] = value_name
    return result_dict

In [105]:
json_tweets_columns = [
    'id', 'author_id', 'possibly_sensitive', 'edit_history_tweet_ids', 'lang',
    'source', 'reply_settings', 'text', 'created_at'
]

json_users_columns = [
    'id', 'name', 'username', 'location', 'url', 'created_at', 'username',
    'profile_image_url', 'profile_image_url', 'verified', 'description',
    'protected'
]

In [113]:
tweet_columns = [
    'possibly_sensitive', 'text', 'source', 'id', 'created_at', 'lang', 'reply_settings', 'author_id', 'edit_history_tweet_ids', 
    'hashtags_list', 'urls_list', 'public_metrics_retweet_count', 'public_metrics_reply_count', 'public_metrics_like_count', 
    'public_metrics_quote_count', 'pagination_token', 'current_time'
]

user_columns = [
    'profile_image_url', 'username', 'protected', 'name', 'id', 'description', 
    'created_at', 'verified', 'location', 'url_urls_list', 'description_hashtags_list', 'description_urls_list', 'description_mentions_list',
    'description_cashtags_list', 'public_metrics_followers_count', 'public_metrics_following_count', 'public_metrics_tweet_count', 
    'public_metrics_listed_count', 'current_time'
]

In [114]:
class StreamingClient(tweepy.StreamingClient):

    # def on_tweet(self, tweet):
    #     # print(status.id_str)
    #     # # if "retweeted_status" attribute exists, flag this tweet as a retweet.
    #     # is_retweet = hasattr(status, "retweeted_status")

    #     # # check if text has been truncated
    #     # if hasattr(status,"extended_tweet"):
    #     #     text = status.extended_tweet["full_text"]
    #     # else:
    #     #     text = status.text

    #     # # check if this is a quote tweet.
    #     # is_quote = hasattr(status, "quoted_status")
    #     # quoted_text = ""
    #     # if is_quote:
    #     #     # check if quoted tweet's text has been truncated before recording it
    #     #     if hasattr(status.quoted_status,"extended_tweet"):
    #     #         quoted_text = status.quoted_status.extended_tweet["full_text"]
    #     #     else:
    #     #         quoted_text = status.quoted_status.text

    #     # # remove characters that might cause problems with csv encoding
    #     # remove_characters = [",","\n"]
    #     # for c in remove_characters:
    #     #     text.replace(c," ")
    #     #     quoted_text.replace(c, " ")
    #     print(tweet)
    #     with open("out.csv", "a", encoding='utf-8') as f:
    #         f.write("%s\n" % (tweet.id))
    def on_data(self ,raw_data):
        tweets_list_for_dataframe = []
        users_list_for_dataframe = []
        # print(raw_data)
        output_tweets = json.loads(raw_data)
        # If no data found:
        if output_tweets.get('data'):
            logging.warning(f'No data found for userId')
            # continue
            # hast_next_token = False
            # return
            tweet = output_tweets['data']
            # Get new columns
            if not tweet.get('entities'):
                entities_dict = parse_entities({}, {'hashtags': 'tag', 'urls': 'expanded_url'})
            else:
                entities_dict = parse_entities(tweet['entities'], {'hashtags': 'tag', 'urls': 'expanded_url'})

            if not tweet.get('public_metrics'):
                public_metrics_dict = expand_dict_object({}, 'public_metrics')
            else:
                public_metrics_dict = expand_dict_object(tweet['public_metrics'], 'public_metrics')

            # Filter out unwanted columns
            tweet = {key: tweet[key] for key in tweet.keys() if key in json_tweets_columns}

                            # Combine dicts
            tweet = {**tweet, **entities_dict, **public_metrics_dict, 'current_time': datetime.now()}
            with open(f"data/tweets-stream.csv", "a+", encoding='utf-8') as f:
                tweet_csv = [ tweet.get(col) for col in tweet_columns]
                write = csv.writer(f)
                write.writerows([tweet_csv])

            for user in output_tweets['includes']['users']:
                # Get new columns
                if (not user.get('entities')) or (not user.get('entities').get('url')):
                    url_dict = parse_entities({}, {'urls': 'expanded_url'}, prefix='url_')
                else:
                    url_dict = parse_entities(user['entities']['url'], {'urls': 'expanded_url'}, prefix='url_')

                if (not user.get('entities')) or (not user.get('entities').get('description')):
                    description_dict = parse_entities(
                        {},
                        {'hashtags': 'tag', 'urls': 'expanded_url', 'mentions': 'username', 'cashtags': 'tag'}, prefix='description_')
                else:
                    description_dict = parse_entities(
                        user['entities']['description'],
                        {'hashtags': 'tag', 'urls': 'expanded_url', 'mentions': 'username', 'cashtags': 'tag'}, prefix='description_')

                user_public_metrics_dict = expand_dict_object(user['public_metrics'], 'public_metrics')
                # Filter out unwanted columns
                user = {key: user[key] for key in user.keys() if key in json_users_columns}

                # Combine dicts
                user = {**user, **url_dict, **description_dict, **user_public_metrics_dict, 'current_time': datetime.now()}
                user_csv = [ user.get(col) for col in user_columns]
                users_list_for_dataframe.append(user_csv)

            if users_list_for_dataframe:
                with open(f"data/users-stream.csv", "a+", encoding='utf-8') as f:
                    write = csv.writer(f)
                    write.writerows(users_list_for_dataframe)

    def on_exception(self, exception):
        logging.warning(exception)
        print('On Exception')
    def on_errors(self, status_code):
        print("Encountered streaming error (", status_code, ")")
        # sys.exit()

In [57]:
# streaming_client.delete_rules(['1588313169058877440', '1578971194384011265', '1578971194384011264'])

In [58]:
query_params = {
    # 'query': '(from:twitterdev) has:links has:hashtags lang:en',
    'user_fields': ['created_at,description,entities,id,location,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified,withheld'],
    'tweet.fields': 'created_at,public_metrics,entities,lang,possibly_sensitive,reply_settings,source,in_reply_to_user_id,geo',
    'expansions': 'author_id',
    'start_time': START_TIME,
    'end_time': END_TIME,
    'max_results': 500
}

NameError: name 'START_TIME' is not defined

In [128]:
user_filters = "-is:retweet has:geo (from:NWSNHC OR from:NHC_Atlantic OR from:NWSHouston OR from:NWSSanAntonio OR from:USGS_TexasRain OR from:USGS_TexasFlood OR from:JeffLindner1)"

In [122]:
streaming_client.get_rules()

Response(data=[StreamRule(value='(from:1156163843820838916 OR from:232423423 OR from:3298472387428374)', tag=None, id='1590895857376714753')], includes={}, errors=[], meta={'sent': '2022-11-11T02:37:30.111Z', 'result_count': 1})

In [131]:
tweets_list_for_dataframe = []
users_list_for_dataframe = []
streaming_client = StreamingClient(BEARER_TOKEN)
streaming_client.add_rules(tweepy.StreamRule(user_filters))
# Create CSV file

if not os.path.exists(f"data/tweets-stream.csv"):
    with open("data/tweets-stream.csv", "a+", encoding='utf-8') as f:
        write = csv.writer(f)
        write.writerows([tweet_columns])

if not os.path.exists(f"data/users-stream.csv"):
    with open("data/users-stream.csv", "a+", encoding='utf-8') as f:
        write = csv.writer(f)
        write.writerows([user_columns])

# tags = ["hate speech"]
streaming_client.sample(expansions='author_id', tweet_fields='created_at,public_metrics,entities,lang,possibly_sensitive,reply_settings,source,in_reply_to_user_id,geo', user_fields='created_at,description,entities,id,location,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified,withheld')

KeyboardInterrupt: 

In [130]:
if __name__ == "__main__":
    # complete authorization and initialize API endpoint
    # auth = tweepy.OAuth2BearerHandler(BEARER_TOKEN)
    auth = tweepy.Client(BEARER_TOKEN)
    api = tweepy.API(auth)

    # initialize stream
    streamListener = Stream()
    stream = tweepy.Stream(auth=api.auth, listener=streamListener,tweet_mode='extended')
    with open("out.csv", "w", encoding='utf-8') as f:
        f.write("date,user,is_retweet,is_quote,text,quoted_text\n")
    tags = ["hate speech"]
    stream.filter(track=tags)

NameError: name 'Stream' is not defined

In [136]:
df = pd.read_csv('data/tweets-stream.csv')

In [137]:
df.head()

Unnamed: 0,possibly_sensitive,text,source,id,created_at,lang,reply_settings,author_id,edit_history_tweet_ids,hashtags_list,urls_list,public_metrics_retweet_count,public_metrics_reply_count,public_metrics_like_count,public_metrics_quote_count,pagination_token,current_time
0,False,"RT @weedside0: hija de papi? si, por????",Twitter for iPad,1590904766871502848,2022-11-11T03:10:28.000Z,es,everyone,1283839616810995712,['1590904766871502848'],[],[],2117,0,0,0,,2022-11-10 19:10:33.057126
1,False,RT @YukiSacura: しぐさとか声とか目線とか‥\n\nわぁーー\nかわいすぎる\...,Twitter Web App,1590904766888284161,2022-11-11T03:10:28.000Z,ja,everyone,1521270807460478976,['1590904766888284161'],['AMAs'],[],88,0,0,0,,2022-11-10 19:10:33.058297
2,False,RT @AleB1808: Ni así la gente quiere volar de ...,Twitter for Android,1590904766867161090,2022-11-11T03:10:28.000Z,es,everyone,974578403692109824,['1590904766867161090'],[],['https://twitter.com/AleB1808/status/15908799...,68,0,0,0,,2022-11-10 19:10:33.060218
3,False,It's about damn time! Sanctioning lawyers shou...,Twitter for Android,1590904766866984960,2022-11-11T03:10:28.000Z,en,everyone,975364111,['1590904766866984960'],['SmartNews'],['https://l.smartnews.com/oqzhk/MAVqeR'],0,0,0,0,,2022-11-10 19:10:33.060988
4,False,MBB: 2nd Half | 18:57\nThe Saints start off th...,Twitter for Android,1590904766875553793,2022-11-11T03:10:28.000Z,en,everyone,56516160,['1590904766875553793'],"['SCCCSaints', 'GoodToBeGreen']",[],0,0,0,0,,2022-11-10 19:10:33.061604
