# Reddit Data Ingestion

The following notebook demonstrates the Reddit social data ingestion pipeline. When deployed, it will:
- Run continuously, retrieving social data hourly.
- Search targeted subreddits for posts and comments that mention the top *n* cryptocurrencies.
- Store the data in a persistent layer.
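The hourly schedule itself is not part of this notebook. A minimal sketch of such a loop, assuming a single long-running process (a cron job or hosted scheduler would work just as well), is shown below. `run_periodically` and its parameters are hypothetical names; `sleep` and `clock` are injectable only so the loop can be exercised without waiting an hour.

```python
import logging
import time
from typing import Callable, Optional

log = logging.getLogger("reddit_scheduler")  # hypothetical logger name


def run_periodically(
    job: Callable[[], None],
    interval_seconds: float = 3600.0,
    max_runs: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Run `job` once per interval (forever if max_runs is None); return the number of runs."""
    runs = 0
    next_start = clock()
    while True:
        try:
            job()
        except Exception:
            # A failed run is logged; the loop keeps going and tries again next interval
            log.exception("Scheduled Reddit fetch failed")
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return runs
        # Sleep only for what is left of the interval, so the schedule does not
        # drift by the job's own runtime
        next_start += interval_seconds
        sleep(max(0.0, next_start - clock()))
```

In a deployed setting this might be called as `run_periodically(RedditDataFetcher(time_range="hour").fetch_and_store_data)`, pairing the hourly interval with PRAW's `"hour"` search time filter.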

### Differences from the actual implementation:
- In the deployed version, the `RedditDataFetcher` class writes data directly to either a local PostgreSQL database or a NeonDB server; this notebook writes its results to .csv files instead.
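A minimal sketch of a batched database write, assuming any DB-API 2.0 connection. `write_in_batches` and `batch_size` are hypothetical. Against PostgreSQL/NeonDB the query would be the class's `insert_query` with psycopg2's `%(name)s` placeholders; the usage below swaps in an in-memory SQLite table and SQLite's `:name` placeholder style purely so the sketch runs without a database server. For large batches psycopg2 also provides `psycopg2.extras.execute_batch`, which is usually faster than `executemany`.

```python
import sqlite3
from itertools import islice
from typing import Any, Iterable, Mapping


def write_in_batches(conn, query: str, rows: Iterable[Mapping[str, Any]], batch_size: int = 500) -> int:
    """Execute `query` for each row in batches of `batch_size`, committing once per batch.

    Returns the number of rows submitted (including rows skipped by ON CONFLICT DO NOTHING).
    """
    iterator = iter(rows)
    submitted = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return submitted
        cursor = conn.cursor()
        try:
            cursor.executemany(query, batch)
            conn.commit()
        except Exception:
            # Discard the failed batch so the connection is usable again, then re-raise
            conn.rollback()
            raise
        finally:
            cursor.close()
        submitted += len(batch)


# Usage: an in-memory SQLite table mirroring the (symbol, source_id, text) conflict key of insert_query
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE reddit_social_data (text TEXT, votes INTEGER, subreddit TEXT, symbol TEXT, "
    "timestamp TEXT, source_id INTEGER, UNIQUE (symbol, source_id, text))"
)
SQLITE_INSERT = (
    "INSERT INTO reddit_social_data (text, votes, subreddit, symbol, timestamp, source_id) "
    "VALUES (:text, :votes, :subreddit, :symbol, :timestamp, :source_id) "
    "ON CONFLICT (symbol, source_id, text) DO NOTHING"
)
row = {"text": "SOL making moves", "votes": 8, "subreddit": "CryptoCurrency",
       "symbol": "SOL", "timestamp": "2025-07-01T10:11:21+00:00", "source_id": 0}
# The second row duplicates the first, so the conflict clause skips it
submitted = write_in_batches(demo_conn, SQLITE_INSERT, [row, dict(row), {**row, "symbol": "BTC"}], batch_size=2)
```

Committing per batch keeps each transaction small, at the cost of a partially written run if a later batch fails.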

In [7]:
import os
import logging
import praw
from dotenv import load_dotenv
from psycopg2 import DatabaseError, pool
from concurrent.futures import ThreadPoolExecutor
import re
import threading
from datetime import datetime, timezone
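import pandas as pd
# display() is a Jupyter built-in; import it explicitly so the script also runs outside a notebook
from IPython.display import display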

load_dotenv()

True

### Order of flow
1. Connect to NeonDB and fetch the latest top *n* cryptocurrencies.
2. Iterate through the fetched currencies and search for each one in a list of subreddits.
3. Run each search query in a separate thread for performance.
4. Pre-process posts and comments into data chunks containing cleaned text and relevant metadata.
5. Write to NeonDB in batches (in this example, the output is written to .csv files).
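Steps 2 and 3 can be sketched independently of PRAW. The function below is a hypothetical illustration: `search` stands in for a call such as `reddit.subreddit(name).search(query=term, time_filter=...)` and is assumed to yield dicts with an `"id"` key. Posts are deduplicated across threads through a lock-protected set, as the `RedditDataFetcher` class below does, but keyed on the post id rather than the title so that distinct posts sharing a title are both kept.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, List

Post = Dict[str, object]


def fan_out_search(
    search: Callable[[str, str], Iterable[Post]],
    subreddits: List[str],
    search_terms: List[str],
    max_workers: int = 8,
) -> List[Post]:
    """Run one search per (subreddit, term) pair in a thread pool and return unique posts."""
    seen_ids = set()
    seen_lock = threading.Lock()

    def worker(subreddit: str, term: str) -> List[Post]:
        fresh = []
        for post in search(subreddit, term):
            # Check-and-add under the lock so two threads cannot both claim the same post
            with seen_lock:
                if post["id"] in seen_ids:
                    continue
                seen_ids.add(post["id"])
            fresh.append({**post, "subreddit": subreddit, "term": term})
        return fresh

    results: List[Post] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(worker, sub, term) for sub in subreddits for term in search_terms]
        for future in futures:
            # result() re-raises any exception raised inside the worker thread
            results.extend(future.result())
    return results
```

Calling `future.result()` matters: an exception raised in a submitted task is stored on its `Future` and is never reported unless something retrieves it.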

In [None]:
class RedditDataFetcher():
    """
    Responsible for pulling Reddit social sentiment data for the top N cryptocurrencies
    and storing it in a PostgreSQL database. Supports configurable time ranges and multithreaded fetch.
    """
    
    select_top_crypto_query = "SELECT rank, symbol, name FROM crypto_asset WHERE rank <= %s"

    insert_query = """
        INSERT INTO reddit_social_data (text, votes, subreddit, symbol, timestamp, source_id)
        VALUES (
            %(text)s,
            %(votes)s,
            %(subreddit)s,
            %(symbol)s,
            %(timestamp)s,
            %(source_id)s
        )
        ON CONFLICT (symbol, source_id, text) DO NOTHING;
    """
    
    def __init__(self, num_of_top_cryptocurrencies: int = 50, time_range: str = "hour", debug=False):

        self.m_log = setup_logger(__name__, debug=debug)

        self.top_n = num_of_top_cryptocurrencies
        self.time_range = time_range
        self.subreddits = ["CryptoCurrency"]
        self.timestamp = datetime.now(timezone.utc)
        self.source_id = 0  # TODO: Update with actual value
        # Lock-protected set of submission titles, used to skip duplicates across threads
        self.submission_set = set()
        self.submission_set_lock = threading.Lock()
        self.csv_file_lock = threading.Lock()
        
        # Load reddit api credentials from .env
        self.reddit = praw.Reddit(
            client_id = os.getenv("CLIENT_ID"),
            client_secret = os.getenv("CLIENT_SECRET"),
            redirect_uri="http://localhost:8080",
            user_agent = os.getenv("USER_AGENT"),
        )

        # Multithreaded connection pool to PostgreSQL
        try:
            self.connection_pool = pool.ThreadedConnectionPool(1, 30, os.getenv("DATABASE_URL"))
            self.m_log.info("Connection pool created successfully.")
        except (Exception, DatabaseError) as error:
            self.m_log.error(f"Error while creating connection pool: {error}")
            # Re-raise instead of calling exit(), so the caller can handle the failure
            raise
        
    
    def fetch_top_cryptocurrencies(self, top_n: int) -> list[dict]:
        conn = None
        cryptocurrencies = []
        try:
            conn = self.connection_pool.getconn()
            with conn.cursor() as cursor:
                cursor.execute(self.select_top_crypto_query, (top_n,))
                cryptocurrencies = [ {"rank": c[0], "symbol": c[1], "name": c[2] } for c in cursor.fetchall()]

        except (Exception, DatabaseError) as error:
            self.m_log.error(f"Error fetching top {top_n} cryptocurrency names: {error}")
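        finally:
            if conn:
                # End the read-only transaction and hand the connection back to the pool.
                # Closing it first would defeat pooling: the pool discards closed connections.
                conn.rollback()
                self.connection_pool.putconn(conn)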

        self.m_log.info("==================================== TOP CRYPTOCURRENCIES BY MARKET CAP ====================================")
        formatted_data = pd.DataFrame(cryptocurrencies)
        display(formatted_data)
        formatted_data.to_csv("top_n_cryptos.csv", index=False)

        return cryptocurrencies


    def fetch_and_store_data(self):
        cryptocurrencies = self.fetch_top_cryptocurrencies(self.top_n)

        # Clear reddit submissions set
        with self.submission_set_lock:
            self.submission_set.clear()

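        # Stamp every record from this run with the same fetch time
        self.timestamp = datetime.now(timezone.utc)

        # Concurrently fetch social data for different cryptocurrencies across multiple subreddits
        with ThreadPoolExecutor(max_workers=20) as executor:
            futures = [
                executor.submit(self.fetch_and_store_data_for_currency, thread_id, currency,
                                self.time_range, subreddit, cryptocurrencies)
                for subreddit in self.subreddits
                for thread_id, currency in enumerate(cryptocurrencies)
            ]
            # Retrieve each result so exceptions raised in worker threads are logged, not silently dropped
            for future in futures:
                try:
                    future.result()
                except Exception as error:
                    self.m_log.error(f"Error fetching Reddit data: {error}")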
        

    def fetch_and_store_data_for_currency(self, thread_id: int, currency: dict, time_range: str, 
                                          subreddit: str, currencies: list):
        self.m_log.info(f"Starting new thread id {thread_id} - pulling social data for {currency['name']} from Reddit")
        data = []
        for search_term in [currency["symbol"], currency["name"]]:
            for submission in self.reddit.subreddit(subreddit).search(query=search_term, time_filter=time_range):
                # Clean title and body text encoding
                title = self.preprocess_text(submission.title)
                body = self.preprocess_text(submission.selftext)

                # Check if current submission has already been seen
                with self.submission_set_lock:
                    if title in self.submission_set:
                        continue
                    self.submission_set.add(title)

                # If not already seen, add following submission and its comments into social data
                data.append({
                    "text": f"{title}\n{body}",
                    "votes": submission.score,
                    "subreddit": subreddit,
                    "symbol": currency["symbol"],
                    "timestamp": self.timestamp,
                    "source_id": self.source_id,
                })

                submission.comments.replace_more(limit=None)
                
                for comment in submission.comments.list():
                    text = self.preprocess_text(comment.body)
                    for referenced_coin in currencies:
                        if referenced_coin["symbol"] in text or referenced_coin["name"] in text:
                            data.append({
                                "text": text,
                                "votes": comment.score,
                                "subreddit": subreddit,
                                "symbol": referenced_coin["symbol"],
                                "timestamp": self.timestamp,
                                "source_id": self.source_id,
                            })

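        self.m_log.info(f"==================================== SOCIAL SENTIMENT DATA - {currency['name']} (last {time_range}) ====================================")
        # Write results to output. All threads share one CSV file, so append under a lock and
        # write the header only when the file is created, instead of each thread overwriting
        # the previous thread's results. Note that the file also accumulates across runs.
        formatted_data = pd.DataFrame(data)
        display(formatted_data)
        if not formatted_data.empty:
            with self.csv_file_lock:
                write_header = not os.path.exists("reddit_output.csv")
                formatted_data.to_csv("reddit_output.csv", mode="a", header=write_header, index=False)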
        

    def preprocess_text(self, text: str) -> str:
        if not text:
            return ""
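        # Round-trip through UTF-8, dropping characters that cannot be encoded (e.g. lone surrogates)
        cleaned_text = text.encode("utf-8", errors="ignore").decode("utf-8")
        # Remove hyperlinks: reduce Markdown links "[label](url)" to their URL, then strip all URLs
        cleaned_text = re.sub(r"\[.*?\]\((.*?)\)", r"\1", cleaned_text)
        cleaned_text = re.sub(r"(https?://\S+|www\.\S+)", "", cleaned_text)
        # Strip after URL removal so no leading/trailing whitespace is left behind
        return cleaned_text.strip()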


def setup_logger(name: str, debug=False) -> logging.Logger:
    
    # Set up program logging at INFO level
    level = logging.INFO
    logging.basicConfig(level=level)
    logger = logging.getLogger(name)

    if debug:
        # Also surface PRAW/prawcore request logging. Use a separate variable here so the
        # function still returns the logger for `name` rather than the last library logger.
        handler = logging.StreamHandler()
        handler.setLevel(logging.INFO)
        for logger_name in ("praw", "prawcore"):
            library_logger = logging.getLogger(logger_name)
            library_logger.setLevel(logging.INFO)
            library_logger.addHandler(handler)

    return logger


if __name__ == "__main__":
    reddit = RedditDataFetcher(time_range="day", debug=False)
    reddit.fetch_and_store_data()


INFO:__main__:Connection pool created succesfully.


|   | rank | symbol | name |
|---|---|---|---|
| 0 | 1 | BTC | bitcoin |
| 1 | 2 | ETH | ethereum |
| 2 | 3 | XRP | ripple |
| 3 | 4 | BNB | binance smart chain |
| 4 | 5 | SOL | solana |


INFO:__main__:Starting new thread id 0 - pulling social data for bitcoin from Reddit
INFO:__main__:Starting new thread id 1 - pulling social data for ethereum from Reddit
INFO:__main__:Starting new thread id 2 - pulling social data for ripple from Reddit
INFO:__main__:Starting new thread id 3 - pulling social data for binance smart chain from Reddit
INFO:__main__:Starting new thread id 4 - pulling social data for solana from Reddit




|   | text | votes | subreddit | symbol | timestamp | source_id |
|---|---|---|---|---|---|---|
| 0 | Algorand (ALGO) is Available for On-Chain Stak... | 4 | CryptoCurrency | SOL | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 1 | tldr; Algorand (ALGO) is now available for on-... | 2 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 2 | tldr; Algorand (ALGO) is now available for on-... | 2 | CryptoCurrency | SOL | 2025-07-01 10:11:21.018301+00:00 | 0 |




|   | text | votes | subreddit | symbol | timestamp | source_id |
|---|---|---|---|---|---|---|
| 0 | BNB Smart Chain’s Maxwell Upgrade Slashes Bloc... | 11 | CryptoCurrency | BNB | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 1 | tldr; BNB Smart Chain implemented the Maxwell ... | 1 | CryptoCurrency | BNB | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 2 | NEXUS Partners with KODA for Institutional-Gra... | 2 | CryptoCurrency | BNB | 2025-07-01 10:11:21.018301+00:00 | 0 |




|   | text | votes | subreddit | symbol | timestamp | source_id |
|---|---|---|---|---|---|---|
| 0 | ETH trading pattern points to 100% rally to $5... | 73 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 1 | If the STABLE and GENIUS acts are reconciled a... | 4 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 2 | If you ain’t huffing deep aggressive amount’s ... | 4 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 3 | I own ETH so I guarantee you it doesn’t top 3K... | 3 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 4 | tldr; Ether (ETH) shows a 'Power of 3' trading... | 2 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 5 | Pattern: ETH to 5k\n\nReality: ETH at 2.5k for... | 1 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 6 | Eth was awesome, you could mine 5 bucks a day ... | 4 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 7 | Anything to make it worth holding over BTC (or... | 2 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 8 | First US Solana Staking ETF Launches\n | 14 | CryptoCurrency | ETH | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 9 | tldr; The U.S. is set to launch its first Sola... | 3 | CryptoCurrency | SOL | 2025-07-01 10:11:21.018301+00:00 | 0 |




|   | text | votes | subreddit | symbol | timestamp | source_id |
|---|---|---|---|---|---|---|
| 0 | Daily Crypto Discussion - June 30, 2025 (GMT+0... | 23 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 1 | SOL making moves atleast | 8 | CryptoCurrency | SOL | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 2 | Nice candle on solana | 5 | CryptoCurrency | SOL | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 3 | All the depressing comments here are exciting.... | 5 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 4 | What’s the word with SOL are people buying thi... | 5 | CryptoCurrency | SOL | 2025-07-01 10:11:21.018301+00:00 | 0 |
| ... | ... | ... | ... | ... | ... | ... |
| 126 | I was like you I was at 33x with BTC and told ... | 1 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 127 | Not sure about 7 years, but gold has 10x marke... | 1 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 128 | The point is you’ll be using your btc as cash ... | 3 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
| 129 | Dude BTC was used more for real world transact... | 1 | CryptoCurrency | BTC | 2025-07-01 10:11:21.018301+00:00 | 0 |
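In `fetch_and_store_data_for_currency`, each comment is tagged with every coin for which `referenced_coin["symbol"] in text or referenced_coin["name"] in text` holds. That is a case-sensitive substring test, so `"SOL"` also matches `"SOLD"`, and lowercase names from `crypto_asset` such as `"bitcoin"` miss `"Bitcoin"`. One possible refinement, sketched here as an assumption rather than the current behavior, precompiles a word-boundary regex per coin: tickers match as whole, case-sensitive words, and names match case-insensitively. `build_coin_patterns` and `referenced_symbols` are hypothetical helpers.

```python
import re
from typing import Dict, List


def build_coin_patterns(coins: List[Dict[str, str]]) -> Dict[str, "re.Pattern[str]"]:
    """Compile one whole-word pattern per coin, keyed by its symbol."""
    patterns = {}
    for coin in coins:
        symbol = re.escape(coin["symbol"])
        name = re.escape(coin["name"])
        # (?i:...) makes only the name case-insensitive; the ticker stays case-sensitive
        patterns[coin["symbol"]] = re.compile(rf"\b(?:{symbol}|(?i:{name}))\b")
    return patterns


def referenced_symbols(text: str, patterns: Dict[str, "re.Pattern[str]"]) -> List[str]:
    """Return the symbols of all coins mentioned in `text`, in pattern order."""
    return [symbol for symbol, pattern in patterns.items() if pattern.search(text)]
```

Inside the comment loop, `for symbol in referenced_symbols(text, patterns)` would replace the inner `for referenced_coin in currencies` check, with `patterns` built once per run from the fetched cryptocurrencies.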
