## Pipeline Level 1: Data Acquisition ##
This code downloads 500,000 comments from each of 16 different meme stock related subreddits.

Uses Pushshift.io published Python library for API access. The API breaks all requests into separate 1000 comment request chunks. There are many duplicates returned in each search sometimes as high as 20%. This must be due to the distributed nature of the database the API is pulling from. To de-duplicate efficiently, comments are hashed with a SHA-256 algorithm in order to deduplicate.

Comments for each stock ticker are stored in separate CSV files to be used in Pipeline Level 2.

The process of downloading all the comments took around 14 hours.

https://medium.com/swlh/how-to-scrape-large-amounts-of-reddit-data-using-pushshift-1d33bde9286

In [1]:
import os
import sys
# //*** Imports and Load Data
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import hashlib
import platform
import time
from pmaw import PushshiftAPI


#//*** Use the whole window in the IPYNB editor
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

#//*** Maximize columns and rows displayed by pandas
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)

api = PushshiftAPI()

import datetime as dt
before = int(dt.datetime(2021,6,26,0,0).timestamp())
after = int(dt.datetime(2020,12,1,0,0).timestamp())

print(int(time.time()))

subreddits = ["wallstreetbets", "stocks", "wallstreetbetsOGs", "spacs", "investing", "pennystocks", "stockmarket", "options", "robinhoodpennystocks", "wallstreetbetsnew", "smallstreetbets"]
subreddits = ["stocks", "wallstreetbetsOGs", "spacs", "investing", "pennystocks", "stockmarket", "options", "robinhoodpennystocks", "wallstreetbetsnew", "smallstreetbets"]

  from IPython.core.display import display, HTML


1666852281


In [2]:
from pmaw import PushshiftAPI

api = PushshiftAPI()

comments = api.search_comments(subreddit=subreddits, limit=100, before=before)

Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


