In [None]:
import pandas as pd
import re
from urllib.parse import urlparse

# CSV written by the scraping script; the "FeedURL" column is read from it below.
PATH = "scraped_feeds.csv"

df_urls = pd.read_csv(PATH)

# Quick look at what was loaded: the first rows, then the column index.
print(df_urls.head(), df_urls.columns, sep="\n")

# Non-null feed URLs, cast to str and de-duplicated, as a plain Python list.
urls = (
    df_urls["FeedURL"]
    .dropna()
    .astype(str)
    .unique()
    .tolist()
)

print("URLs loaded:", len(urls))
print("Sample:", urls[:5])

In [None]:
import requests
import re
from urllib.parse import urlparse
from functools import lru_cache

# Root of the XRPC endpoints on public.api.bsky.app; method names are appended to it.
BASE = "https://public.api.bsky.app/xrpc"

# One shared HTTP session so every request is sent with the same User-Agent header.
S = requests.Session()
S.headers["User-Agent"] = "feed-research/0.1"

def parse_feed_url(url: str):
    """Pull the actor and record key out of a feed page URL.

    The URL path must start with ``/profile/<actor>/feed/<rkey>``; any
    further path segments are ignored.

    Returns:
        ``(actor, rkey)`` when the path has that shape, ``(None, None)``
        otherwise.
    """
    segments = urlparse(url).path.strip("/").split("/")
    if len(segments) < 4:
        return None, None

    profile_marker, actor, feed_marker, rkey = segments[:4]
    if profile_marker != "profile" or feed_marker != "feed":
        return None, None
    return actor, rkey

@lru_cache(maxsize=50000)
def resolve_handle_to_did(handle: str) -> str | None:
    r = S.get(f"{BASE}/com.atproto.identity.resolveHandle", params={"handle": handle}, timeout=20)
    if r.status_code != 200:
        return None
    return r.json().get("did")

def get_feed_generator(at_uri: str) -> dict | None:
    """Fetch the feed-generator view for ``at_uri``.

    Returns:
        The ``"view"`` value of the JSON response, or ``None`` when the
        request does not return HTTP 200 or the response has no ``"view"``.
    """
    response = S.get(
        f"{BASE}/app.bsky.feed.getFeedGenerator",
        params={"feed": at_uri},
        timeout=20,
    )
    return response.json().get("view") if response.status_code == 200 else None

def url_to_feed_record(url: str):
    """Turn a feed page URL into a flat metadata record.

    Steps: parse the URL, resolve the handle to a DID, build the generator
    AT-URI, and fetch its view.

    Returns:
        A dict of feed and creator fields, or ``None`` when the URL cannot
        be parsed, the handle does not resolve, or no generator view comes
        back.
    """
    handle, rkey = parse_feed_url(url)
    if not (handle and rkey):
        return None

    did = resolve_handle_to_did(handle)
    if not did:
        return None

    built_uri = f"at://{did}/app.bsky.feed.generator/{rkey}"
    view = get_feed_generator(built_uri)
    if not view:
        return None

    # Values taken from the view win; locally derived ones are the fallback.
    creator = view.get("creator") or {}
    feed_uri = view.get("uri") or built_uri
    owner_did = creator.get("did") or did
    owner_handle = creator.get("handle") or handle
    likes = int(view.get("likeCount") or 0)
    description = view.get("description", "") or ""

    return {
        "feed_url": url,
        "feed_at_uri": feed_uri,
        "feed_cid": view.get("cid"),
        "creator_did": owner_did,
        "creator_handle": owner_handle,
        "creator_display_name": creator.get("displayName", ""),

        "feed_display_name": view.get("displayName", ""),
        "feed_description": description,
        "like_count": likes,
        "indexed_at": view.get("indexedAt", ""),
    }


In [None]:
# Initial dedupe: look up each distinct URL once and collect the results.
records = []
errors = []

seen_urls = set()
for u in urls:
    if u in seen_urls:
        continue
    seen_urls.add(u)

    # One timeout, dropped connection, or undecodable response must not abort
    # the whole crawl: record the failure against this URL and move on.
    try:
        rec = url_to_feed_record(u)
    except (requests.RequestException, ValueError) as exc:
        errors.append({"feed_url": u, "error": f"{type(exc).__name__}: {exc}"})
        continue

    if rec is None:
        errors.append({"feed_url": u, "error": "lookup_failed"})
    else:
        records.append(rec)

df = pd.DataFrame(records)
df_err = pd.DataFrame(errors)

