In [29]:
#!/usr/bin/env python3
import os
import re
from pathlib import Path

import pandas as pd
import praw
import yfinance as yf
from tqdm.notebook import tqdm

# Options

In [30]:
# --- Paths -----------------------------------------------------------------
# Everything is resolved relative to the directory the notebook is run from.
root = Path.cwd()
compressed_data_root = root / 'posts'          # searched recursively for *.zst archives
reddit_csv_path = root.parent / '10y_reddit_data.csv'
stock_csv_path = root.parent / '10y_stock_data.csv'

# --- Scope -----------------------------------------------------------------
# Number of years of history requested for both Reddit posts and stock prices.
years = 10

subreddits = ["stocks", "StockMarket", "investing", "wallstreetbets", "options", "trading"]

# Stock label -> search aliases used both as API queries and as archive keywords.
stock_dict = {
    'nvidia': ['nvda', 'nvidia'],
    'tesla': ['tsla', 'tesla'],
    'apple': ['aapl', 'apple'],
    'amazon': ['amzn', 'amazon'],
    'microsoft': ['msft', 'microsoft'],
    'google': ['googl', 'google', 'alphabet']
}

tickers = ['nvda', 'tsla', 'aapl', 'amzn', 'msft', 'googl']

# --- Reddit API client -----------------------------------------------------
# Credentials are read from the environment so they never end up in the saved
# notebook or in version control. A missing variable raises KeyError naming it.
# NOTE(review): the client id/secret that used to be hardcoded here were
# exposed in the notebook source and should be revoked and regenerated.
praw_api = praw.Reddit(
    client_id=os.environ["REDDIT_CLIENT_ID"],
    client_secret=os.environ["REDDIT_CLIENT_SECRET"],
    user_agent=os.environ.get(
        "REDDIT_USER_AGENT",
        "TeslaScraper:v1.0 (by u/RecognitionSame5433)",
    ),
)

# Reddit data fields
- id
- created_utc
- timestamp
- author
- title
- selftext
- score
- num_comments
- query (only for api data)
- stock
- subreddit
- source (api or archive)

# Stock data fields per ticker
- Open
- High
- Low
- Close
- Volume
- Open_pct
- High_pct
- Low_pct
- Close_pct
- Revenue
- Earnings
- Revenue_pct
- Earnings_pct