In [4]:
#//*** Update a target subreddit with new data
#//*** subreddit = subbreddit name
#//*** Method = 'before' or 'after' indicating if the records to be retrieved are before or after the target_utc. Defaults to After
#//*** limit is the number or records to grab
def update_subreddit(subreddit,method,limit):
    import time
    filename = f"..\\data\\{subreddit}_comments.csv.zip"

    #//*** Convert Path to Mac formatting if needed
    if platform.system() == 'Darwin':
        filename = filename.replace("\\","/")
    
    #//*** Check if File exists
    if os.path.isfile(filename):
        print ("Update Existing File")

        print(f"Reading csv: {filename}")
        start_time = time.time()
        update_df = pd.read_csv(filename, compression = 'zip')

        print(f"csv loaded: {round(time.time()-start_time,2)}s")

        print(f"csv Record count: {len(update_df)}")

        #//*** If before, get the largest (latest) utc
        if method == 'before':
            #//*** Get the Before utc from the stored csv
            before = update_df['created_utc'].min()

            print(f"Getting {limit} records before {before} utc")
            start_time = time.time()

            #//*** Download comments
            comments = api.search_comments(subreddit=subreddit, limit=limit, before=before)

            print(f"Download Time: {round(time.time()-start_time,2)}s")

        elif method == 'after':
            after = update_df['created_utc'].max() 

            print(f"Getting {limit} records after {after} utc")
            start_time = time.time()

            #//*** Download comments
            comments = api.search_comments(subreddit=subreddit, limit=limit, after=after)
            print(f"Download Time: {round(time.time()-start_time,2)}s")

        else:
            print(f"Method needs to be 'before' or 'after': [{method}] is invalid")
            return
    else:
        #//*** This is a new file, download 100k records starting right now
        after = int(time.time())
        before = int(time.time())
        
        limit = 1000000
        limit = 500000
        
        print(f"Getting {limit} records before {after} utc")
        start_time = time.time()
        
        update_df = pd.DataFrame()

        #//*** Download comments
        comments = api.search_comments(subreddit=subreddit, limit=100, before=before)

        print(f"Download Time: {round(time.time()-start_time,2)}s")
    

    
    #//***************************************************************************
    #//*** Download Complete
    #//***************************************************************************
    
    #//*** Convert comments to Dataframe
    raw_df = pd.DataFrame(comments)

    if len(raw_df) == 0:
        
        print(f"{subreddit} {method} {limit} - No Records Returned")
        return
    
    #//*** Columns to keep
    keep_cols = ["score","total_awards_received","created_utc","is_submitter","author_fullname","body","id","link_id","parent_id","stickied","permalink","retrieved_on","subreddit","subreddit_id"]
    
    #//*** Not all columns appear. This usually happens with small samples used for testing.
    #//*** Only use the keep_cols that are actually in the sample. The missing columns will be added during concat later
    actual_cols = []
    
    #//*** Loop through each column we want to keep
    for col in keep_cols:
        #//*** Add col to actual_cols if it exists
        if col in raw_df.columns:
            actual_cols.append(col)

    #//*** Keep the important columns
    raw_df = raw_df[actual_cols]
    
    print(f"Checking For Duplicates - Length Before: {len(raw_df)}")
    
    #//*** Hash the body, will use to check for duplicates
    #//*** Hash the body using sha-256
    #Sha256: Reference https://www.pythonpool.com/python-sha256/

    raw_df['hash'] = raw_df['body'].apply(lambda x:hashlib.sha256(x.encode()).hexdigest())


    # dropping Duplicates First. No sense in processing these
    raw_df.drop_duplicates(subset ="hash",keep = False, inplace = True)
    
    print(f"Checking For Duplicates - Length After: {len(raw_df)}")

    #print("Begin Cleaning")

    #//*** Clean text, tokenize and remove stop words
    #raw_df['clean'] = remove_stop_words(tokenize_series(mr_clean_text(raw_df['body'],{"remove_empty":False})))
    
    #//*** encode the comments
    #//*** Breaking this out into a separate function for readability and possible future flexibility
    #raw_df = encode_comments(raw_df)
    
    #//*** Combining existing dataframe with raw_df
    update_df = pd.concat([update_df,raw_df])
    print(f"Combined Dataframe Size:{len(update_df)}")

    # Check for Duplicates. 
    update_df.drop_duplicates(subset ="hash",keep = False, inplace = True)
    print(f"Dropping Duplicates - New Size:{len(update_df)}")

    #print("Replace NaN with Zeros")
    update_df = update_df.fillna(0)
    
    #//*** Sort the Dataframe by UTC date. This keeps the time series chronological. 
    #//*** No need to reindex, since index will be ignored at csv read/write
    update_df = update_df.sort_values('created_utc')

    #//*** Convert is Submitter,Stickied field to Boolean.
    #//*** Some early values are Integers and Strings
    update_df['is_submitter'] = update_df['is_submitter'].astype('bool')
    update_df['stickied'] = update_df['stickied'].astype('bool')
    update_df['author_fullname'] = update_df['author_fullname'].astype('str')
    
    print(f"Writing {filename}")

    start_time = time.time()
    
    
    update_df.to_csv(filename,compression="zip",index=False)    
    
    print(f"File Written: {round(time.time()-start_time,2)}s")
    
    print(f"update_subreddit() Complete: {len(update_df)} records")
    
    del update_df
    del raw_df
    





In [6]:
#//*** Add 100k Comments to wallstreetbets
#update_subreddit("wallstreetbets","after",100)

#update_subreddit("stocks","after",100)

for subreddit in subreddits:
    print(f"Updating: {subreddit}")
    try:
        update_subreddit(subreddit,"before",100)
    except TypeError:
        pass

print("done")


Updating: stocks
Update Existing File
Reading csv: ../data/stocks_comments.csv.zip


  update_df = pd.read_csv(filename, compression = 'zip')


