In [None]:
from collections import deque
from pathlib import Path
from time import sleep
from typing import Iterable

import polars as pl
from httpx import HTTPError, HTTPTransport
from swiftshadow.classes import ProxyInterface
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from scrp.client import RedditScraper
from scrp.model import ChildT1, DataT1, RedditChild, RedditListing

In [None]:
# Shared proxy pool; get_scraper() pulls one proxy from it per scraper instance.
# NOTE(review): autoUpdate=False presumably skips the proxy fetch at construction
# so it can be awaited explicitly below, and autoRotate=True presumably makes
# each get() hand out a different proxy — confirm both against the swiftshadow docs.
PROXY_MANAGER = ProxyInterface(protocol="http", autoRotate=True, autoUpdate=False)
# Top-level `await` only works inside the notebook (IPython) event loop.
await PROXY_MANAGER.async_update()


def get_scraper():
    """Create a RedditScraper whose traffic is mounted on a proxy from PROXY_MANAGER.

    Prints the proxy that was picked so proxy changes are visible in the
    cell output.
    """
    proxy = PROXY_MANAGER.get()
    proxy_url = proxy.as_string()
    print(f"Using new proxy '{proxy_url}'")

    # NOTE(review): the mount key is derived from the proxy's own protocol.
    # In httpx, mount keys are matched against the *request* URL, so a key of
    # "http://" may leave https:// requests un-proxied — confirm how
    # RedditScraper forwards `mounts` and which scheme it requests.
    transport = HTTPTransport(proxy=proxy_url)
    return RedditScraper(mounts={f"{proxy.protocol}://": transport})

In [None]:
def unnest(comments: RedditListing) -> list[DataT1]:
    """Flatten a (possibly nested) comment listing into a flat list.

    The listing is walked breadth-first: every child that is a ``ChildT1``
    contributes its ``data`` to the result, and its replies (if any) are
    queued for later processing. Children of any other type are skipped.
    """
    flattened: list[DataT1] = []
    pending = deque(comments.data.children)

    while pending:
        child = pending.popleft()
        if not isinstance(child, ChildT1):
            continue

        replies = child.data.replies
        if replies:
            # Queue nested replies so they are visited after the current level.
            pending.extend(replies.data.children)
        flattened.append(child.data)

    return flattened


def on_fail(attempt: int, reason: str) -> RedditScraper:
    """Return a fresh scraper after a failure, or give up past attempt 5.

    Args:
        attempt: Number of the failed attempt, as counted by the caller.
        reason: Human-readable failure description included in the output.

    Raises:
        RuntimeError: If ``attempt`` is greater than 5.
    """
    if attempt <= 5:
        print(f"Getting new scraper; {reason}")
        return get_scraper()

    raise RuntimeError(f"Failed to scrape comments; {reason}")


@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=4),
    retry=retry_if_exception_type(HTTPError),
    before_sleep=lambda _: print("Failed to scrape comments; retrying..."),
    reraise=True,
)
def _scrape_comments_with_retry(
    scraper: RedditScraper, permalink: str
) -> list[RedditListing]:
    """Fetch the comments for ``permalink``, retrying once on ``HTTPError``.

    The exception must propagate out of this function for the ``@retry``
    policy to see it; with ``reraise=True`` the last ``HTTPError`` is raised
    to the caller once the attempts are used up.
    """
    return scraper.comments(permalink, limit=100, sort="controversial")


def try_scrape_comments(
    scraper: RedditScraper, permalink: str
) -> list[RedditListing] | None:
    """Fetch the comments for ``permalink``, returning ``None`` on failure.

    Previously the ``HTTPError`` was caught *inside* the retry-decorated
    function, so the retry policy never observed a failure and never retried.
    The retry now wraps a private helper and the error is only swallowed here,
    after the retry attempts have been exhausted.

    Args:
        scraper: Scraper used to issue the request.
        permalink: Permalink of the post whose comments are requested.

    Returns:
        The listings returned by ``scraper.comments``, or ``None`` if every
        attempt raised an ``HTTPError``.
    """
    try:
        return _scrape_comments_with_retry(scraper, permalink)
    except HTTPError:
        return None


def scrape(permalinks: Iterable[str]) -> pl.DataFrame:
    """Scrape and flatten the comments of every permalink into one DataFrame.

    Permalinks are processed in order. When a request fails, a new scraper is
    obtained via ``on_fail`` and the same permalink is retried after sleeping
    ``30 * attempt`` seconds. After 5 consecutive failures for one permalink
    the loop stops early and whatever was collected so far is returned.

    Args:
        permalinks: Permalinks of the posts whose comments should be scraped.

    Returns:
        A DataFrame with one row per comment (the ``replies`` column dropped);
        the most recently scraped batch is stacked on top.

    Raises:
        RuntimeError: If no comments were scraped at all.
    """
    max_retries = 5  # consecutive failures tolerated for a single permalink

    permalinks = iter(permalinks)
    permalink = next(permalinks, None)

    scraper = get_scraper()

    df, retry_attempts, count = None, 0, 0

    while permalink and retry_attempts < max_retries:
        response = try_scrape_comments(scraper, permalink)
        if response is None:
            retry_attempts += 1
            if retry_attempts >= max_retries:
                # Out of attempts: do not fetch another proxy or sleep for a
                # retry that will never happen.
                break
            scraper = on_fail(retry_attempts - 1, "API returned an HTTPError")
            sleep(30 * retry_attempts)
            continue

        retry_attempts = 0
        count += 1

        # The second listing of the response holds the comments.
        comments = unnest(response[1])

        if len(comments) == 0:
            permalink = next(permalinks, None)
            continue

        batch = pl.DataFrame(comments).drop("replies")
        df = batch if df is None else batch.vstack(df)

        # The short id is only used for logging; never let an unexpected
        # permalink shape abort the run and discard the collected data.
        parts = permalink.split("/")
        short = parts[4] if len(parts) > 4 else permalink
        print(f"Scraped {len(comments)} ({df.height}) rows for permalink '{short}' (Count: {count}).")
        permalink = next(permalinks, None)

    if df is None:
        raise RuntimeError("No data was scraped.")

    if retry_attempts >= max_retries:
        print("Failed to scrape all comments; exited early...")
        print(f"Stopped at permalink '{permalink}'.")

    # `permalink` is None after a complete run, so report the totals instead.
    print(f"Finished scraping {df.height} comments from {count} permalinks.")

    return df

In [None]:
# Load the previously scraped posts, keeping one row per post id.
path = Path("output/combined.parquet")

if not path.exists():
    raise ValueError("Failed to read parquet, file does not exist.")

df_posts = pl.read_parquet(path).unique("id")

In [None]:
# Scrape the comments for every post permalink.
permalinks = df_posts.get_column("permalink")
df = scrape(permalinks)
# To extend an existing comments file with the newly scraped comments instead:
# df = df.vstack(pl.read_parquet("output/combined_comments.parquet")).unique("id")

In [20]:
# Persist the scraped comments, refusing to overwrite an existing file.
path = Path("output/combined_comments.parquet")

if not path.exists():
    path.parent.mkdir(parents=True, exist_ok=True)
    # write_parquet opens the target itself. Pre-opening it with open(path, "w")
    # only created an empty file behind an unused handle, which was left behind
    # if the write failed and then blocked later runs via the exists() check.
    df.write_parquet(path)
else:
    print("Failed to write parquet, file already exists.")

In [21]:
# Load the comments parquet from disk into `df_last`.
df_last = pl.read_parquet("output/combined_comments.parquet")