In [None]:
class RedditData:
    """Collects Reddit submissions that mention the configured stocks.

    Rows come from two sources and are accumulated in ``self.df``:

    * ``load_compressed_ndjson`` - zstd-compressed NDJSON archive files,
      filtered to the requested time window and to posts whose title or
      selftext contains one of the stock aliases (``source='archive'``).
    * ``load_api_data`` - live subreddit searches through the API client
      (``source='api'``).

    A submission is identified by the pair ``(id, stock)``, so one post that
    mentions two stocks yields two rows.
    """

    def __init__(self, years, subreddits, stock_dict, praw_api):
        """Store the configuration; no data is fetched here.

        Args:
            years: Length of the look-back window, in years of 365 days.
            subreddits: Subreddit names to search through the API.
            stock_dict: Mapping of stock label -> list of search aliases.
            praw_api: Client exposing ``subreddit(name).search(...)``
                (e.g. a ``praw.Reddit`` instance).
        """
        self.subreddits = subreddits
        self.stock_dict = stock_dict
        self.api = praw_api
        self.timedelta = pd.Timedelta(days=365 * years)
        self.df = pd.DataFrame()

    def search_subreddit(self, subreddit_name: str, query: str):
        """Search one subreddit for ``query`` and return the hits as a frame.

        The search is sorted by newest, limited to the past year and capped at
        1000 results. An empty search returns a DataFrame with no columns.
        """
        subreddit = self.api.subreddit(subreddit_name)
        submissions = subreddit.search(query, sort='new', time_filter='year', limit=1000)
        rows = [
            {
                "id": s.id,
                "created_utc": s.created_utc,
                "timestamp": pd.to_datetime(s.created_utc, unit='s', utc=True),
                # Deleted accounts come back as a falsy author.
                "author": str(s.author) if s.author else None,
                "title": s.title,
                "selftext": s.selftext,
                "score": s.score,
                "num_comments": s.num_comments,
            }
            for s in submissions
        ]
        return pd.DataFrame(rows)

    def load_api_data(self):
        """Run every (alias, subreddit) search and merge the hits into ``self.df``.

        Each hit is tagged with the ``query`` that found it, the ``stock`` the
        alias belongs to and the ``subreddit`` searched.

        Raises:
            RuntimeError: If none of the searches returned a submission.
            ValueError: Propagated from ``add_data`` when data is already
                loaded and the API rows do not overlap it.
        """
        # Flatten the three nested loops so a single progress bar can report
        # an accurate total.
        searches = [
            (stock, query, sub)
            for stock, query_list in self.stock_dict.items()
            for query in query_list
            for sub in self.subreddits
        ]
        df_list = []
        for stock, query, sub in tqdm(searches, desc='Searching Reddit', leave=False):
            df = self.search_subreddit(sub, query)
            if df.empty:
                # Column-less empty frames would only pollute the concat below.
                continue
            df['query'] = query
            df['stock'] = stock
            df['subreddit'] = sub
            df_list.append(df)
        print(f'{len(df_list)} out of {len(searches)} queries had hits')
        if not df_list:
            raise RuntimeError('API search returned no submissions')
        new_df = pd.concat(df_list, ignore_index=True)
        self.add_data(new_df, 'api')
        print('API data loaded')

    def load_compressed_ndjson(self, root: Path):
        """Parse every ``*.zst`` file under ``root`` and merge it into ``self.df``.

        The subreddit name is taken from the file name with any
        ``_submissions`` suffix removed. Files are read in chunks; each chunk
        is reduced to the Reddit columns, restricted to the time window and
        keyword-filtered before being kept.

        Raises:
            RuntimeError: If a file yields no chunks, or no file is found.
        """
        df_list = []
        # Sorted so the processing order does not depend on the filesystem.
        for file_path in sorted(root.rglob('*.zst')):
            print(f'Extracting {file_path.name}')
            subreddit = file_path.stem.removesuffix('_submissions')
            df_chunks = []
            # The reader holds an open file handle; the context manager closes
            # it even if processing a chunk raises.
            with pd.read_json(
                file_path,
                lines=True,
                compression='zstd',
                chunksize=2**16
            ) as chunk_iter:
                for df_chunk in tqdm(chunk_iter, desc='Processing file'):
                    df_chunk = self.restrict_columns_for(df_chunk)
                    df_chunk['timestamp'] = pd.to_datetime(df_chunk['created_utc'], unit='s', utc=True)
                    df_chunk = self.restrict_time_for(df_chunk)
                    df_chunk = self.keyword_filter_for(df_chunk)
                    df_chunks.append(df_chunk)
            if not df_chunks:
                raise RuntimeError('File yielded no rows')
            df = pd.concat(df_chunks, ignore_index=True)
            df = self.drop_duplicates_for(df)
            df['subreddit'] = subreddit
            df_list.append(df)
        if not df_list:
            raise RuntimeError('Failed to load any data from files')
        new_df = pd.concat(df_list, ignore_index=True)
        self.add_data(new_df, 'archive')
        print('Compressed data loaded')

    def add_data(self, df, source):
        """Tag ``df`` with ``source`` and merge it into ``self.df``.

        The first frame added is stored as-is (after de-duplication). Later
        frames must share at least one ``(id, stock)`` pair with the data
        already held; the shared rows are then dropped, keeping the copy that
        was loaded first.

        Raises:
            ValueError: If data is already loaded and ``df`` shares no
                ``(id, stock)`` pair with it.
        """
        df = self.drop_duplicates_for(df).assign(source=source)
        if self.df.empty:
            self.df = df
            return
        combined = pd.concat([self.df, df], ignore_index=True)
        # Both inputs are already unique, so every duplicate is a cross-source hit.
        overlap = combined.duplicated(subset=['id', 'stock']).sum()
        if overlap == 0:
            raise ValueError('Time ranges do not overlap')
        print(f'Dataframe overlap of {overlap} rows')
        self.df = self.drop_duplicates_for(combined)

    def keyword_filter_for(self, df):
        """Keep rows whose title or selftext contains a stock alias.

        Matching is a case-insensitive substring search, so an alias also
        matches inside longer words. A row matching several stocks is
        returned once per stock, with the label in a new ``stock`` column.
        """
        keyword_cols = ['title', 'selftext']
        # Stringify once instead of once per stock; missing values become text
        # that none of the aliases match.
        text_df = df[keyword_cols].astype(str)
        df_list = []
        for stock, query_list in self.stock_dict.items():
            pattern = '|'.join(re.escape(q) for q in query_list)
            mask = pd.Series(False, index=df.index)
            for col in keyword_cols:
                mask |= text_df[col].str.contains(pattern, case=False, na=False)
            df_part = df[mask].copy()
            df_part['stock'] = stock
            df_list.append(df_part)
        if df_list:
            new_df = pd.concat(df_list, ignore_index=True)
            new_df = self.drop_duplicates_for(new_df)
        else:
            # No stocks configured: return an empty frame with df's columns.
            new_df = df.iloc[0:0].copy()
        return new_df

    def restrict_columns_for(self, df):
        """Return a copy of ``df`` holding only the Reddit columns that are kept.

        Raises ``KeyError`` if any of those columns is missing from ``df``.
        """
        reddit_columns = ['id', 'created_utc', 'author', 'title', 'selftext', 'score', 'num_comments']
        return df[reddit_columns].copy()

    def restrict_time_for(self, df):
        """Return the rows of ``df`` whose ``timestamp`` lies inside the window.

        The window starts ``self.timedelta`` before the current UTC time, so
        ``timestamp`` must be timezone-aware.
        """
        # Timestamp.utcnow() is deprecated; this is the equivalent aware "now".
        start_timestamp = pd.Timestamp.now(tz='UTC') - self.timedelta
        return df[df.timestamp >= start_timestamp].copy()

    # Remove duplicate (id, stock) rows, keeping the first occurrence
    def drop_duplicates_for(self, df):
        """Return ``df`` without repeated ``(id, stock)`` pairs (first one wins)."""
        return df.drop_duplicates(subset=['id', 'stock'], inplace=False)