print("Fetched:", len(df))
print("Errors:", len(df_err))


In [None]:
# Second dedupe, keyed on the feed's AT-URI: rows are ordered by like_count
# (highest first) and the first row for each feed_at_uri is kept.
df = (
    df.sort_values(by="like_count", ascending=False)
    .drop_duplicates(subset="feed_at_uri")
    .reset_index(drop=True)
)
print("After dedupe(feed_at_uri):", len(df))

df.head(10)

In [None]:
# Political scoring
#
# Two keyword tables, both compiled case-insensitively below:
#   POLITICAL_PATTERNS - terms that suggest a feed is political
#   NONPOLIT_HINTS     - terms that suggest a feed is about something else

POLITICAL_PATTERNS = [
    r"\bpolitic(s|al)?\b", r"\bgovernment\b", r"\bpolicy\b",
    r"\belection(s)?\b", r"\bvote(s|d|ing)?\b", r"\bcampaign\b",
    r"\bcongress\b", r"\bsenate\b", r"\bhouse\b", r"\bwhite house\b",
    r"\bscotus\b", r"\bsupreme court\b", r"\bcapitol\b", r"\bwashington\b",

    r"\bdemocrat(s|ic)?\b", r"\brepublican(s)?\b", r"\bgop\b",
    r"\bmaga\b", r"\bprogressive(s)?\b", r"\bliberal(s)?\b",
    r"\bconservative(s)?\b", r"\bleft(ist|y|ies)?\b", r"\bright[- ]wing\b",

    # US anchors
    # "US" is matched case-sensitively: the scoped (?-i:...) group switches
    # IGNORECASE off for that one pattern, so the pronoun in "join us" is
    # not counted as a political hit.
    r"\busa\b", r"\bunited states\b", r"\bamerican\b", r"(?-i:\bUS\b)",
]

NONPOLIT_HINTS = [
    r"\bart\b", r"\bmusic\b", r"\bgaming\b", r"\banime\b", r"\bsports\b",
    r"\bfootball\b", r"\bbasketball\b", r"\bpoetry\b", r"\bwriting\b",
    r"\bcooking\b", r"\bfood\b", r"\btravel\b", r"\bphotography\b",
    r"\bscience\b", r"\bmath\b", r"\bmovies?\b", r"\btv\b",
]

pol_re = [re.compile(p, re.I) for p in POLITICAL_PATTERNS]
non_re = [re.compile(p, re.I) for p in NONPOLIT_HINTS]

def score(text: str, regexes):
    """Count how many of ``regexes`` match somewhere in ``text``.

    Each pattern adds at most one to the total, however many times it
    matches.
    """
    hits = 0
    for pattern in regexes:
        if pattern.search(text):
            hits += 1
    return hits

def classify(name: str, desc: str, like_count: int):
    """Label a feed as "keep", "review" or "drop" from its name and description.

    The name and description are joined and scored against the political
    and non-political pattern lists.

    Rules, in priority order:
      * at least one political hit        -> "keep"
      * ``like_count`` of 25 or more      -> "keep"
      * two or more non-political hits    -> "drop"
      * anything else                     -> "review"

    Returns:
        ``(label, political_hits, nonpolitical_hits)``.
    """
    combined = f"{name}\n{desc}".strip()
    political_hits = score(combined, pol_re)
    nonpolitical_hits = score(combined, non_re)

    if political_hits >= 1 or like_count >= 25:
        # Explicit political wording, or popular enough to keep regardless.
        verdict = "keep"
    elif nonpolitical_hits >= 2:
        # Only reachable with zero political hits: clearly about something else.
        verdict = "drop"
    else:
        verdict = "review"

    return verdict, political_hits, nonpolitical_hits

# Classify every feed row, then fan the (label, political, non-political)
# triples out into three columns on df.
labels = [
    classify(row.feed_display_name, row.feed_description, row.like_count)
    for row in df.itertuples()
]
df["political_label"] = [label for label, _, _ in labels]
df["political_score"] = [political for _, political, _ in labels]
df["nonpolit_score"] = [nonpolitical for _, _, nonpolitical in labels]

# Independent copies of each bucket.
keep = df.loc[df["political_label"].eq("keep")].copy()
review = df.loc[df["political_label"].eq("review")].copy()
drop = df.loc[df["political_label"].eq("drop")].copy()

print("KEEP:", len(keep))
print("REVIEW:", len(review))
print("DROP:", len(drop))