# Data Pipeline Prototype

## Importing libraries

In [12]:
# ! pip install pandas praw prawcore python-dotenv pyarrow

In [104]:
import pandas as pd
import praw, prawcore, praw.models
import time, os, sys, functools, random, json
import datetime as dt
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
from typing import Optional, List, Dict, Any, Union, Tuple, NamedTuple
from itertools import product
from collections import deque, namedtuple
from collections.abc import Generator

## Setting up access to Reddit API
Access keys to Reddit API are stored in a .env file under the config directory of this repository. A template for the .env file is provided in the config directory.

The config.py script assigns these environment variables to the `PRAW_ID`, `PRAW_SECRET`, `PRAW_USER_AGENT`, `PRAW_USERNAME`, and `PRAW_PASSWORD` global variables.
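
The contents of `config.py` are not shown in this notebook. Below is a minimal sketch of what such a module could look like. It assumes that the `.env` keys use the same names as the globals, and `read_praw_credentials` is a hypothetical helper.

```python
import os
from typing import Dict

# Hypothetical environment variable names; the real .env template may use different keys
REQUIRED_KEYS = ("PRAW_ID", "PRAW_SECRET", "PRAW_USER_AGENT", "PRAW_USERNAME", "PRAW_PASSWORD")


def read_praw_credentials(keys=REQUIRED_KEYS) -> Dict[str, str]:
    """Read each key from the process environment and fail loudly if any are missing or empty."""
    values = {key: os.getenv(key) for key in keys}
    missing = [key for key, value in values.items() if not value]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return values
```

Because `load_dotenv` populates `os.environ`, a module like this only sees the credentials if it is imported after that call, which is the order the next cell follows.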

In [105]:
# Load .env file for access keys
module_path = '../src'
load_dotenv(os.path.join(module_path,'config','.env'))

# Import config.py to access environment variables
if module_path not in sys.path:
    sys.path.append(module_path)
    
from usedcaranalytics.config.api import PRAW_ID, PRAW_SECRET, PRAW_USER_AGENT, PRAW_USERNAME, PRAW_PASSWORD

In [106]:
if all((PRAW_ID, PRAW_SECRET, PRAW_USER_AGENT, PRAW_USERNAME, PRAW_PASSWORD)):
    print('Successfully loaded API keys and login credentials.')
else:
    raise Exception('Reddit API keys and login credentials unsuccessfully loaded. Retry the script.')

Successfully loaded API keys and login credentials.


In [107]:
# Initialize PRAW 
REDDIT = praw.Reddit(
    client_id=PRAW_ID,
    client_secret=PRAW_SECRET,
    username=PRAW_USERNAME,
    password=PRAW_PASSWORD,
    user_agent=PRAW_USER_AGENT
)

## Extracting text data

This section deals with the process of extracting and storing text data and metadata from Reddit posts and comments. My objective is to present my thought process and design principles in implementing the data pipeline for this project.

__Data Pipeline Overview:__
1. Establish access to Reddit API
2. Crawl predefined subreddits by searching submissions using predefined queries
3. Extract textual data and metadata from relevant posts and child comments
4. Preprocess data (Optional)
5. Store extracted data to disk

### The Challenge of Reddit API Rate Limiting

__Building the search query and subreddit pairs__

To scrape the relevant text data from Reddit, I created a small list of queries covering diverse topics relevant to buying affordable used vehicles. The queries combine location-specific, model-specific, and thematic keywords so that the search covers as much ground as possible. Each chosen subreddit has more than 100,000 subscribers, so that every search query yields a substantial number of results per API request. These queries and subreddits are stored in `../src/search_queries.txt` and `../src/subreddits.txt`, respectively.

__Searching relevant posts per Subreddit__

The objective is to search for and scrape posts (and their child comments) within the specified subreddits using the search queries provided. With 10 queries and 10 subreddits, the initial subreddit searches alone require 100 requests, which can yield up to 100 x 100 = 10,000 submissions. Fetching the comments involves significantly more requests, because each submission requires 1 request to load its CommentForest. If every search returns its full 100 results, fetching the comments requires 10,000 additional requests.

__Expected Minimum API Requests__
|Search Requests|Comment Fetch Requests|Total Requests|
|:----------|:----------|:----------|
|100      |10,000  |10,100|

As the table shows, a single batch job covering all query-subreddit combinations makes at least 10,100 API requests in one go. This far exceeds what the Reddit API fair use policy allows (a cap of 100 requests per minute, averaged over a 10-minute sliding window).
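
The arithmetic behind the table can be sketched as a small budget calculation. The sketch assumes that each search fits in a single listing page (Reddit listings return at most 100 items per request) and that each comment fetch costs one request. `request_budget` and `minimum_runtime_minutes` are hypothetical helpers.

```python
import math


def request_budget(n_queries: int, n_subreddits: int, results_per_search: int) -> int:
    """One search request per (query, subreddit) pair plus one comment request per submission."""
    search_requests = n_queries * n_subreddits
    comment_requests = search_requests * results_per_search
    return search_requests + comment_requests


def minimum_runtime_minutes(total_requests: int, requests_per_minute: int = 100) -> int:
    """Lower bound on wall-clock minutes if requests never exceed the average rate."""
    return math.ceil(total_requests / requests_per_minute)


total = request_budget(n_queries=10, n_subreddits=10, results_per_search=100)
print(total)                           # 10100
print(minimum_runtime_minutes(total))  # 101
```

At 100 requests per minute, the full 10x10 job needs well over an hour and a half of wall-clock time, even before any throttling buffer is reserved.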

__Implementing a sliding window request counter and backoff algorithms__

To ensure the script adheres to fair use policies, I implemented two-pronged fail-safe logic:
1. Handle transient failures for each API request by implementing a backoff algorithm
2. Reduce the chance of those failures occurring by implementing a program-level API request counter that tracks current and expected calls within a specified sliding window. This rate limiter throttles requests until a slot is available.

__Read more:__
1. [API Rate Limits Explained: Best Practices for 2025](https://orq.ai/blog/api-rate-limit)
2. [Exponential Backoff And Jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/)
3. [Yield Statements vs. Returning Lists in Python](https://community.aws/content/2h01Byx1ytU8357tp2bvcUuJ2j0/yield-statements-vs-returning-lists-in-python)
4. [Rate Limiter - Sliding Window Counter](https://medium.com/@avocadi/rate-limiter-sliding-window-counter-7ec08dbe21d6)

##### Rate Limiter Class

The RateLimiter class is initialized once at the beginning of the script. Before each API request, it checks the rate-limit state that PRAW records from Reddit's response headers (`reddit.auth.limits`). When the remaining requests in the current window dip below a randomized buffer, requests are throttled until the window resets (Reddit = 100/min). Jitter is added to the wait time, and the random buffer keeps requests from coasting at the rate limit (abuse avoidance).

In [235]:
# PRAW auth limits checker implementation

class RateLimiter:
    def __init__(self, reddit:praw.Reddit, buffer_range:Tuple[int, int]=(50,100),
                 jitter_range:Tuple[float, float]=(0.01, 5.0)):
        self.REDDIT = reddit
        self.BUFFER_RANGE = buffer_range
        self.JITTER_RANGE = jitter_range
        self.total_requests = 0
        
    def evaluate(self):
        """Checks if current request can be accommodated based on current limits; sleeps if not."""
        # PRAW fills these from the last response's rate-limit headers (None before the first request)
        limits = self.REDDIT.auth.limits
        remaining_requests = limits['remaining']
        reset_ts = limits['reset_timestamp']
        # Randomize buffer to ensure requests don't coast at the rate limit
        buffer = random.randint(*self.BUFFER_RANGE)
        
        # If the next request dips into the buffer, sleep until limits reset.
        # The stored limits only refresh on the next response, so proceed after sleeping
        # instead of re-checking stale values (which would loop indefinitely).
        if remaining_requests is not None and reset_ts is not None and remaining_requests - 1 < buffer:
            # Time left until limits refresh, plus jitter (uniform accepts floats; randrange does not)
            delay = max(reset_ts - time.time(), 0) + random.uniform(*self.JITTER_RANGE)
            time.sleep(delay)
        
        # Tally API call
        self.total_requests += 1
        
    def print_total_requests(self):
        return f'{self.total_requests} total requests as of {dt.datetime.now():%Y-%m-%d %H:%M:%S}.'
    
    def print_remaining_requests(self):
        limits = self.REDDIT.auth.limits
        # No rate-limit headers received yet
        if limits['remaining'] is None:
            return None
        # If limits haven't refreshed yet, return remaining requests. Otherwise, return total limit
        if time.time() < limits['reset_timestamp']:
            return limits['remaining']
        else:
            return limits['remaining'] + limits['used']
    
    def __str__(self):
        return f'RateLimiter object: {self.print_remaining_requests()} available requests in current window.'
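
Isolating the throttling rule as a pure function makes it possible to check the rule without calling the API. In this sketch, `throttle_delay` is a hypothetical helper, not part of PRAW. Its `remaining` and `reset_timestamp` arguments correspond to the keys of `reddit.auth.limits`, and the clock is passed in as `now`.

```python
import random
from typing import Optional, Tuple


def throttle_delay(remaining: Optional[int], reset_timestamp: Optional[float], now: float,
                   buffer: int, jitter_range: Tuple[float, float] = (0.01, 5.0), rng=random) -> float:
    """
    Seconds to sleep before the next request; 0.0 means the request can proceed now.

    Same rule as RateLimiter.evaluate: if the next request would dip into the reserved
    buffer, wait until the window resets, plus a random jitter.
    """
    # No rate-limit headers seen yet (e.g. before the first request): nothing to wait on
    if remaining is None or reset_timestamp is None:
        return 0.0
    # Enough headroom left in the current window
    if remaining - 1 >= buffer:
        return 0.0
    # Time until the window resets (never negative), plus jitter
    return max(reset_timestamp - now, 0.0) + rng.uniform(*jitter_range)


# 10 requests left, window resets in 60 s, buffer of 50: wait roughly 60 to 65 s
print(round(throttle_delay(10, reset_timestamp=100.0, now=40.0, buffer=50), 1))
```

Inside `evaluate`, the equivalent call would pass `time.time()` as `now` and sleep for the returned delay. The `None` check matters because PRAW has no rate-limit information until the first response arrives.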

In [None]:
# # Sliding window request counter implementation

# class RateLimiter:
#     """Rate Limiter with sliding window implementation."""
#     def __init__(self, max_requests:int=100, period:float=60.0, jitter_seconds:Union[List[float],Tuple[float]]=(0.1,5.0)):
#         self.MAX_REQUESTS = max_requests
#         self.PERIOD = period
#         self.JITTER_SECONDS = jitter_seconds
#         self._requests_in_window = deque()
#         self.total_requests = 0
        
#     def wait_for_slot(self, n_request:int=1) -> None:
#         """
#         Delays execution of subsequent API request or code chunk to ensure maximum
#         function calls or request adheres to rate limits within a specified window.
#         """
#         window_end = time.time()
#         window_start = window_end - self.PERIOD
        
#         # Remove older batches when timestamp is out of current window
#         while self._requests_in_window and self._requests_in_window[0] < window_start:
#             self._requests_in_window.popleft()
        
#         # Check if additional request can be accommodated given requests made in current window
#         if len(self._requests_in_window) + n_request > self.MAX_REQUESTS:
#             # Wait time is adjusted by jitter
#             wait_time = (self._requests_in_window[0] + self.PERIOD) - window_end
#             time.sleep(max(wait_time + random.uniform(*self.JITTER_SECONDS), 0))
#             # Re-run the function and determine if request can be accommodated
#             return self.wait_for_slot(n_request)
        
#         # Enqueue current request to trace requests
#         for _ in range(n_request):
#             self._requests_in_window.append(time.time())
#             self.tally_request()
    
#     def tally_request(self):
#         """Tallies current request and stores it in instance memory."""
#         self.total_requests += 1
#         return self
    
#     def print_total_requests(self):
#         return f'{self.total_requests} total requests as of {dt.datetime.now():%Y-%m-%d %H:%M:%S}.'
    
#     def __repr__(self):
#         param_dict = {
#             'max_requests' : self.MAX_REQUESTS,
#             'period' : self.PERIOD,
#             'jitter_seconds' : self.JITTER_SECONDS
#             }
#         return f'RateLimiter({", ".join([f'{k}={v}' for k, v in param_dict.items()])})'
    
#     def __get__(self):
#         return {
#             'MAX_REQUESTS':self.MAX_REQUESTS, 
#             'PERIOD':self.PERIOD, 
#             'JITTER_SECONDS':self.JITTER_SECONDS, 
#             '_requests_in_window': self._requests_in_window, 
#             'total_requests': self.total_requests
#             }
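
The commented-out class above sleeps inside `wait_for_slot`, which makes its logic hard to test. The sketch below implements the same sliding-window log idea: one timestamp is stored per request and evicted once it leaves the window. Here the clock is passed in, and the wait time is returned instead of slept. `SlidingWindowLog` is a hypothetical name, and its defaults mirror the commented-out class.

```python
from collections import deque


class SlidingWindowLog:
    """
    Sliding-window request log: allow at most `max_requests` timestamps inside the
    last `period` seconds. The clock is passed in explicitly so no sleeping is needed.
    """

    def __init__(self, max_requests: int = 100, period: float = 60.0):
        self.max_requests = max_requests
        self.period = period
        self._timestamps = deque()

    def _evict(self, now: float) -> None:
        # Drop timestamps that have fallen out of the window (now - period, now]
        while self._timestamps and self._timestamps[0] <= now - self.period:
            self._timestamps.popleft()

    def wait_time(self, now: float, n_request: int = 1) -> float:
        """Seconds until `n_request` more requests fit in the window (0.0 if they fit now)."""
        if n_request > self.max_requests:
            raise ValueError("n_request exceeds max_requests and can never fit in the window")
        self._evict(now)
        overflow = len(self._timestamps) + n_request - self.max_requests
        if overflow <= 0:
            return 0.0
        # Enough room frees up once the oldest `overflow` timestamps expire
        return self._timestamps[overflow - 1] + self.period - now

    def record(self, now: float, n_request: int = 1) -> None:
        """Log `n_request` requests made at time `now`."""
        self._timestamps.extend([now] * n_request)
```

A caller would sleep for `log.wait_time(time.time())`, optionally plus jitter, and then call `log.record(time.time())` right before issuing the request.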

In [236]:
# Initialize rate limiter
rate_limiter = RateLimiter(REDDIT)

##### Decorator Factory for implementing Exponential Backoff and Full Jitter
The function below creates a flexible decorator that can be adjusted based on the intended maximum retries, exponential backoff caps, and inclusion of jitter.

In [237]:
def backoff_on_rate_limit(max_retries:int=5, 
                          base_delay:float=1.0, 
                          cap_delay:float=60.0, 
                          jitter:bool=True):
    """
    Decorator factory that applies exponential backoff (with optional full jitter) when Reddit API
    rate limits (HTTP 429) or server errors (HTTP 5xx) occur. Stops after max_retries and re-raises
    the last exception.
    
    Input:
        - Integer value for max retries. Once this many retries have failed, the exception is re-raised
        - Float for base delay in seconds (i.e. Delay at first failed attempt)
        - Float for maximum delay in seconds
        - Bool on whether to implement full jitter or not
    Output:
        - Decorator to be applied to a PRAW API request wrapper
    
    Note: only exceptions raised while the wrapped function executes are caught, so the wrapped
    function should perform its requests eagerly rather than return a lazy generator.
    """
    # Only retry transient errors; other ResponseExceptions (e.g. 403, 404) are raised immediately
    retryable = (prawcore.exceptions.TooManyRequests, prawcore.exceptions.ServerError)
    
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Start with base delay, then exponentially scale by attempt
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except retryable as e:
                    if attempt >= max_retries:
                        # Re-raise the original exception with its traceback
                        raise
                    delay = min(cap_delay, base_delay * 2 ** attempt)
                    if jitter:
                        # Full jitter: sleep a uniform random time in [0, delay]
                        delay = random.uniform(0, delay)
                    print(f"[WARNING] {e.__class__.__name__} on attempt {attempt+1}, retrying after {delay:.2f}s.")
                    time.sleep(delay)
                    attempt += 1
        return wrapper
    return decorator
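
The delay schedule the decorator draws from can be computed on its own. This sketch assumes the default arguments. `backoff_delays` and `full_jitter` are hypothetical helpers that follow the "full jitter" formula from the AWS article linked above: sleep = random_between(0, min(cap, base * 2 ** attempt)).

```python
import random
from typing import List


def backoff_delays(max_retries: int = 5, base_delay: float = 1.0, cap_delay: float = 60.0) -> List[float]:
    """Capped exponential delay ceiling for each retry attempt: min(cap, base * 2**attempt)."""
    return [min(cap_delay, base_delay * 2 ** attempt) for attempt in range(max_retries)]


def full_jitter(ceiling: float, rng: random.Random) -> float:
    """Full jitter: sleep a uniform random time in [0, ceiling]."""
    return rng.uniform(0, ceiling)


print(backoff_delays())                   # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delays(max_retries=8)[-2:])  # [60.0, 60.0]
```

With the defaults, the first five ceilings sum to 1 + 2 + 4 + 8 + 16 = 31 seconds. Full jitter then spreads the actual sleeps across each range so that retries from concurrent callers do not hit the API in lockstep.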

#### API Call Wrappers with Backoff Algorithms

The helper functions below wrap the PRAW calls that fetch Reddit submissions and comments. The streaming functions in the next section package the extracted data and metadata into dicts that can be easily parsed into a Pandas DataFrame for further analysis. The backoff decorator is applied to each API call wrapper to handle transient errors such as HTTP 429 (Too Many Requests) responses.

In [195]:
@backoff_on_rate_limit()
def fetch_submissions(subreddit:praw.models.Subreddit, query:str, limit:int=100, **kwargs):
    """Wrap the PRAW subreddit search to ensure adherence to safe request limits."""
    # Record API request and throttle if needed
    rate_limiter.evaluate()
    # search() returns a lazy ListingGenerator; materialize it here so the HTTP request
    # (and any 429/5xx error) happens inside the backoff wrapper
    return list(subreddit.search(query=query, limit=limit, **kwargs))

@backoff_on_rate_limit()
def fetch_comments(submission:praw.models.Submission, limit:int=0):
    """Wrap the PRAW comment fetch to ensure adherence to safe request limits."""
    # Record API request and throttle if needed
    rate_limiter.evaluate()
    # Accessing submission.comments triggers the fetch; replace 'more' with specified limit
    # (default = 0 or retain top-level comments only)
    submission.comments.replace_more(limit=limit)
    # Return a list rather than yielding so fetch errors are raised inside the backoff wrapper
    return list(submission.comments)
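
One subtlety with retry decorators: the `try/except` only sees exceptions raised while the wrapped function runs. A wrapped generator function, or a function that returns a lazy object such as PRAW's `ListingGenerator` from `subreddit.search(...)`, returns before any request is made. Errors raised later, during iteration, therefore bypass the backoff. The toy example below uses only the standard library, with `ValueError` standing in for a prawcore exception and hypothetical function names. It contrasts a wrapped generator with a wrapped eager function.

```python
import functools


def retry_once(func):
    """Toy retry decorator: retry a single time if the wrapped call raises ValueError."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ValueError:
            return func(*args, **kwargs)
    return wrapper


@retry_once
def lazy_fetch():
    # Generator function: calling it only creates a generator; the body runs during iteration
    raise ValueError("simulated HTTP 429")
    yield  # unreachable, but makes this a generator function


calls = {"n": 0}


@retry_once
def eager_fetch():
    # Regular function: the "request" happens inside the call, so the decorator can catch it
    calls["n"] += 1
    if calls["n"] == 1:
        raise ValueError("simulated HTTP 429")
    return ["comment-1", "comment-2"]


gen = lazy_fetch()  # no error yet: nothing has executed
try:
    list(gen)  # the error surfaces here, outside the decorator's try/except
except ValueError:
    print("lazy: error escaped the retry decorator")

print(eager_fetch())  # retried once, then prints ['comment-1', 'comment-2']
```

Materializing results inside the wrapped function (e.g. with `list(...)`) keeps the request inside the retry loop. The cost is holding a single page of results in memory, which stays small when `limit` is 100 or less.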

#### Text File Parser for Subreddit and Search Queries

To limit hardcoding and allow for flexible scraping, I store the subreddit names and search queries in separate text files. Changing the search pairs then only requires updating those files and re-running the script, rather than digging through the source code.

In [196]:
def parse_txt_file(file_path:str):
    """
    Utility function for parsing a multi-line text file where each item is separated
    by a newline.
    
    Input:
        - String for the path of text file, with each item separated by a newline
    Output:
        - List (e.g. search queries, subreddit names)
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        # Strip surrounding whitespace, then ignore comments and empty/whitespace-only lines
        stripped = (line.strip() for line in f)
        results = [line for line in stripped if line and not line.startswith('#')]
    return results

In [197]:
def get_search_pairs(
    project_root:str, 
    subdir:str='data/raw', 
    file_names:Union[Tuple[str],List[str]]=('search_queries.txt','subreddits.txt')
    ):
    """
    Utility function that returns a tuple of lists: (search queries, subreddit names).
    Input:
        - project root path
        - subdirectory path that contains the files
        - file names in strict order: 1) search queries, 2) subreddits
    Output:
        - Tuple(list of queries, list of subreddits)
    """
    
    query_fp, subreddit_fp = tuple(
        os.path.join(project_root, subdir, fname) for fname in file_names
        )
    
    return tuple(parse_txt_file(fp) for fp in (query_fp, subreddit_fp))

### Data Streaming

__Rationale: Scalability of Scraping Logic__

Previously, I explored building dictionaries within each scraping function and returning that dictionary to the data storage logic. However, this approach doesn't scale well since device memory may become a bottleneck with larger volumes of API calls. From my research, it's recommended to use generators to stream data from APIs as memory overhead is limited to the data extracted from the most recent call.
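
The memory argument can be illustrated with `sys.getsizeof`. Note that `getsizeof` is shallow: it counts only the container, not the dicts it references. Even so, it shows that a list grows with the number of records while a generator object stays the same size. `record_stream` is a hypothetical stand-in for the API streamers.

```python
import sys


def record_stream(n: int):
    """Yield n small records one at a time instead of building them all up front."""
    for i in range(n):
        yield "comment", {"comment_id": f"c{i}", "score": i}


materialized = list(record_stream(100_000))
streamed = record_stream(100_000)

# The list holds 100,000 references; the generator holds only its suspended frame
print(sys.getsizeof(materialized) > sys.getsizeof(streamed))  # True
```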

__Data Extraction Overview__:

1. Initialize the rate limiter class to keep track of API requests and throttle them based on the limits Reddit reports for the current window.
2. Parse the text files containing subreddit names and search queries, then get the combination of search pairs.
3. Given a search pair, initialize a Subreddit class and search relevant submissions within that subreddit.
4. For every relevant submission, extract relevant data from Submission class attributes and stream a tuple of record type and submission data dictionary
5. Subsequently, for every submission, request the top-level comments, and for each top-level comment, stream a tuple of record type and comment data dictionary.

#### Data Streaming Functions
1. Streamer for comments given a praw.models.Submission object.
2. Streamer for submission data and child comments. This is a wrapper for the comment streamer that takes a single search pair as input, streams all relevant submissions and child comments.
3. Wrapper for #2 that takes a list of subreddits and a list of search queries and feeds every search-pair combination into the submission and comment streamer.
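
The three layers compose through `yield from`. The sketch below runs on hypothetical in-memory data instead of PRAW objects. The `toy_` functions mirror the structure of the streamers that follow, not their exact fields. As in the notebook's version, each submission record is emitted after its comments.

```python
from itertools import product
from typing import Dict, Iterator, List, Tuple

Record = Tuple[str, Dict]

# Hypothetical stand-in for search results: (subreddit, query) -> list of submissions
FAKE_RESULTS = {
    ("cars", "mazda 3 issues"): [{"id": "s1", "comments": ["c1", "c2"]}],
    ("cars", "corolla first car"): [{"id": "s2", "comments": []}],
    ("CarsAustralia", "mazda 3 issues"): [{"id": "s3", "comments": ["c3"]}],
    ("CarsAustralia", "corolla first car"): [],
}


def toy_stream_comments(submission: Dict) -> Iterator[Record]:
    for comment_id in submission["comments"]:
        yield "comment", {"comment_id": comment_id, "parent_submission_id": submission["id"]}


def toy_stream_submissions_and_comments(subreddit: str, query: str) -> Iterator[Record]:
    for submission in FAKE_RESULTS.get((subreddit, query), []):
        yield from toy_stream_comments(submission)  # child comments first
        yield "submission", {"submission_id": submission["id"], "subreddit": subreddit}


def toy_stream_aggregate_results(subreddits: List[str], queries: List[str]) -> Iterator[Record]:
    for subreddit, query in product(subreddits, queries):
        yield from toy_stream_submissions_and_comments(subreddit, query)


records = list(toy_stream_aggregate_results(["cars", "CarsAustralia"], ["mazda 3 issues", "corolla first car"]))
print([record_type for record_type, _ in records])
# ['comment', 'comment', 'submission', 'submission', 'comment', 'submission']
```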

In [198]:
def stream_comments(submission:object, limit:int=0):
    """
    Fetches comments from a Submission object, then parses each comment into a dictionary record.
    Each record is streamed to keep the memory footprint small when handling larger CommentForests.
    
    Input:
        - Submission object from PRAW (i.e. Reddit posts)
        - Integer for .replace_more limit parameter, default=0 (i.e. top/parent comments only)
    Output:
        - Generator of ("comment", {data_header: data_value}) tuples
    """
    assert (isinstance(limit, int) and limit >= 0) or limit is None, 'Limit must be an integer >= 0 or None.'
    
    # Parse each fetched comment into a record dict
    for comment in fetch_comments(submission, limit=limit):
        
        # Stream comment record as (record_type, data)
        yield "comment", {
            'comment_id':comment.id,
            'body':comment.body,
            'score':comment.score,
            'timestamp':int(comment.created_utc),
            'subreddit':comment.subreddit_name_prefixed,
            'parent_submission_id':submission.id
            }

In [199]:
def stream_submissions_and_comments(subreddit_name:str, query:str, limit:int=50, **search_kwargs):
    """
    Fetches submissions, parses each submission into a dictionary record, and calls the stream_comments
    function on each submission. Submission data and comment data are streamed for efficient memory 
    footprint when handling larger datasets. 
    
    Input:
        - String of Subreddit name
        - String of search query
        - Integer for limit of submissions yielded by PRAW subreddit search
    Output:
        - Generator of (record_type, data dict) tuples, where record_type is 'submission' or 'comment'
    """
    assert isinstance(subreddit_name, str), 'Subreddit name must be a string.'
    assert isinstance(query, str), 'Search query must be a string.'
    assert isinstance(limit, int) and limit > 0, 'Limit must be a positive non-zero integer.'
    
    SUB = REDDIT.subreddit(subreddit_name)
    
    # Fetch submissions, and for every submission, fetch the comments
    for submission in fetch_submissions(**search_kwargs, subreddit=SUB, query=query, limit=limit):
        # Stream comment records from current submission ("comment", Dict[str, Any])
        yield from stream_comments(submission)
        
        # Stream submission record after its comments
        yield "submission", {
            'submission_id':submission.id,
            'title':submission.title,
            'selftext':submission.selftext,
            'score':submission.score,
            'upvote_ratio':submission.upvote_ratio,
            'timestamp':int(submission.created_utc),
            'subreddit':submission.subreddit_name_prefixed,
            'num_comments':submission.num_comments
            }

In [200]:
def stream_aggregate_results(subreddits:List[str], queries:List[str],**search_kwargs):
    """
    Wrapper for streaming functions. Takes a list of subreddits and queries, then calls the 
    stream_search_results  function for each combination of subreddit and query. 
    
    Input:
        - List of subreddit name strings
        - List of search query strings
        - Int of maximum requests per minute, also determines upper bound of search result limit
        - Int of minimum requests, which is the floor of search result limit
        - Float of seconds denoting the time period for counting the API call limits
        - List of float values of seconds to randomly add to interval delay 
    Output:
        - Tuple of aggregated submissions dict and comments dict
    """
    assert isinstance(subreddits, list), "Argument 'subreddits' expects a list of subreddit names."
    assert isinstance(queries, list), "Argument 'queries' expects a list of search queries names."
    
    # Parse submission and comment data with jittered API calls
    for subreddit, query in product(subreddits, queries):
        # Stream submission and comment records (str(record_type), Dict[str(col_name), Any])
        yield from stream_submissions_and_comments(**search_kwargs, subreddit_name=subreddit, query=query)

#### Testing the streaming functions

In [201]:
%%time
# Test the streamer
sample_stream = tuple(
    stream_aggregate_results(
        ['cars','CarsAustralia'],
        ['toyota corolla first car','mazda 3 issues'],
        limit=10
        )
    )

CPU times: user 769 ms, sys: 84.4 ms, total: 853 ms
Wall time: 56.8 s


In [202]:
# Print total requests made so far
rate_limiter.print_total_requests()

'44 total requests as of 2025-06-27 20:33:08.'

In [203]:
# Segregate data then build dataframes
comment_data = tuple(data for record, data in sample_stream if record == 'comment')
submission_data = tuple(data for record, data in sample_stream if record == 'submission')
comment_df = pd.DataFrame(comment_data)
submission_df = pd.DataFrame(submission_data)
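
The two comprehensions above work because `sample_stream` was materialized into a tuple. Applied directly to a generator, the first pass would exhaust it and the second would come back empty. The sketch below is a single-pass alternative that groups records in one traversal and works for tuples and generators alike. `split_by_record_type` is a hypothetical helper, and its grouped lists can be passed to `pd.DataFrame` as before.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def split_by_record_type(stream: Iterable[Tuple[str, Dict]]) -> Dict[str, List[Dict]]:
    """Consume a (record_type, record) stream once and group records by type."""
    grouped = defaultdict(list)
    for record_type, record in stream:
        grouped[record_type].append(record)
    return dict(grouped)


sample = iter([
    ("comment", {"comment_id": "c1"}),
    ("submission", {"submission_id": "s1"}),
    ("comment", {"comment_id": "c2"}),
])
grouped = split_by_record_type(sample)
print({k: len(v) for k, v in grouped.items()})  # {'comment': 2, 'submission': 1}
```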

In [204]:
print(comment_df.shape)
comment_df.head()

(1688, 6)


| |comment_id|body|score|timestamp|subreddit|parent_submission_id|
|--:|:----------|:----------|--:|--:|:----------|:----------|
|0|ioe046j|The GR is going to help Toyota sell the regula...|167|1663158970|r/cars|xdyy00|
|1|iodt8kt|Lots of talk about it being 'safe' and 'stable...|93|1663155458|r/cars|xdyy00|
|2|iodw9bg|Will be interesting to see if this car or any ...|65|1663157124|r/cars|xdyy00|
|3|iodun0d|Now I’m ready for the Civic Type R, WRX, Elant...|99|1663156249|r/cars|xdyy00|
|4|ioenkne|A $50k crazy Corolla is cool and all, but can'...|47|1663168836|r/cars|xdyy00|


In [205]:
print(submission_df.shape)
submission_df.head()

(40, 8)


| |submission_id|title|selftext|score|upvote_ratio|timestamp|subreddit|num_comments|
|--:|:----------|:----------|:----------|--:|--:|--:|:----------|--:|
|0|xdyy00|Toyota GR Corolla \| First Drive, Leveling Expe...| |436|0.93|1663153268|r/cars|251|
|1|51ej3q|You are sent back in time to when Henry Ford i...|Id send him a 1999 Toyota Levin with 400,000km...|539|0.87|1473157092|r/cars|545|
|2|qpinpq|Toyota has some huge potential with the GR Cor...|For the first time in my life, I think I will ...|385|0.89|1636393029|r/cars|244|
|3|15zrb24|Is it just me or is the Toyota corolla the mos...|So I today I had the (dis)pleasure of renting ...|0|0.31|1692851066|r/cars|183|
|4|x64gh1|2023 Toyota GR Corolla - Circuit Edition vs. M...| |250|0.92|1662344567|r/cars|112|


#### Consolidating the streaming functions into a DataStreamer Class

In [212]:
class DataStreamer:
    def __init__(self, reddit:praw.Reddit):
        self.REDDIT = reddit
        self.rate_limiter = RateLimiter(reddit)
    
    @backoff_on_rate_limit()
    def _fetch_submissions(self, subreddit:praw.models.Subreddit, query:str, **kwargs):
        """Wrap the PRAW subreddit search to ensure adherence to safe request limits."""
        # Evaluate if current request can be accommodated with remaining limits
        self.rate_limiter.evaluate()
        # Materialize the lazy listing so the request happens inside the backoff wrapper
        return list(subreddit.search(query=query, **kwargs))

    @backoff_on_rate_limit()
    def _fetch_comments(self, submission:praw.models.Submission, limit:int=0):
        """Wrap the PRAW comment fetch to ensure adherence to safe request limits."""
        # Evaluate if current request can be accommodated with remaining limits
        self.rate_limiter.evaluate()
        # Replace 'more comments' with specified limit (default = 0 or retain top-level comments only)
        submission.comments.replace_more(limit=limit)
        # Return a list (not a generator) so fetch errors are raised inside the backoff wrapper
        return list(submission.comments)
    
    def _stream_comments(self, submission:praw.models.Submission, limit:int=0):
        """
        Fetches comments from a Submission objects then parses each comment into a dictionary record.
        Each entry is streamed for efficient memory footprint when handling larger CommentForests.
        """
        # Parse each fetched comment into a record dict
        for comment in self._fetch_comments(submission, limit=limit):
            # Stream comment record as (record_type, data)
            yield "comment", {
                'comment_id':comment.id,
                'body':comment.body,
                'score':comment.score,
                'timestamp':int(comment.created_utc),
                'subreddit':comment.subreddit_name_prefixed,
                'parent_submission_id':submission.id
                }
    
    def stream_search_results(self, subreddit_name:str, query:str, limit:int=50, **search_kwargs):
        """
        Fetches submissions then fetches the comments for each submission. Data is then repackaged 
        into a dictionary and streamed as (str(record type), Dict[str(column name), Any(data)]).
        """
        subreddit = self.REDDIT.subreddit(subreddit_name)
        # Fetch submissions, and for every submission, fetch the comments
        for submission in self._fetch_submissions(**search_kwargs, subreddit=subreddit, query=query, limit=limit):
            # Stream comment records from current submission ("comment", Dict[str, Any])
            yield from self._stream_comments(submission)
            # Stream submission record after its comments
            yield "submission", {
                'submission_id':submission.id,
                'title':submission.title,
                'selftext':submission.selftext,
                'score':submission.score,
                'upvote_ratio':submission.upvote_ratio,
                'timestamp':int(submission.created_utc),
                'subreddit':submission.subreddit_name_prefixed,
                'num_comments':submission.num_comments
                }
            
    def stream(self, subreddits:List[str], queries:List[str],**search_kwargs):
        """
        Wrapper for streaming functions. Takes a list of subreddits and queries, then calls the 
        stream_search_results method for each combination of subreddit and query. 
        """
        # Parse submission and comment data with jittered API calls
        for subreddit, query in product(subreddits, queries):
            # Stream submission and comment records (str(record_type), Dict[str(col_name), Any])
            yield from self.stream_search_results(**search_kwargs, subreddit_name=subreddit, query=query)

### Transforming the extracted data

In [None]:
from abc import ABCMeta, abstractmethod

class DataTransformer(metaclass=ABCMeta):
    """Interface for transforming a batch table before it is written to disk."""
    @abstractmethod
    def __call__(self, table:pa.Table) -> pa.Table:
        """Transform current batch table before writing to disk."""
        # TO BE IMPLEMENTED by subclasses
        ...
    
    # abstractmethod must be the innermost decorator when combined with staticmethod
    @staticmethod
    @abstractmethod
    def add_date_column(table:pa.Table) -> pa.Table:
        """Convert unix timestamp to YYYY-MM-DD format and insert as column."""
        ...
    
    @staticmethod
    @abstractmethod
    def filter_comments(table:pa.Table) -> pa.Table:
        ...
    
    @staticmethod
    @abstractmethod
    def filter_submissions(table:pa.Table) -> pa.Table:
        ...
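
A concrete subclass would need the conversions named in the stubs. Below is a stdlib-only sketch of two of them. It operates on a dict-of-lists column buffer (the shape `ParquetDataLoader` buffers) rather than a `pa.Table`. `unix_to_date`, `with_date_column`, and `filter_placeholder_comments` are hypothetical helpers. Treating `[deleted]`/`[removed]` bodies as noise is an assumed filtering rule, not one stated in this notebook. Dates are computed in UTC to match Reddit's `created_utc` timestamps.

```python
import datetime as dt
from typing import Dict, List

# Placeholder bodies that commonly appear for deleted or removed Reddit content
PLACEHOLDER_BODIES = {"[deleted]", "[removed]"}


def unix_to_date(ts: int) -> str:
    """Convert a Unix timestamp (seconds, UTC) to a 'YYYY-MM-DD' string."""
    return dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).strftime("%Y-%m-%d")


def with_date_column(columns: Dict[str, List], ts_col: str = "timestamp", out_col: str = "date") -> Dict[str, List]:
    """Return a copy of a column buffer with a date column derived from the timestamp column."""
    out = dict(columns)
    out[out_col] = [unix_to_date(ts) for ts in columns[ts_col]]
    return out


def filter_placeholder_comments(columns: Dict[str, List], body_col: str = "body") -> Dict[str, List]:
    """Drop rows whose comment body is a deleted/removed placeholder."""
    keep = [i for i, body in enumerate(columns[body_col]) if body not in PLACEHOLDER_BODIES]
    return {col: [values[i] for i in keep] for col, values in columns.items()}


print(unix_to_date(1663158970))  # 2022-09-14
```

In a real subclass, the same logic would run on the `pa.Table`. One option is a round trip through `pa.Table.to_pydict()` and `pa.Table.from_pydict()`; vectorized `pyarrow.compute` functions would avoid the per-row Python loop.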

## Storing the scraped data

#### Data Storage

Data is converted to PyArrow tables and stored as Parquet files. Parquet's columnar compression gives a smaller on-disk footprint, which is beneficial for larger datasets.

__Data Storage Logic:__
1. Store the file paths for submission data and comment data Parquet files
2. Define the schema for the parquet files to preserve data type on export, marginally improve write performance, and avoid silent errors
3. Initialize the data generator with the list of search pairs
4. Stream data from the generator and store each record in a dictionary buffer. When buffer size reaches target byte size, convert to Pyarrow Table and write to dataset directory as Parquet file.
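
The buffering in step 4 can be modeled without PyArrow by returning batches instead of writing Parquet files. The sketch below is a simplified model of the loader's `load` method that uses the same JSON-size approximation, and `batch_by_size` is a hypothetical helper. The JSON byte count is only a proxy, since the Parquet file on disk is encoded and compressed. With the loader's default `target_MB=8.0`, the threshold is `int(8.0 * 2 ** 20)` = 8,388,608 bytes.

```python
import json
from typing import Dict, Iterable, List, Tuple


def batch_by_size(stream: Iterable[Tuple[str, Dict]], columns: Dict[str, List[str]],
                  target_bytes: int) -> List[Tuple[str, Dict[str, List]]]:
    """
    Buffer records per record type as column lists and emit a batch whenever the
    approximate (JSON-encoded) size reaches target_bytes, plus a final partial batch.
    """
    buffers = {rt: {col: [] for col in cols} for rt, cols in columns.items()}
    sizes = {rt: 0 for rt in columns}
    batches = []

    for record_type, record in stream:
        # Missing keys become None so every column list stays the same length
        for col in buffers[record_type]:
            buffers[record_type][col].append(record.get(col))
        sizes[record_type] += len(json.dumps(record, separators=(",", ":")).encode("utf-8"))
        if sizes[record_type] >= target_bytes:
            batches.append((record_type, buffers[record_type]))
            # Rebind the stored buffer (not just a local name) so the next batch starts empty
            buffers[record_type] = {col: [] for col in columns[record_type]}
            sizes[record_type] = 0

    # Flush whatever is left once the stream is exhausted, skipping empty buffers
    for record_type, buffer in buffers.items():
        if any(buffer.values()):
            batches.append((record_type, buffer))
    return batches
```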

In [None]:
class ParquetDataLoader:
    def __init__(self, config:Tuple[NamedTuple], target_MB:Union[int,float]=8.0, transformer:object=None):
        self.CONFIG = config
        self._TRANSFORMER = transformer
        self.set_target_mb(target_MB)
        self._configure_loader()
    
    def _configure_loader(self):
        """Abstracts away the configuration of schemas, buffers, root paths, byte counters."""
        # Unpack schema from ParquetConfig namedtuples
        self._SCHEMAS = {ntuple.record_type : ntuple.schema for ntuple in self.CONFIG}
        # Store the dataset directory root paths per record type
        self._DATASET_PATHS = {ntuple.record_type : ntuple.dataset_path for ntuple in self.CONFIG}
        # Set-up buffers per record type
        self._buffers = {
            record_type : {
                col : [] for col in self._SCHEMAS[record_type].names
                } 
            for record_type in self._SCHEMAS
            }
        # Initialize byte counter per record type
        self._buffer_sizes = {record_type : 0 for record_type in self._buffers}
        # Batch counter for filename
        self._batch_counters = {record_type : 0 for record_type in self._buffers}
        return self
    
    def set_target_mb(self, target_MB:Union[int,float]):
        """
        Setter for target_MB. Updates target_MB and TARGET_BYTES attributes. Allows for updating
        buffer size targets post-initialization.
        """
        self.target_MB = target_MB
        self._TARGET_BYTES = int(target_MB * 2 ** 20)
        return self
    
    def load(self, data_stream:Generator):
        """
        Streams data ("record type", record_dict) from an input generator, stores each record in a
        per-record-type column buffer, and writes the buffer to disk whenever it reaches the target
        byte size. Any remaining records are written once the stream is exhausted.
        """ 
        # Stream the data, append to buffer, track buffer size, and when target buffer size
        # is met, write the record batch to disk and flush the buffer
        for record_type, record in data_stream:
            buffer = self._buffers[record_type]
            
            # Append each schema column's value (None if the record lacks that key)
            for col in buffer:
                buffer[col].append(record.get(col))
            
            # Update byte count with current record bytes (approximated by compact JSON)
            record_bytes = len(json.dumps(record, separators=(",", ":")).encode("utf-8"))
            self._buffer_sizes[record_type] += record_bytes
            
            # Export Parquet file, then reset the stored buffer and byte counter
            if self._buffer_sizes[record_type] >= self._TARGET_BYTES:
                self._write(record_type, buffer)
                self._buffers[record_type], self._buffer_sizes[record_type] = self._flush(buffer)

        # Final write for remaining data in each buffer after streaming data
        # Only write non-empty buffers to avoid empty Parquet files
        for record_type, buffer in self._buffers.items():
            if any(buffer.values()):
                self._write(record_type, buffer)
                self._buffers[record_type], self._buffer_sizes[record_type] = self._flush(buffer)
                
    def _write(self, record_type:str, buffer:Dict[str,List]):
        """Convert buffer to a PyArrow Table with the configured schema and write it as Parquet."""
        # Increment the per-record-type batch counter used in the file name
        self._batch_counters[record_type] += 1
        # File name stem; Ex. SUBMISSION-0001-20250626-201522
        stem = f'{record_type.upper()}-{self._batch_counters[record_type]:04d}-{dt.datetime.now():%Y%m%d-%H%M%S}'
        # Build the table against the configured schema so column types are enforced
        pa_table = pa.Table.from_pydict(buffer, schema=self._SCHEMAS[record_type])
        # basename_template must contain '{i}', which PyArrow replaces with a file index
        pq.write_to_dataset(
            table=pa_table, 
            root_path=self._DATASET_PATHS[record_type],
            basename_template=f'{stem}-{{i}}.parquet'
            )
    
    @staticmethod
    def _flush(buffer:Dict[str,List]):
        """Returns a tuple of empty buffer and byte count, in order, for an input buffer."""
        # Reconstruct buffer and return with empty values
        # Also return 0 for assignment to byte count
        return {col : [] for col in buffer}, 0
    
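    def __repr__(self):
        param_dict = {'config': self.CONFIG, 'target_MB': self.target_MB}
        # Build the argument string first; reusing the outer quote type inside an f-string requires Python 3.12+
        args = ", ".join(f"{k}={v}" for k, v in param_dict.items())
        return f'ParquetDataLoader({args})'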
    
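    def get_attributes(self):
        """Return the loader's configuration and internal state as a dictionary."""
        # A regular method: __get__ is reserved for the descriptor protocol, __get__(self, instance, owner)
        return {
            'CONFIG' : self.CONFIG,
            'target_MB' : self.target_MB,
            '_TARGET_BYTES' : self._TARGET_BYTES,
            '_SCHEMAS' : self._SCHEMAS,
            '_DATASET_PATHS' : self._DATASET_PATHS,
            '_buffers' : self._buffers,
            '_buffer_sizes' : self._buffer_sizes,
            '_batch_counters' : self._batch_counters
        }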
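The `load` loop above batches records by an estimated size rather than by a record count. Below is a minimal standalone sketch of the same technique, written as a hypothetical `batch_by_bytes` generator instead of a class method so it runs on its own. It assumes records are flat dictionaries and passes `default=str` to `json.dumps` (an assumption) so that values such as datetimes do not raise. The byte count only estimates the JSON-encoded payload, not the size of the compressed Parquet file written to disk.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def batch_by_bytes(
    records: Iterable[Dict[str, Any]],
    columns: List[str],
    target_bytes: int,
) -> Iterator[Dict[str, List[Any]]]:
    """Yield column-oriented batches once their estimated JSON size reaches target_bytes."""

    def empty_buffer() -> Dict[str, List[Any]]:
        return {col: [] for col in columns}

    buffer, size = empty_buffer(), 0
    for record in records:
        # Append each value to its column; missing keys become None, as with record.get(col)
        for col in columns:
            buffer[col].append(record.get(col))
        # Estimate the record size from its compact UTF-8 JSON encoding
        size += len(json.dumps(record, separators=(",", ":"), default=str).encode("utf-8"))
        if size >= target_bytes:
            yield buffer
            # Bind a fresh buffer so the batch that was just yielded is never mutated
            buffer, size = empty_buffer(), 0
    # Emit the partial final batch, if any records remain
    if any(buffer.values()):
        yield buffer


# Each record encodes to 20 bytes, e.g. {"id":"0","score":0}
records = [{"id": str(i), "score": i} for i in range(5)]
batches = list(batch_by_bytes(records, ["id", "score"], target_bytes=40))
print([len(batch["id"]) for batch in batches])  # [2, 2, 1]
```

Yielding the batch and then binding a fresh dictionary keeps each emitted batch independent. In the class version, the equivalent step is assigning the flushed buffer back to `self._buffers[record_type]`.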

#### Abstracting away the DataLoader configuration

These utility functions generate the schemas and dataset directory paths used to build the `ParquetConfig` namedtuples that configure the `ParquetDataLoader`.

The config variables define the data types of the submission and comment columns when building the PyArrow tables and exporting them to Parquet, as well as the dataset directories where those files are written.

In [214]:
## Initialize namedtuple for Parquet Config for ease of access and immutability
ParquetConfig = namedtuple('ParquetConfig',['record_type','dataset_path','schema'])

def load_schema(schema_path:str, record_type:str) -> pa.Schema:
    """
    Returns a pyarrow schema parsed from a JSON file containing the schema.
    Input:
        - Path to JSON schema. Parent object must be either 'submission' or 'comment'
        and child object must be a dictionary containing column name keys and pyarrow datatype values
        Ex. 'submission' : {'submission_id' : 'pa.string()'}
        - Record type to load, either 'submission' or 'comment' (case-insensitive)
    Output:
        - PyArrow Schema object
    """
    with open(schema_path, 'r') as f:
        schemas = json.load(f)
    schemas = {key.lower() : value for key, value in schemas.items()}
    # Evaluate datatype strings with only the pyarrow module in scope and no builtins;
    # this limits, but does not eliminate, the risk of running code from the JSON file
    return pa.schema(
        [(col, eval(datatype, {'__builtins__': {}}, {'pa': pa}))
         for col, datatype in schemas[record_type.lower()].items()]
        )

def get_submission_schema(schema_path:Optional[str]=None) -> pa.Schema:
    """
    Returns a boilerplate pyarrow schema for submission data. An optional schema path
    can be provided to return a predefined schema.
    Input:
        - [Optional] path to JSON schema. Parent object must be either 'submission' or 'comment'
        and child object must be a dictionary containing column name keys and pyarrow datatype values
        Ex. 'submission' : {'submission_id' : 'pa.string()'}
    Output:
        - PyArrow Schema object
    """
    if schema_path:
        return load_schema(schema_path, 'submission')
    return pa.schema([
        ("submission_id", pa.string()),
        ("title", pa.string()),
        ("selftext", pa.string()),
        ("score", pa.int64()),
        ("upvote_ratio", pa.float64()),
        ("timestamp", pa.timestamp("s")),
        ("date", pa.date32()),
        ("subreddit", pa.string()),
        ("num_comments", pa.int64()),
    ])
    
def get_comment_schema(schema_path:Optional[str]=None) -> pa.Schema:
    """
    Returns a boilerplate pyarrow schema for comment data. An optional schema path
    can be provided to return a predefined schema.
    Input:
        - [Optional] path to JSON schema. Parent object must be either 'submission' or 'comment'
        and child object must be a dictionary containing column name keys and pyarrow datatype values
        Ex. 'submission' : {'submission_id' : 'pa.string()'}
    Output:
        - PyArrow Schema object
    """
    if schema_path:
        return load_schema(schema_path, 'comment')
    return pa.schema([
        ("comment_id", pa.string()),
        ("body", pa.string()),
        ("score", pa.int64()),
        ("timestamp", pa.timestamp("s")),
        ("date", pa.date32()),
        ("subreddit", pa.string()),
        ("parent_submission_id", pa.string()),
    ])

def get_parquet_configs(
    project_root:str,
    subdir:str="data/processed",
    dataset_dirs:Union[Tuple[str,str],List[str]]=('submission-dataset','comment-dataset'),
    **schema_kwargs
    ) -> Tuple[ParquetConfig, ParquetConfig]:
    """
    Utility function to generate a tuple of ParquetConfig namedtuples
    Input:
        - project root path
        - data subdirectory path (default = data/processed)
        - dataset directory names in order: 1) submission dataset, 2) comment dataset
        (default = ('submission-dataset','comment-dataset'))
        - [Optional] keyword arguments passed to the schema getters, e.g. schema_path
    Output:
        - tuple of namedtuples (sub_cfg, com_cfg) containing record_type, dataset_path, and
        schema attributes
    """
    # Define the dataset directory paths for submission and comment data
    sub_path, com_path = tuple(
        os.path.join(project_root, subdir, dataset_dir)
        for dataset_dir in dataset_dirs
    )

    # Get the submission and comment data Arrow & Parquet schemas
    sub_schema = get_submission_schema(**schema_kwargs)
    com_schema = get_comment_schema(**schema_kwargs)

    # Build the ParquetConfig namedtuples and return them as a tuple
    return tuple(
        ParquetConfig(record_type, path, schema) 
        for record_type, path, schema 
        in zip(
            ('submission','comment'),
            (sub_path, com_path),
            (sub_schema, com_schema)
            )
        )

## Building the ETL Pipeline

In [None]:
# Data Streaming Prerequisites
## Parse text files containing search queries and subreddit names
search_queries, subreddits = get_search_pairs('..')

# Only get a subset of search queries
sample_queries = search_queries[0:1]
sample_subreddits = subreddits[0:1]

# Main ETL Pipeline
PARQUET_CONFIG = get_parquet_configs('..', schema_path='../data/raw/schemas.json')

## Initialize the streamer and loader classes; the transformer is passed to the loader.
streamer = DataStreamer(REDDIT)
# transformer = DataTransformer()
# loader = ParquetDataLoader(config=PARQUET_CONFIG, transformer=transformer)

## The streamer streams submission and comment data, the loader stores the data in buffers, the
## transformer preprocesses each batch, and the loader finally writes it to disk.
stream = streamer.stream(subreddits=sample_subreddits, queries=sample_queries, limit=50)
# loader.load(stream)
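The loader consumes whatever `DataStreamer.stream` returns as `(record_type, record)` pairs, as the `for record_type, record in data_stream` loop in `load` shows. Assuming `stream` is a generator, no API requests are made until it is iterated; the one-minute wall time of the `tuple(stream)` cell below is consistent with that. The hypothetical `fake_stream` and `route` functions below mimic that contract with hard-coded records.

```python
from typing import Any, Dict, Iterator, List, Tuple

Record = Tuple[str, Dict[str, Any]]
produced: List[str] = []  # Records the order in which the fake stream produces items


def fake_stream() -> Iterator[Record]:
    """Hypothetical stand-in for DataStreamer.stream(): one submission followed by its comments."""
    produced.append("submission")
    yield "submission", {"submission_id": "s1", "title": "Reliable used car?"}
    for i in range(3):
        produced.append("comment")
        yield "comment", {"comment_id": f"c{i}", "parent_submission_id": "s1"}


def route(stream: Iterator[Record], columns: Dict[str, List[str]]) -> Dict[str, Dict[str, List[Any]]]:
    """Append each record to the column buffers of its record type."""
    buffers = {rt: {col: [] for col in cols} for rt, cols in columns.items()}
    for record_type, record in stream:
        for col, values in buffers[record_type].items():
            values.append(record.get(col))
    return buffers


demo_stream = fake_stream()
print(produced)  # [] because creating the generator runs none of its body
demo_buffers = route(demo_stream, {
    "submission": ["submission_id", "title"],
    "comment": ["comment_id", "parent_submission_id"],
})
print(demo_buffers["comment"]["comment_id"])  # ['c0', 'c1', 'c2']
```

Because a generator can be consumed only once, a `stream` that has already been materialised with `tuple(stream)` would yield nothing if it were then passed to `loader.load`; a fresh call to `streamer.stream(...)` would be needed.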

In [227]:
%%time
etl_sample_stream = tuple(stream)

CPU times: user 781 ms, sys: 85.7 ms, total: 867 ms
Wall time: 1min 1s


In [232]:
streamer.rate_limiter.REDDIT.auth.limits

{'remaining': 948.0, 'reset_timestamp': 1751021999.491959, 'used': 52}

In [228]:
streamer.rate_limiter.print_total_requests()

'51 total requests as of 2025-06-27 20:57:59.'

In [230]:
len(etl_sample_stream)

1951

In [None]:
%%time
# Parse sample parquet files
submissions_df = pd.read_parquet('../data/submission_data_0.parquet')
comments_df = pd.read_parquet('../data/comment_data_0.parquet')

CPU times: user 21.4 ms, sys: 6.79 ms, total: 28.2 ms
Wall time: 25.5 ms


In [None]:
submissions_df.head()

| | title | selftext | score | upvote_ratio | timestamp | subreddit | num_comments |
|---|---|---|---|---|---|---|---|
| 1d0xqjx | Most reliable used car, under $25k, less than ... | I’m thinking it’s likely a Toyota or Honda, bu... | 29 | 0.94 | 1716716000.0 | r/CarsAustralia | 120 |
| 1jx4l2a | Used SUV under $20k (need a reliable car ASAP) | _Reposting this because I didn’t get any comme... | 2 | 1.0 | 1744417000.0 | r/CarsAustralia | 20 |
| 18fr75g | What would be the cheap-to-run-and-maintain ca... | Hello, I am looking for a car with good fuel e... | 29 | 0.63 | 1702289000.0 | r/CarsAustralia | 74 |
| 1gjb97c | Just moved to Australia, looking for a used sm... | We have two small kids, but have access to a 4... | 1 | 0.57 | 1730717000.0 | r/CarsAustralia | 9 |
| 1dnb05z | Reliable used car for $10-15k | I’m in the market for a used car with a budget... | 5 | 1.0 | 1719228000.0 | r/CarsAustralia | 11 |


In [None]:
comments_df.head()

l5q3ib3    Mazda 3, Corolla, i30 would be my pick. I had ...
l5q6wxs                             Almost anything Japanese
l5q37wv                                        Camry/corolla
l5q74n2    Toyota Camry - boring as hell but super reliab...
l5q6dmd                                    Honda accord euro
Name: body, dtype: object

## Exploratory Data Analysis

## Test