class StockData:
    """Price history for a set of tickers, held in ``self.df``.

    The frame is whatever ``yf.download`` returns for the requested tickers;
    the methods below reshape it in place on the instance.
    """

    def __init__(self, tickers, years, interval='1d'):
        """Download ``years`` years of auto-adjusted prices for ``tickers``.

        Raises:
            ValueError: If the download returns ``None`` or an empty frame.
        """
        self.tickers = tickers
        df = yf.download(tickers, period=f'{years}y', interval=interval, auto_adjust=True)
        # A failed download can come back as an empty frame rather than None.
        if df is None or df.empty:
            raise ValueError('Data failed to download')
        self.df = df

    # Forward fill OHLC and zero Volume
    def impute_off_days(self):
        """Reindex to one row per calendar day between the first and last date.

        Days missing from the download get ``Volume`` 0 and every other column
        forward-filled from the previous available row.
        """
        full_idx = pd.date_range(self.df.index.min(), self.df.index.max(), freq='D')
        daily = self.df.reindex(full_idx)
        # Zero the volume first so the forward fill below leaves it untouched.
        daily['Volume'] = daily['Volume'].fillna(0)
        self.df = daily.ffill()

    # Create percent change columns for prices
    def create_pct_columns(self):
        """Add ``<field>_pct`` columns with the row-over-row percent change.

        Covers ``Open``, ``High``, ``Low`` and ``Close``. Works with both
        MultiIndex (field, ticker) columns and flat field columns. Calling it
        again recomputes the ``_pct`` columns instead of duplicating them.
        """
        price_fields = ['Open', 'High', 'Low', 'Close']
        pct_fields = [f'{field}_pct' for field in price_fields]

        # Drop percent columns left over from an earlier call (cell re-run).
        stale = self.df.columns.get_level_values(0).isin(pct_fields)
        base_df = self.df.loc[:, ~stale]

        # Explicit ffill + fill_method=None reproduces the historical default
        # (pad before differencing) without relying on the deprecated implicit fill.
        pct_df = base_df[price_fields].ffill().pct_change(fill_method=None)

        def add_suffix(field):
            return f'{field}_pct'

        if isinstance(pct_df.columns, pd.MultiIndex):
            # Only the field level is renamed; ticker level and level names stay.
            pct_df = pct_df.rename(columns=add_suffix, level=0)
        else:
            pct_df = pct_df.rename(columns=add_suffix)

        self.df = pd.concat([base_df, pct_df], axis=1).sort_index(axis=1)