csv loaded: 2.41s
csv Record count: 466922
Updating: wallstreetbetsOGs
Update Existing File
Reading csv: ../data/wallstreetbetsOGs_comments.csv.zip
csv loaded: 1.95s
csv Record count: 451021
Getting 100 records before 1611978410 utc


Not all PushShift shards are active. Query results may be incomplete.


Download Time: 0.26s
wallstreetbetsOGs before 100 - No Records Returned
Updating: spacs
Update Existing File
Reading csv: ../data/spacs_comments.csv.zip
csv loaded: 2.13s
csv Record count: 449120
Getting 100 records before 1590178346 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 9.85s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 100
Combined Dataframe Size:449220
Dropping Duplicates - New Size:449220
Writing ../data/spacs_comments.csv.zip
File Written: 6.94s
update_subreddit() Complete: 449220 records
Updating: investing
Update Existing File
Reading csv: ../data/investing_comments.csv.zip
csv loaded: 2.38s
csv Record count: 421647
Getting 100 records before 1565372215 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 2.16s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 98
Combined Dataframe Size:421745
Dropping Duplicates - New Size:421739
Writing ../data/investing_comments.csv.zip
File Written: 8.6s
update_subreddit() Complete: 421739 records
Updating: pennystocks
Update Existing File
Reading csv: ../data/pennystocks_comments.csv.zip
csv loaded: 1.84s
csv Record count: 412154
Getting 100 records before 1593804308 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 1.32s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 96
Combined Dataframe Size:412250
Dropping Duplicates - New Size:412248
Writing ../data/pennystocks_comments.csv.zip
File Written: 6.32s
update_subreddit() Complete: 412248 records
Updating: stockmarket
Update Existing File
Reading csv: ../data/stockmarket_comments.csv.zip
csv loaded: 2.22s
csv Record count: 456443
Getting 100 records before 1530834525 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 1.12s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 92
Combined Dataframe Size:456535
Dropping Duplicates - New Size:456535
Writing ../data/stockmarket_comments.csv.zip
File Written: 7.87s
update_subreddit() Complete: 456535 records
Updating: options
Update Existing File
Reading csv: ../data/options_comments.csv.zip
csv loaded: 2.36s
csv Record count: 451565
Getting 100 records before 1562274981 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 1.21s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 100
Combined Dataframe Size:451665
Dropping Duplicates - New Size:451665
Writing ../data/options_comments.csv.zip
File Written: 8.62s
update_subreddit() Complete: 451665 records
Updating: robinhoodpennystocks
Update Existing File
Reading csv: ../data/robinhoodpennystocks_comments.csv.zip
csv loaded: 1.84s
csv Record count: 428037
Getting 100 records before 1531333645 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 7.31s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 98
Combined Dataframe Size:428135
Dropping Duplicates - New Size:428135
Writing ../data/robinhoodpennystocks_comments.csv.zip
File Written: 6.11s
update_subreddit() Complete: 428135 records
Updating: wallstreetbetsnew
Update Existing File
Reading csv: ../data/wallstreetbetsnew_comments.csv.zip
csv loaded: 1.56s
csv Record count: 381419
Getting 100 records before 1594481539 utc


Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


Download Time: 8.85s
Checking For Duplicates - Length Before: 100
Checking For Duplicates - Length After: 98
Combined Dataframe Size:381517
Dropping Duplicates - New Size:381515
Writing ../data/wallstreetbetsnew_comments.csv.zip
File Written: 5.1s
update_subreddit() Complete: 381515 records
Updating: smallstreetbets
Update Existing File
Reading csv: ../data/smallstreetbets_comments.csv.zip


Not all PushShift shards are active. Query results may be incomplete.


csv loaded: 0.64s
csv Record count: 138856
Getting 100 records before 1575514800 utc
Download Time: 0.19s
smallstreetbets before 100 - No Records Returned
done


