# Notebook Author: 
### Brandon Salter 

In [None]:
# link to data: https://drive.google.com/file/d/1KErBeVoLADCa16ck7ReCMkfiRF4Ly_Lp/view?usp=sharing

# Dependencies

In [1]:
import json 
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from tqdm import tqdm
lines = 101916*2

# Write Data to MongoDB and BigQuery

In [2]:
path = '/Users/brandonsalter/Documents/TwitterSearchApp_694_Team4_2024/project_key.json'
credentials = service_account.Credentials.from_service_account_file(path)

project_id = 'msds-417117'
client = bigquery.Client(credentials= credentials,project=project_id)

In [3]:
tweets_dataset = "msds-417117.Tweets"
tweets_table = "msds-417117.Tweets.Tweets"
retweets_dataset = "msds-417117.Retweets"
retweets_table = "msds-417117.Retweets.Retweets"

def create_dataset(dataset_id):
    # TODO(developer): Set dataset_id to the ID of the dataset to create.
    # dataset_id = "{}.your_dataset".format(client.project)

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # TODO(developer): Specify the geographic location where the dataset should reside.
    dataset.location = "US"

    # Send the dataset to the API for creation, with an explicit timeout.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

create_dataset(tweets_dataset)
create_dataset(retweets_dataset)

Created dataset msds-417117.Tweets
Created dataset msds-417117.Retweets


In [4]:
# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