# Create Reddit data

In [35]:
# Build the collector from the config cell; the constructor only stores its
# arguments, nothing is fetched until one of the load_* methods is called.
reddit_data = RedditData(years, subreddits, stock_dict, praw_api)

In [None]:
# Parse every *.zst archive found under compressed_data_root and store the
# keyword-matching rows in reddit_data.df tagged source='archive'.
reddit_data.load_compressed_ndjson(compressed_data_root)

In [None]:
# Search each subreddit for each stock alias through the Reddit API and merge
# the hits into reddit_data.df tagged source='api'. When data is already
# loaded, add_data raises ValueError if the new rows share no (id, stock) pair with it.
reddit_data.load_api_data()

In [42]:
# Preview the first rows of the accumulated Reddit frame.
reddit_data.df.head()

Unnamed: 0,id,created_utc,author,title,selftext,score,num_comments,timestamp,stock,subreddit,query
0,46cgr4,1455762000.0,Oranguthang,First option trade: 10 NVDA @28 3/4 expiry. No...,"I paid for the extra week, when does time real...",1,13,2016-02-18 02:21:58+00:00,nvidia,options,
1,4iamkx,1462639000.0,RTiger,earnings week 5/9+,Earnings season is winding down.\n\nA few nota...,6,7,2016-05-07 16:29:19+00:00,nvidia,options,
2,5cfsb4,1478886000.0,irishtrader,NVDA - Huge option move discrepancies - reason?,,3,5,2016-11-11 17:45:52+00:00,nvidia,options,
3,5dzin9,1479669000.0,DeadL0cked,What am I missing here? This is a vertical put...,,3,13,2016-11-20 19:03:25+00:00,nvidia,options,
4,5jed7p,1482254000.0,jstubb,Help explaining a Call Spread,"Ok, I just came across something I haven't see...",6,5,2016-12-20 17:06:45+00:00,nvidia,options,


In [None]:
# Sanity-check the accumulated frame, then persist it to reddit_csv_path.
# index=True also writes the row index as the first CSV column.
assert isinstance(reddit_data.df, pd.DataFrame)
reddit_data.df.to_csv(reddit_csv_path, encoding='utf-8', index=True, header=True, sep=',')

# Create stock data

In [19]:
# Download `years` years of auto-adjusted prices for all tickers via yfinance
# (StockData defaults to interval='1d').
stock_data = StockData(tickers, years)

[*********************100%***********************]  6 of 6 completed


In [None]:
# Reindex to one row per calendar day: days missing from the download get
# Volume 0 and forward-filled values in every other column.
stock_data.impute_off_days()

In [None]:
# Append <field>_pct columns holding the row-over-row percent change of
# Open, High, Low and Close.
stock_data.create_pct_columns()

In [None]:
# Persist the stock frame, including its index, to stock_csv_path.
stock_data.df.to_csv(stock_csv_path, encoding='utf-8', index=True, header=True, sep=',')