# Parse Twitter Stream

This notebook parses the Twitter stream files downloaded from archive.org, finds the finance-related tweets, and writes them to a new file. First, process_tar_file() extracts the downloaded .tar files, which automatically creates a folder structure of .json.bz2 files. Next, process_json() reads these files, keeps only English-language tweets, and saves each day of tweets to a .csv file for easy loading later. A regex filter then keeps only the tweets that contain a cashtag. Finally, all the filtered tweets are combined and saved into one file.

Many of the files were processed individually or in small groups, due to memory limitations on my local machine.

In [2]:
import numpy as np
import pandas as pd
import json, os, bz2
from IPython.display import clear_output
from datetime import datetime
import tarfile
import re
import glob

In [3]:
def process_tar_file(tar_file, extract_folder='.'):
    """
    Extracts a tar file, obtained from the Twitter Stream Grab on Archive.org, into a structured set of folders
    with a .json.bz2 file for each minute of each hour.
    """
    begin_time = datetime.now()
    # the context manager closes the archive even if extraction fails
    with tarfile.open(tar_file) as my_tar:
        my_tar.extractall(extract_folder)
    end_time = datetime.now()
    print('Processing complete! Total time:', end_time - begin_time)
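
Calling extractall() on a downloaded archive trusts every path stored inside it. The sketch below is one possible way to check member paths before extracting; extract_tar_safely is a hypothetical helper that was not used to produce the outputs in this notebook.

```python
import os
import tarfile


def extract_tar_safely(tar_path, extract_folder="."):
    """Extract tar_path into extract_folder, refusing members that would land outside it."""
    target_root = os.path.realpath(extract_folder)
    with tarfile.open(tar_path) as archive:  # closed automatically, even on error
        members = archive.getmembers()
        for member in members:
            destination = os.path.realpath(os.path.join(target_root, member.name))
            # commonpath equals target_root only when destination lies inside it
            if os.path.commonpath([target_root, destination]) != target_root:
                raise ValueError(f"Unsafe path in archive: {member.name}")
        archive.extractall(target_root, members=members)
    # return the member names so the caller can see what was extracted
    return [member.name for member in members]
```

It takes the same two arguments as process_tar_file(), so it could be swapped in for that function if this check is wanted.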

In [13]:
def process_json(directory, outfile):
    
    """ 
    Parses Twitter archives from Archive Team: The Twitter Stream Grab. Tar files must be extracted before
    running this function. Only tweets in the English language are returned. The parsed tweets are saved to a
    csv file named after the outfile parameter, and a DataFrame with the same data is returned so that it is
    immediately available for use.
    
    Parameters:
    
    directory: name of the top-level folder that the .json.bz2 files were extracted to (from the .tar file)
    
    outfile: name of the .csv file that the data will be saved to. It is not necessary to add .csv to this.
    """
    
    # Initializing variables
    begin_time = datetime.now()
    cursor='>>  '
    count_tweets=0
    columns=['tweet_date','tweet_id','tweet_text','lang','retweet','tweet_url', 'tweet_reply', 'hashes', 'user_name',
             'screen_name', 'user_verified', 'user_lang']
    tweet_dict = {}
    i = 0 
    
    for dirpath, dirnames, filenames in os.walk(directory):
        # Screen prints
        clear_output()
        print(cursor,'tweets:',count_tweets)
        print('-'*10)
        print(cursor,'currently searching', dirpath)

        for file in filenames:
            if file.endswith('.bz2'):
                # Extract bz2 archives to memory
                file = bz2.BZ2File(os.path.join(dirpath, file), mode='r')
                for line in file:
                    tweet = json.loads(line)
                    # try/except block is necessary since not all lines are tweets
                    try: 
                        # only retrieve tweets in English
                        if tweet['lang']=='en':
                            
                            # create the fields to be saved
                            if tweet['created_at']:
                                tweet_date=tweet['created_at']
                            else:
                                tweet_date=False

                            if tweet['id']:
                                tweet_id=tweet['id']
                            else:
                                tweet_id=False

                            if tweet['text']:
                                tweet_text=tweet['text']
                            else:
                                tweet_text=False

                            if tweet['lang']:
                                lang = tweet['lang']
                            else:
                                lang=False

                            if tweet['retweeted']:
                                retweet = tweet['retweeted']
                            else:
                                retweet=False

                            if tweet['entities']['urls']:
                                tweet_url = True
                            else: 
                                tweet_url = False

                            if tweet['in_reply_to_screen_name']:
                                tweet_reply = True
                            else:
                                tweet_reply = False


                            hashes = tweet['entities']['hashtags']
                            user_name = tweet['user']['name']
                            screen_name = tweet['user']['screen_name']
                            user_verified = tweet['user']['verified']
                            user_lang = tweet['user']['lang']

                            # use a dictionary for faster processing
                            tweet_dict[i] = [tweet_date,tweet_id,tweet_text,lang,retweet,tweet_url, tweet_reply, 
                                             hashes, user_name, screen_name, user_verified, user_lang]
                            i += 1
                            count_tweets += 1
                        
                    except KeyError:
                        continue
           
    # save df and print processing time
    df = pd.DataFrame.from_dict(tweet_dict, orient='index', columns=columns)
    df.to_csv(outfile + '.csv')
    end_time = datetime.now()
    print('Total time to run:', end_time-begin_time)
    print('Shape of DataFrame:', df.shape)
    return df                        
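
The repeated if/else blocks in process_json() could be collapsed into a single helper. The sketch below shows one way this might look; tweet_to_row is a hypothetical name, and this is not the code that produced the outputs in this notebook.

```python
def tweet_to_row(tweet):
    """Return the 12 fields collected by process_json() as a list, or None for non-tweets."""
    # lines such as delete notices have no 'lang' key and are skipped
    if tweet.get("lang") != "en":
        return None
    try:
        entities = tweet["entities"]
        user = tweet["user"]
        return [
            tweet["created_at"] or False,            # tweet_date
            tweet["id"] or False,                    # tweet_id
            tweet["text"] or False,                  # tweet_text
            tweet["lang"],                           # lang
            tweet["retweeted"] or False,             # retweet
            bool(entities["urls"]),                  # tweet_url
            bool(tweet["in_reply_to_screen_name"]),  # tweet_reply
            entities["hashtags"],                    # hashes
            user["name"],                            # user_name
            user["screen_name"],                     # screen_name
            user["verified"],                        # user_verified
            user["lang"],                            # user_lang
        ]
    except KeyError:
        # an English-tagged record that lacks one of the expected fields
        return None
```

Inside the loop, each parsed line would be passed to this helper, and a row would be stored only when the result is not None.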

In [7]:
process_tar_file('tar_files/twitter-2018-07-31.tar')

Processing complete! Total time: 0:00:09.392108


In [8]:
tar_files_list = ['twitter-2018-07-02.tar', 'twitter-2018-07-03(1).tar', 'twitter-2018-07-11.tar', 
                  'twitter-2018-07-12.tar', 'twitter-2018-07-13.tar', 'twitter-2018-07-14.tar',
                  'twitter-2018-07-15.tar', 'twitter-2018-07-16.tar', 'twitter-2018-07-17.tar',
                  'twitter-2018-07-18.tar', 'twitter-2018-07-19.tar', 'twitter-2018-07-20.tar',
                  'twitter-2018-07-21.tar', 'twitter-2018-07-22.tar', 'twitter-2018-07-23.tar',
                  'twitter-2018-07-24.tar', 'twitter-2018-07-25.tar', 'twitter-2018-07-26.tar',
                  'twitter-2018-07-27(1).tar', 'twitter-2018-07-28.tar', 'twitter-2018-07-29.tar',
                  'twitter-2018-07-30.tar']

In [15]:
tar_files_list = ['twitter-2018-07-26.tar',
                  'twitter-2018-07-27(1).tar', 'twitter-2018-07-28.tar', 'twitter-2018-07-29.tar',
                  'twitter-2018-07-30.tar']

In [16]:
for i in tar_files_list:
    process_tar_file('tar_files/' + i)

Processing complete! Total time: 0:02:29.421206
Processing complete! Total time: 0:00:24.584450
Processing complete! Total time: 0:00:34.282709
Processing complete! Total time: 0:00:13.067030
Processing complete! Total time: 0:00:12.708246


In [17]:
process_json('2018/07/02', 'twitter_stream_csvs/july_two')

>>   tweets: 1043571
----------
>>   currently searching 2018/07/02\23
Total time to run: 0:15:31.396606
Shape of DataFrame: (1080494, 12)


Unnamed: 0,tweet_date,tweet_id,tweet_text,lang,retweet,tweet_url,tweet_reply,hashes,user_name,screen_name,user_verified,user_lang
0,Mon Jul 02 06:30:00 +0000 2018,1013671111194603520,RT @ThaLuckeyJr: ya im dtf\n\nd- down\nt- to\n...,en,False,False,False,[],Bee 🦕,bee_onkaa,False,en
1,Mon Jul 02 06:30:00 +0000 2018,1013671111194599425,RT @TradizionLarry: Taboo Brother Family https...,en,False,False,False,[],.,Roseyrose98,False,en
2,Mon Jul 02 06:30:00 +0000 2018,1013671111165468672,RT @tribelaw: Among the first cases the Court ...,en,False,False,False,[],rt,rtorres,False,en
3,Mon Jul 02 06:30:00 +0000 2018,1013671111194750976,"I’m doing everything I can for my boyfriend, f...",en,False,False,False,[],Khaila Ann Vargas,KhailaAnnV,False,en
4,Mon Jul 02 06:30:00 +0000 2018,1013671111165296640,Get Propecia now to forget the male pattern ha...,en,False,True,False,[],Marvin,Marvin64372558,False,en
...,...,...,...,...,...,...,...,...,...,...,...,...
1080489,Tue Jul 03 05:59:59 +0000 2018,1014025945106272256,RT @PopTartsUS: Things you shouldn’t put on yo...,en,False,False,False,[],Zixxorb (Alex),Zixxorb,False,en
1080490,Tue Jul 03 05:59:59 +0000 2018,1014025945118896129,Punish the pussy and give it a kiss 😘,en,False,False,False,[],pocahonta$ 💓,chiiink_,False,en
1080491,Tue Jul 03 05:59:59 +0000 2018,1014025945127190528,RT @jaycritch: You can’t Finesse a Finesser DUMMY,en,False,False,False,[],A,asiadenn_,False,en
1080492,Tue Jul 03 05:59:59 +0000 2018,1014025945093758976,Followed this guy on insta I used to talk to s...,en,False,True,False,[],Tierra,Burnziespizza88,False,en
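
The tweet_date column holds Twitter's created_at strings, which are plain text at this point. As a minimal sketch, assuming every value follows the layout shown in the table above, they can be parsed with the standard library; parse_tweet_date and TWITTER_DATE_FORMAT are hypothetical names.

```python
from datetime import datetime, timezone

# Layout of values such as 'Mon Jul 02 06:30:00 +0000 2018'
TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def parse_tweet_date(raw):
    """Convert a created_at string into a timezone-aware datetime."""
    return datetime.strptime(raw, TWITTER_DATE_FORMAT)


parsed = parse_tweet_date("Mon Jul 02 06:30:00 +0000 2018")
print(parsed.isoformat())  # 2018-07-02T06:30:00+00:00
```

The same format string can be passed to pd.to_datetime through its format parameter to convert the whole column at once.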


In [18]:
df = process_json('2018/07/03', 'twitter_stream_csvs/july_three')

>>   tweets: 898484
----------
>>   currently searching 2018/07/03\20
Total time to run: 0:12:11.364803
Shape of DataFrame: (902436, 12)


In [19]:
df = process_json('2018/07/11', 'twitter_stream_csvs/july_eleven')

>>   tweets: 577102
----------
>>   currently searching 2018/07/11\23
Total time to run: 0:07:32.826890
Shape of DataFrame: (615851, 12)


In [20]:
df = process_json('2018/07/12', 'twitter_stream_csvs/july_twelve')

>>   tweets: 1048445
----------
>>   currently searching 2018/07/12\23
Total time to run: 0:14:41.606085
Shape of DataFrame: (1086872, 12)


In [21]:
df = process_json('2018/07/13', 'twitter_stream_csvs/july_thirteen')

>>   tweets: 1046414
----------
>>   currently searching 2018/07/13\23
Total time to run: 0:14:49.134354
Shape of DataFrame: (1085444, 12)


In [22]:
df = process_json('2018/07/14', 'twitter_stream_csvs/july_fourteen')

>>   tweets: 969644
----------
>>   currently searching 2018/07/14\23
Total time to run: 0:13:55.830673
Shape of DataFrame: (1007251, 12)


In [23]:
df = process_json('2018/07/15', 'twitter_stream_csvs/july_fifteen')

>>   tweets: 1034331
----------
>>   currently searching 2018/07/15\23
Total time to run: 0:14:56.389021
Shape of DataFrame: (1071798, 12)


In [24]:
df = process_json('2018/07/16', 'twitter_stream_csvs/july_sixteen')

>>   tweets: 1101203
----------
>>   currently searching 2018/07/16\23
Total time to run: 0:14:52.053455
Shape of DataFrame: (1141345, 12)


In [25]:
df = process_json('2018/07/17', 'twitter_stream_csvs/july_seventeen')

>>   tweets: 1074512
----------
>>   currently searching 2018/07/17\23
Total time to run: 0:14:16.684114
Shape of DataFrame: (1113110, 12)


In [26]:
df = process_json('2018/07/18', 'twitter_stream_csvs/july_eighteen')

>>   tweets: 1045359
----------
>>   currently searching 2018/07/18\23
Total time to run: 0:13:56.929638
Shape of DataFrame: (1083442, 12)


In [27]:
df = process_json('2018/07/19', 'twitter_stream_csvs/july_nineteen')

>>   tweets: 1028649
----------
>>   currently searching 2018/07/19\23
Total time to run: 0:14:03.651578
Shape of DataFrame: (1067000, 12)


In [28]:
df = process_json('2018/07/20', 'twitter_stream_csvs/july_twenty')

>>   tweets: 1001435
----------
>>   currently searching 2018/07/20\23
Total time to run: 0:13:59.503508
Shape of DataFrame: (1037888, 12)


In [29]:
df = process_json('2018/07/21', 'twitter_stream_csvs/july_twentyone')

>>   tweets: 936878
----------
>>   currently searching 2018/07/21\23
Total time to run: 0:13:44.143779
Shape of DataFrame: (974163, 12)


In [30]:
df = process_json('2018/07/22', 'twitter_stream_csvs/july_twentytwo')

>>   tweets: 982280
----------
>>   currently searching 2018/07/22\23
Total time to run: 0:14:08.711941
Shape of DataFrame: (1024820, 12)


In [31]:
df = process_json('2018/07/23', 'twitter_stream_csvs/july_twentythree')

>>   tweets: 1076589
----------
>>   currently searching 2018/07/23\23
Total time to run: 0:14:40.675356
Shape of DataFrame: (1114234, 12)


In [32]:
df = process_json('2018/07/24', 'twitter_stream_csvs/july_twentyfour')

>>   tweets: 1050635
----------
>>   currently searching 2018/07/24\23
Total time to run: 0:14:23.844904
Shape of DataFrame: (1088423, 12)


In [33]:
df = process_json('2018/07/25', 'twitter_stream_csvs/july_twentyfive')

>>   tweets: 1017148
----------
>>   currently searching 2018/07/25\23
Total time to run: 0:14:21.065782
Shape of DataFrame: (1053863, 12)


In [34]:
df = process_json('2018/07/26', 'twitter_stream_csvs/july_twentysix')

>>   tweets: 991858
----------
>>   currently searching 2018/07/26\23
Total time to run: 0:13:55.674786
Shape of DataFrame: (1028079, 12)


In [35]:
df = process_json('2018/07/27', 'twitter_stream_csvs/july_twentyseven')

>>   tweets: 1222842
----------
>>   currently searching 2018/07/27\23
Total time to run: 0:16:35.034813
Shape of DataFrame: (1257309, 12)


In [36]:
df = process_json('2018/07/28', 'twitter_stream_csvs/july_twentyeight')

>>   tweets: 885653
----------
>>   currently searching 2018/07/28\23
Total time to run: 0:13:59.093294
Shape of DataFrame: (920218, 12)


In [37]:
df = process_json('2018/07/29', 'twitter_stream_csvs/july_twentynine')

>>   tweets: 917318
----------
>>   currently searching 2018/07/29\23
Total time to run: 0:13:49.191035
Shape of DataFrame: (951774, 12)


In [38]:
df = process_json('2018/07/30', 'twitter_stream_csvs/july_thirty')

>>   tweets: 976493
----------
>>   currently searching 2018/07/30\23
Total time to run: 0:14:07.206761
Shape of DataFrame: (1013266, 12)


In [39]:
df = process_json('2018/07/31', 'twitter_stream_csvs/july_thirtyone')

>>   tweets: 988675
----------
>>   currently searching 2018/07/31\23
Total time to run: 0:14:38.263325
Shape of DataFrame: (1024708, 12)
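
Each call above holds a full day of tweets in memory before saving. A lower-memory alternative, sketched here under assumptions, writes each row to the .csv file as soon as it is read. stream_english_tweets_to_csv is a hypothetical function that keeps only three of the twelve columns to stay short.

```python
import bz2
import csv
import json
import os


def stream_english_tweets_to_csv(directory, outfile):
    """Write the date, id, and text of English tweets to outfile, one row at a time."""
    rows_written = 0
    with open(outfile, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(["tweet_date", "tweet_id", "tweet_text"])
        for dirpath, _dirnames, filenames in os.walk(directory):
            for name in sorted(filenames):
                if not name.endswith(".bz2"):
                    continue
                # bz2.open in text mode decompresses lazily, line by line
                path = os.path.join(dirpath, name)
                with bz2.open(path, "rt", encoding="utf-8") as archive:
                    for line in archive:
                        try:
                            tweet = json.loads(line)
                        except json.JSONDecodeError:
                            continue  # blank or malformed line
                        if tweet.get("lang") != "en":
                            continue  # non-English tweet or non-tweet record
                        try:
                            row = [tweet["created_at"], tweet["id"], tweet["text"]]
                        except KeyError:
                            continue
                        writer.writerow(row)
                        rows_written += 1
    return rows_written
```

Because no DataFrame is built, memory use stays roughly constant regardless of how many tweets a day contains. The saved file can still be loaded later with pd.read_csv.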


In [40]:
df.head()

Unnamed: 0,tweet_date,tweet_id,tweet_text,lang,retweet,tweet_url,tweet_reply,hashes,user_name,screen_name,user_verified,user_lang
0,Tue Jul 31 06:30:00 +0000 2018,1024180359276064768,RT @ArchanaArchuu: Too many arguments will cau...,en,False,False,False,"[{'text': 'argument', 'indices': [76, 85]}]",ஏசுதாஸ்™,yesudoss_officl,False,en
1,Tue Jul 31 06:30:00 +0000 2018,1024180359305392128,RT @irina3529: Happy new week... 🌻🐾🍀🐾🌻 https:/...,en,False,False,False,[],[투표소手개표] '양심과 상식'이 일상이 되는 나라,pwbr000,False,ko
2,Tue Jul 31 06:30:00 +0000 2018,1024180359288823808,i’m in the mood to go for a long ride but ever...,en,False,False,False,[],mae,maeaes,False,en
3,Tue Jul 31 06:30:00 +0000 2018,1024180359276126208,RT @nbcsandiego: Out of all the pitches thrown...,en,False,False,False,"[{'text': 'Padres', 'indices': [59, 66]}]",WilliamBriggsDDS,WilliaBriggsDDS,False,en
4,Tue Jul 31 06:30:00 +0000 2018,1024180359305601025,Don't sleep on your dreams. You have the capac...,en,False,False,False,[],Nomfundo Khambule,NomfundoThePoet,False,en


# Testing regex filtering for finance-related tweets

In [48]:
copy = df.copy()

In [41]:
# this regex matches any "$" that is immediately followed by one or more lower- or upper-case letters
pattern = re.compile(r'(\$[A-Za-z]+)')

In [44]:
df['tweet_text'][0]

'RT @ArchanaArchuu: Too many arguments will cause a relationship to die. \nNo #argument however means the relationship is already dead.\n\n#Sha…'

In [46]:
matches = []
for i in range(len(df['tweet_text'])):
    matches.append(re.findall(pattern, df['tweet_text'][i]))
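
The explicit loop works, but pandas can apply the same pattern to the whole column at once. This is a minimal sketch on a made-up three-row DataFrame, not the real data; sample and CASHTAG_PATTERN are names introduced only for the illustration.

```python
import pandas as pd

# Same pattern as above, written without the capture group
CASHTAG_PATTERN = r"\$[A-Za-z]+"

# Hypothetical stand-in for the real tweet data
sample = pd.DataFrame({"tweet_text": [
    "Slb $SLB position lifted, also $BMY",
    "No ticker symbols in this one",
    "Lunch was $10 today",
]})

# str.findall returns one list of matches per row
sample["regex_matches"] = sample["tweet_text"].str.findall(CASHTAG_PATTERN)

# keep only the rows with at least one match
filtered = sample[sample["regex_matches"].apply(len) > 0]
print(filtered)
```

Note that "$10" is not matched, because the pattern requires a letter directly after the dollar sign.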

In [49]:
copy['regex_matches'] = matches
copy.head()

Unnamed: 0,tweet_date,tweet_id,tweet_text,lang,retweet,tweet_url,tweet_reply,hashes,user_name,screen_name,user_verified,user_lang,regex_matches
0,Tue Jul 31 06:30:00 +0000 2018,1024180359276064768,RT @ArchanaArchuu: Too many arguments will cau...,en,False,False,False,"[{'text': 'argument', 'indices': [76, 85]}]",ஏசுதாஸ்™,yesudoss_officl,False,en,[]
1,Tue Jul 31 06:30:00 +0000 2018,1024180359305392128,RT @irina3529: Happy new week... 🌻🐾🍀🐾🌻 https:/...,en,False,False,False,[],[투표소手개표] '양심과 상식'이 일상이 되는 나라,pwbr000,False,ko,[]
2,Tue Jul 31 06:30:00 +0000 2018,1024180359288823808,i’m in the mood to go for a long ride but ever...,en,False,False,False,[],mae,maeaes,False,en,[]
3,Tue Jul 31 06:30:00 +0000 2018,1024180359276126208,RT @nbcsandiego: Out of all the pitches thrown...,en,False,False,False,"[{'text': 'Padres', 'indices': [59, 66]}]",WilliamBriggsDDS,WilliaBriggsDDS,False,en,[]
4,Tue Jul 31 06:30:00 +0000 2018,1024180359305601025,Don't sleep on your dreams. You have the capac...,en,False,False,False,[],Nomfundo Khambule,NomfundoThePoet,False,en,[]


In [66]:
regex_tweets = copy[copy.regex_matches.apply(len) > 0]
regex_tweets.shape

(1723, 13)

In [71]:
regex_tweets.sample(2)

Unnamed: 0,tweet_date,tweet_id,tweet_text,lang,retweet,tweet_url,tweet_reply,hashes,user_name,screen_name,user_verified,user_lang,regex_matches
533191,Tue Jul 31 19:19:25 +0000 2018,1024373989328805889,Slb $SLB Position Has Lifted by Fulton Breakef...,en,False,True,False,[],US Index Live,usindexlive,False,en,"[$SLB, $BMY]"
263223,Tue Jul 31 14:10:04 +0000 2018,1024296138839605248,How cool $GOOGL 1203 would mark 7% decline als...,en,False,False,False,[],PrecisionTrade365,Kris_tin27,False,en,"[$GOOGL, $Aapl]"


In [70]:
regex_tweets['tweet_text'].loc[548733]

'Aye bruh 10 dollars is 10 dollars. $DeShawnBloomfield'
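
The tweet above is a false positive: the pattern accepts any run of letters after "$". One possible tightening, shown here as a heuristic rather than a definitive rule, assumes that a ticker has between one and six letters and is not attached to other word characters. LOOSE and STRICT are hypothetical names.

```python
import re

# The pattern used in this notebook, without the capture group
LOOSE = re.compile(r"\$[A-Za-z]+")

# Assumption: a ticker is 1-6 letters and is not glued to other word characters
STRICT = re.compile(r"(?<![A-Za-z0-9_])\$[A-Za-z]{1,6}(?![A-Za-z0-9_])")

examples = [
    "Aye bruh 10 dollars is 10 dollars. $DeShawnBloomfield",
    "How cool $GOOGL 1203 would mark 7% decline, watching $Aapl too",
]

for text in examples:
    print("loose: ", LOOSE.findall(text))
    print("strict:", STRICT.findall(text))
```

The stricter pattern drops the long name but keeps $GOOGL and $Aapl. It would still accept short ordinary words such as "$cash", so later cleaning is needed either way.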

# Use regex to filter each day's tweets and save to new folder

In [78]:
# function to only retrieve the tweets that contain (or likely contain) a cashtag
def regex_filter_for_cashtag(input_file, input_path='twitter_stream_csvs/', 
                             output_path='finance_twitter_stream_csvs/finance_'):
    
    # load file and create the pattern, which searches for a cashtag
    df = pd.read_csv(input_path + input_file + '.csv')
    pattern = re.compile(r'(\$[A-Za-z]+)')

    # find all regex matches and return them in a list
    matches = []
    for i in range(len(df['tweet_text'])):
        matches.append(re.findall(pattern, df['tweet_text'][i]))
    
    # add matches to the DataFrame
    df['regex_matches'] = matches

    # only keep tweets where there was a regex match
    regex_tweets = df[df.regex_matches.apply(len) > 0]

    # save the new df
    regex_tweets.to_csv(output_path + input_file + '.csv')
    print('File saved!')

In [79]:
regex_filter_for_cashtag(input_file='july_three')

File saved!


In [80]:
files_to_process = ['july_eighteen', 'july_eleven', 'july_fifteen', 'july_fourteen', 'july_nineteen', 'july_seventeen',
                    'july_sixteen', 'july_thirteen', 'july_thirty', 'july_thirtyone', 'july_twelve', 'july_twenty',
                    'july_twentyeight', 'july_twentyfive', 'july_twentyfour', 'july_twentynine', 'july_twentyone',
                    'july_twentyseven', 'july_twentysix', 'july_twentythree', 'july_twentytwo']

In [81]:
for i in files_to_process:
    regex_filter_for_cashtag(input_file=i)

File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!
File saved!


In [82]:
regex_filter_for_cashtag(input_file='twitter_stream_with_name', input_path='')

File saved!


In [86]:
df.columns

Index(['Unnamed: 0', 'tweet_date', 'tweet_id', 'tweet_text', 'lang', 'retweet',
       'tweet_url', 'tweet_reply', 'hashes', 'user_name', 'screen_name',
       'user_verified', 'user_lang', 'regex_matches'],
      dtype='object')

# Combine each day's file into one file

In [103]:
# find all .csv files in the current working directory
# (uncomment os.chdir to switch to the folder with the filtered files first)
extension = 'csv'
# os.chdir('finance_twitter_stream_csvs')
all_filenames = glob.glob('*.{}'.format(extension))

In [104]:
# combine all the files into one
combined_csv = pd.concat([pd.read_csv(f) for f in all_filenames])
combined_csv.to_csv('july_finance_tweets.csv', index=False)

In [105]:
combined_csv.head()

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,tweet_date,tweet_id,tweet_text,lang,retweet,tweet_url,tweet_reply,hashes,user_name,screen_name,user_verified,user_lang,regex_matches
0,239,239,Wed Jul 18 06:30:20 +0000 2018,1019469400934703104,RT @tradingroomapp: #Bitcoin what next?\n\n$BT...,en,False,False,False,"[{'text': 'Bitcoin', 'indices': [20, 28]}]",Coin [shreds every coin] Shredder,ShredderSupport,False,en,['$BTC']
1,1316,1316,Wed Jul 18 06:32:16 +0000 2018,1019469887457103872,Fidus Investment Corp $FDUS Receives Consensus...,en,False,True,False,[],Ticker Report,TickerReport,False,en,['$FDUS']
2,1340,1340,Wed Jul 18 06:32:19 +0000 2018,1019469900061052928,The Coca-Cola $KO to Release Quarterly Earning...,en,False,True,False,[],Ticker Report,TickerReport,False,en,['$KO']
3,1350,1350,Wed Jul 18 06:32:20 +0000 2018,1019469904217542656,Home Bancshares Inc Forecasted to Post Q3 2018...,en,False,True,False,"[{'text': 'markets', 'indices': [105, 113]}]",IRA Market Report,IRAMarketReport,False,en,['$HOMB']
4,1616,1616,Wed Jul 18 06:32:53 +0000 2018,1019470042650546179,Intec Pharma Ltd $NTEC Given Consensus Rating ...,en,False,True,False,[],Macon Daily,macondailynews,False,en,['$NTEC']


In [106]:
combined_csv.shape

(39385, 15)
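
The "Unnamed" columns in the combined file are old row indexes that were written to disk each time a DataFrame was saved and then read back as ordinary columns. A minimal sketch of one way to drop them while combining, using two tiny in-memory files as stand-ins for the real ones; read_without_index_columns is a hypothetical helper.

```python
import io

import pandas as pd


def read_without_index_columns(source):
    """Read a CSV and drop the 'Unnamed: N' columns left behind by saved indexes."""
    frame = pd.read_csv(source)
    leftover = [col for col in frame.columns if col.startswith("Unnamed:")]
    return frame.drop(columns=leftover)


# Hypothetical stand-ins for two of the per-day finance files
day_one = io.StringIO(",Unnamed: 0,tweet_id,tweet_text\n0,239,1,$BTC up\n")
day_two = io.StringIO(",Unnamed: 0,tweet_id,tweet_text\n0,17,2,$KO earnings\n")

combined = pd.concat(
    [read_without_index_columns(f) for f in (day_one, day_two)],
    ignore_index=True,  # renumber rows 0..n-1 across all files
)
print(combined.columns.tolist())
```

Passing index=False to to_csv() when each file is first saved would prevent these columns from appearing at all.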

I end with a DataFrame of nearly 40,000 tweets. It still needs to be cleaned and processed to remove tweets that are not actually finance-related and to prepare the text for use by a machine learning algorithm.