tweet_schema = [
    bigquery.SchemaField("created_at", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("hashtags", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("id_str_tweet", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("text", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("favorite_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("lang", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("place", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("quote_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("reply_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("retweet_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("favourites_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("followers_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("friends_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("id_str_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("location_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("screen_name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("statuses_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("verified_user", "STRING", mode="REQUIRED"),
]

table = bigquery.Table(tweets_table, schema=tweet_schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table msds-417117.Tweets.Tweets


In [5]:
retweet_schema = [
    bigquery.SchemaField("created_at", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("hashtags", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("id_str_retweet", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("text", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("favorite_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("lang", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("place", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("quote_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("reply_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("retweet_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("favourites_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("followers_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("friends_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("id_str_retweet_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("location_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("screen_name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("statuses_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("verified_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("id_str_tweet", "STRING", mode="REQUIRED"), # original tweet id
    bigquery.SchemaField("id_str_tweet_user", "STRING", mode="REQUIRED"), # original tweet user id
]

table = bigquery.Table(retweets_table, schema=retweet_schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table msds-417117.Retweets.Retweets


In [6]:
retweets_df = pd.DataFrame(columns=["created_at", 
                    "hashtags",
                    "id_str_retweet", 
                    "text",
                    "favorite_count", 
                    "lang",
                    "place",
                    "quote_count", 
                    "reply_count", 
                    "retweet_count", 
                    "favourites_count_user", 
                    "followers_count_user", 
                    "friends_count_user", 
                    "id_str_retweet_user", 
                    "location_user", 
                    "name_user", 
                    "screen_name_user", 
                    "statuses_count_user", 
                    "verified_user", 
                    "id_str_tweet", 
                    "id_str_tweet_user"])
retweets_df

Unnamed: 0,created_at,hashtags,id_str_retweet,text,favorite_count,lang,place,quote_count,reply_count,retweet_count,...,followers_count_user,friends_count_user,id_str_retweet_user,location_user,name_user,screen_name_user,statuses_count_user,verified_user,id_str_tweet,id_str_tweet_user


In [7]:
retweets_df.columns

Index(['created_at', 'hashtags', 'id_str_retweet', 'text', 'favorite_count',
       'lang', 'place', 'quote_count', 'reply_count', 'retweet_count',
       'favourites_count_user', 'followers_count_user', 'friends_count_user',
       'id_str_retweet_user', 'location_user', 'name_user', 'screen_name_user',
       'statuses_count_user', 'verified_user', 'id_str_tweet',
       'id_str_tweet_user'],
      dtype='object')

In [8]:
tweets_df = pd.DataFrame(columns=["created_at", 
                    "hashtags",
                    "id_str_tweet", 
                    "text",
                    "favorite_count", 
                    "lang",
                    "place",
                    "quote_count", 
                    "reply_count", 
                    "retweet_count", 
                    "favourites_count_user", 
                    "followers_count_user", 
                    "friends_count_user", 
                    "id_str_user", 
                    "location_user", 
                    "name_user", 
                    "screen_name_user", 
                    "statuses_count_user", 
                    "verified_user"
                    ])
tweets_df

Unnamed: 0,created_at,hashtags,id_str_tweet,text,favorite_count,lang,place,quote_count,reply_count,retweet_count,favourites_count_user,followers_count_user,friends_count_user,id_str_user,location_user,name_user,screen_name_user,statuses_count_user,verified_user


In [9]:
tweets_df.columns

Index(['created_at', 'hashtags', 'id_str_tweet', 'text', 'favorite_count',
       'lang', 'place', 'quote_count', 'reply_count', 'retweet_count',
       'favourites_count_user', 'followers_count_user', 'friends_count_user',
       'id_str_user', 'location_user', 'name_user', 'screen_name_user',
       'statuses_count_user', 'verified_user'],
      dtype='object')

In [10]:
tweets = set()
users = set()
tweet_rows_to_insert = []
retweet_rows_to_insert = []


with open("corona-out-3.json", "r") as f1:
    for line in tqdm(f1, total=lines):
        try:
            data = json.loads(line)

            # Unprocessed tweet
            if data['id_str'] not in tweets:

                # Unprocessed Retweet
                if ( data['text'].startswith('RT') ):

                    # NOTE: streaming not allowed in BigQuery free tier
                    
                    #"stream" these keys to the Retweets table in BigQuery
                    retweets_df.loc[len(retweets_df)] = {"created_at": data['created_at'],
                                                        "hashtags": data['entities']['hashtags'],
                                                        "id_str_retweet": data['id_str'],
                                                        "text": data['text'],
                                                        "favorite_count": data['favorite_count'],
                                                        "lang": data['lang'],
                                                        "place": data['place'],
                                                        "quote_count": data['quote_count'],
                                                        "reply_count": data['reply_count'],
                                                        "retweet_count": data['retweet_count'],
                                                        "favourites_count_user": data['user']['favourites_count'],
                                                        "followers_count_user": data['user']['followers_count'],
                                                        "friends_count_user": data['user']['friends_count'],
                                                        "id_str_retweet_user": data['user']['id_str'],
                                                        "location_user": data['user']['location'],
                                                        "name_user": data['user']['name'],
                                                        "screen_name_user": data['user']['screen_name'],
                                                        "statuses_count_user": data['user']['statuses_count'],
                                                        "verified_user": data['user']['verified'],
                                                        "id_str_tweet": data['retweeted_status']['id_str'],
                                                        "id_str_tweet_user": data['retweeted_status']['user']['id_str']}
                    tweets.add(data['id_str'])
                
                # Unprocessed Tweet
                else:
                    # "stream" these keys to the Tweets table in BigQuery
                    tweets_df.loc[len(tweets_df)] = {"created_at": data['created_at'],
                                                    "hashtags": data['entities']['hashtags'],
                                                    "id_str_tweet": data['id_str'],
                                                    "text": data['text'],
                                                    "favorite_count": data['favorite_count'],
                                                    "lang": data['lang'],
                                                    "place": data['place'],
                                                    "quote_count": data['quote_count'],
                                                    "reply_count": data['reply_count'],
                                                    "retweet_count": data['retweet_count'],
                                                    "favourites_count_user": data['user']['favourites_count'],
                                                    "followers_count_user": data['user']['followers_count'],
                                                    "friends_count_user": data['user']['friends_count'],
                                                    "id_str_user": data['user']['id_str'],
                                                    "location_user": data['user']['location'],
                                                    "name_user": data['user']['name'],
                                                    "screen_name_user": data['user']['screen_name'],
                                                    "statuses_count_user": data['user']['statuses_count'],
                                                    "verified_user": data['user']['verified']}     
                    tweets.add(data['id_str'])
                                   
            #user = data['user']
            # psuedocode: 
            # if user has not been seen before
            #            add to user table 
            # update datastores with any metrics that you are tracking 
            
        except:
            continue

print('tweet rows:', len(tweets_df))
print('retweet rows:', len(retweets_df)) 

100%|██████████| 203832/203832 [18:17<00:00, 185.75it/s]

tweet rows: 40793
retweet rows: 61090





In [11]:
retweets_df.to_csv('retweets.csv')
tweets_df.to_csv('tweets.csv')

In [12]:
retweets_df.head()

Unnamed: 0,created_at,hashtags,id_str_retweet,text,favorite_count,lang,place,quote_count,reply_count,retweet_count,...,followers_count_user,friends_count_user,id_str_retweet_user,location_user,name_user,screen_name_user,statuses_count_user,verified_user,id_str_tweet,id_str_tweet_user
0,Sat Apr 25 12:21:42 +0000 2020,[],1254022772558368768,RT @BJP4India: India’s war with Corona is ongo...,0,en,,0,0,0,...,2362,202,908326492718764034,Amethi Uttar Pradesh,शचीन्द्र पाण्डेय,im_S_pandey,48906,False,1253949413191344128,207809313
1,Sat Apr 25 12:21:42 +0000 2020,[],1254022772877131777,RT @schrodingerk42: @ozkan_yalim @DurmusYillma...,0,tr,,0,0,0,...,318,220,1206650133976408064,,Büşra Öztaş,schrodingerk42,405,False,1252576316135739392,1206650133976408064
2,Sat Apr 25 12:21:42 +0000 2020,[],1254022773149589510,RT @MonstaXEurope: VIDEO | 25.04.20\n\n&gt; Mo...,0,en,,0,0,0,...,1194,538,1248123252,,minhyuk.,mizhyuklee,157680,False,1253992905703862272,858859031464751104
3,Sat Apr 25 12:21:42 +0000 2020,[],1254022773858545665,RT @gustinicchi: Morti COL Corona non PER il C...,0,it,,0,0,0,...,1165,1840,50993809,,🇮🇹Henry Whites♥️,Enrico_Bianchi,42671,False,1254010851142569984,761787475
4,Sat Apr 25 12:21:42 +0000 2020,[],1254022774521081856,RT @PawanKalyan: Sri ByReddy Rajasekhar Reddy ...,0,en,,0,0,0,...,221,311,792325679354417152,"Chirala, India",Balu Pspk,Balu54368353,28240,False,1253697983343816706,2719753171


In [13]:
tweets_df.head()

Unnamed: 0,created_at,hashtags,id_str_tweet,text,favorite_count,lang,place,quote_count,reply_count,retweet_count,favourites_count_user,followers_count_user,friends_count_user,id_str_user,location_user,name_user,screen_name_user,statuses_count_user,verified_user
0,Sat Apr 25 12:21:41 +0000 2020,[],1254022770679320576,"É isto, ou vou morrer sem ar ou com o corona h...",0,pt,,0,0,0,5446,89,173,804046791348015107,"Acre, Brasil",Bi Sex Uau,B_King69,4728,False
1,Sat Apr 25 12:21:41 +0000 2020,"[{'text': 'sport', 'indices': [32, 38]}, {'tex...",1254022770746372096,Schöne Runde mit dem Rennrad ✌️\n#sport #coron...,0,de,"{'id': '6c741a421f6c33ff', 'url': 'https://api...",0,0,0,2184,173,685,2242948745,,Thomas Krause,tho1965,1865,False
2,Sat Apr 25 12:21:42 +0000 2020,[],1254022772575043586,Was sollen 150 Euro Computerzuschuss bringen? ...,0,de,,0,0,0,32024,778,733,2929344220,🇩4790 Provinz,Ralf Schmitz,RusticusArat,30551,False
3,Sat Apr 25 12:21:42 +0000 2020,[],1254022773598572544,@VinceMcMahon @TripleH We hereby honor to anno...,0,en,,0,0,0,352,2294,6709,1091660129894838272,,Milli teknoloji,milliteknoloj,393,False
4,Sat Apr 25 12:21:43 +0000 2020,[],1254022776094105602,im making 17 a hr doing nothing 😭😭😭😭 i love th...,0,en,,0,0,0,1325,637,408,375777294,,TeéLaneeë🌺,TWD40_,18976,False


In [17]:
arr = retweets_df['hashtags'].values
lst = []

for item in arr:
    if item == []:
        lst.append('none')
    else:
        lst.append(item[0]['text'])

retweets_df['hashtags'] = lst
retweets_df.head()

Unnamed: 0,created_at,hashtags,id_str_retweet,text,favorite_count,lang,place,quote_count,reply_count,retweet_count,...,followers_count_user,friends_count_user,id_str_retweet_user,location_user,name_user,screen_name_user,statuses_count_user,verified_user,id_str_tweet,id_str_tweet_user
0,Sat Apr 25 12:21:42 +0000 2020,none,1254022772558368768,RT @BJP4India: India’s war with Corona is ongo...,0,en,,0,0,0,...,2362,202,908326492718764034,Amethi Uttar Pradesh,शचीन्द्र पाण्डेय,im_S_pandey,48906,False,1253949413191344128,207809313
1,Sat Apr 25 12:21:42 +0000 2020,none,1254022772877131777,RT @schrodingerk42: @ozkan_yalim @DurmusYillma...,0,tr,,0,0,0,...,318,220,1206650133976408064,,Büşra Öztaş,schrodingerk42,405,False,1252576316135739392,1206650133976408064
2,Sat Apr 25 12:21:42 +0000 2020,none,1254022773149589510,RT @MonstaXEurope: VIDEO | 25.04.20\n\n&gt; Mo...,0,en,,0,0,0,...,1194,538,1248123252,,minhyuk.,mizhyuklee,157680,False,1253992905703862272,858859031464751104
3,Sat Apr 25 12:21:42 +0000 2020,none,1254022773858545665,RT @gustinicchi: Morti COL Corona non PER il C...,0,it,,0,0,0,...,1165,1840,50993809,,🇮🇹Henry Whites♥️,Enrico_Bianchi,42671,False,1254010851142569984,761787475
4,Sat Apr 25 12:21:42 +0000 2020,none,1254022774521081856,RT @PawanKalyan: Sri ByReddy Rajasekhar Reddy ...,0,en,,0,0,0,...,221,311,792325679354417152,"Chirala, India",Balu Pspk,Balu54368353,28240,False,1253697983343816706,2719753171


In [18]:
arr = tweets_df['hashtags'].values
lst = []

for item in arr:
    if item == []:
        lst.append('none')
    else:
        lst.append(item[0]['text'])

tweets_df['hashtags'] = lst
tweets_df.head()

Unnamed: 0,created_at,hashtags,id_str_tweet,text,favorite_count,lang,place,quote_count,reply_count,retweet_count,favourites_count_user,followers_count_user,friends_count_user,id_str_user,location_user,name_user,screen_name_user,statuses_count_user,verified_user
0,Sat Apr 25 12:21:41 +0000 2020,none,1254022770679320576,"É isto, ou vou morrer sem ar ou com o corona h...",0,pt,,0,0,0,5446,89,173,804046791348015107,"Acre, Brasil",Bi Sex Uau,B_King69,4728,False
1,Sat Apr 25 12:21:41 +0000 2020,sport,1254022770746372096,Schöne Runde mit dem Rennrad ✌️\n#sport #coron...,0,de,"{'id': '6c741a421f6c33ff', 'url': 'https://api...",0,0,0,2184,173,685,2242948745,,Thomas Krause,tho1965,1865,False
2,Sat Apr 25 12:21:42 +0000 2020,none,1254022772575043586,Was sollen 150 Euro Computerzuschuss bringen? ...,0,de,,0,0,0,32024,778,733,2929344220,🇩4790 Provinz,Ralf Schmitz,RusticusArat,30551,False
3,Sat Apr 25 12:21:42 +0000 2020,none,1254022773598572544,@VinceMcMahon @TripleH We hereby honor to anno...,0,en,,0,0,0,352,2294,6709,1091660129894838272,,Milli teknoloji,milliteknoloj,393,False
4,Sat Apr 25 12:21:43 +0000 2020,none,1254022776094105602,im making 17 a hr doing nothing 😭😭😭😭 i love th...,0,en,,0,0,0,1325,637,408,375777294,,TeéLaneeë🌺,TWD40_,18976,False


In [23]:
retweets_df['verified_user'] = retweets_df['verified_user'].astype(str)

In [25]:
retweets_df = retweets_df.fillna('none')

In [27]:
tweets_df['verified_user'] = tweets_df['verified_user'].astype(str)
tweets_df = tweets_df.fillna('none')

In [29]:
tweets_df['place']

0                                                     none
1        {'id': '6c741a421f6c33ff', 'url': 'https://api...
2                                                     none
3                                                     none
4                                                     none
                               ...                        
40788                                                 none
40789                                                 none
40790                                                 none
40791                                                 none
40792                                                 none
Name: place, Length: 40793, dtype: object

In [30]:
tweets_df['place'] = tweets_df['place'].astype(str)

In [19]:
# optional: you can pass in a job_config with a schema if you want to define
# a specific schema
retweet_job_config = bigquery.LoadJobConfig(schema=retweet_schema)
tweet_job_config = bigquery.LoadJobConfig(schema=tweet_schema)

In [26]:
load_job = client.load_table_from_dataframe(
    retweets_df, retweets_table,
    job_config=retweet_job_config
)  # this will execute the load job

result = load_job.result()
result.done() # prints True if done

True

In [31]:
load_job = client.load_table_from_dataframe(
    tweets_df, tweets_table,
    job_config=tweet_job_config
)  # this will execute the load job

result = load_job.result()
result.done() # prints True if done

True