In [None]:
#print(len(raw_df['hash'].unique()))
#print(len(raw_df.tail()))
#raw_df

In [14]:
cd nlp_trader

/Users/muduo/GitHub/nlp_trader


In [21]:
# //*** Reference to manually read & write to Data Frame
# filename = f"data/wallstreetbets_comments.csv.zip"
# update_df = pd.read_csv(filename, compression = 'zip')
# print(len(update_df))
update_df[0:1]
# update_df
# filename = f".\\data\\wallstreetbets_comments_comments.csv"
# update_df.to_csv(filename, compression = 'zip',index=False)    

Unnamed: 0,score,total_awards_received,created_utc,is_submitter,author_fullname,body,id,link_id,parent_id,stickied,permalink,retrieved_on,subreddit,subreddit_id,hash
0,2,0.0,1334162803,False,t2_59t5b,This is a fantastic idea! I'll toss mine up in...,c4b0pvu,t3_s4jw1,t3_s4jw1,False,0,1428700000.0,wallstreetbets,t5_2th52,6827bc9e2385d87ecf7e53c54baab15186a20b47d0dde0...


In [None]:
#raw_df.to_csv("reddit_comments.csv.zip",compression="zip",index=False)

In [None]:
# #//*** Push shift scraper reference
# """
# #//*** Download the First 100,000 Comments from reddit pushsift
# subreddit="wallstreetbets"
# limit=100
# #comments = api.search_comments(subreddit=subreddit, limit=limit, before=before after=after)
# comments = api.search_comments(subreddit=subreddit, limit=limit, after=after)
# print(f'Retrieved {len(comments)} comments from Pushshift')

# #//*** Convert comments to Dataframe
# comments_df = pd.DataFrame(comments)

# #//*** Save DataFrame to a file for processing
# comments_df.to_csv(f"{subreddit}_raw_comments.csv.zip",compression="zip",index=False)
# """
# print()

In [None]:
#//*** Reference to create Stock ticker count matrix
"""

#//*** Build list of ticker symbols from NYSE and NASDAQ
#//*** Reads from Excel file.
#//*** Gets the Symbol column, and converts to lower case, 
nyse = pd.read_csv("NYSE_20210625.csv",header=None)[0].str.lower()
nasdaq = pd.read_csv("NASDAQ_20210625.csv",header=None)[0].str.lower()

#//*** Removes symbols with 1 and 2 character listings
nyse = list(nyse[nyse.apply(lambda x: len(x)>2) ])
nasdaq = list(nasdaq[nasdaq.apply(lambda x: len(x)>2) ])

symbols = nyse + nasdaq

#//*** Count each Stock mention add it to a dictionary of lists. Each list is filled with 0s. The Specific row index is updated with the relevant count. 
#//*** This Generates a word count matrix
stock_dict = {}

#//*** Keep Track of Rows
index = 0

for row in raw_df.iterrows():
    
    #//*** Get the cleaned body text
    body = row[1]['clean']
    
    #//*** For Each Stock Symbol
    for stock in symbols:
        
        #//*** Check if Stock exists in Body
        if stock in body:
            
            #//*** Reset the stock counter
            count = 0
            
            #//*** Loop through body and county ticker mentions
            for word in body:
                #//*** If word found increment count
                if stock == word:
                    count += 1
                    
            #//*** Check if symbol is in stock_dict
            if stock not in stock_dict.keys():    

                #//*** If not, then build it
                stock_dict[stock] = np.zeros(len(raw_df))
            
            #//*** Update the stock value at the 
            stock_dict[stock][index] = count

    #//*** Increment Index to keep with row index
    index +=1   

#//*** Loop through the dictionary key and lists
for col,values in stock_dict.items():
    
    #//*** Add each key (which is a stock ticker symbol) as a column using the list of ticker counts for Data
    raw_df[col] = values.astype('int')
    
"""
print()