In [None]:
import json
import os
import sqlite3
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import requests


X_API_BASE = "https://api.x.com/2"


def _sleep_backoff(attempt: int) -> None:
    # Exponential backoff with a small cap
    time.sleep(min(60, 2 ** attempt))


def _max_post_id(posts: List[dict]) -> Optional[str]:
    # Post IDs are large; keep them as strings but compare as ints
    if not posts:
        return None
    return str(max(int(p["id"]) for p in posts if "id" in p))


@dataclass
class XTrackerConfig:
    bearer_token: str
    usernames: List[str]                 # 13 accounts (council + 12 members)
    discussion_query: str                # recent search query
    db_path: str = "x_tracker.sqlite"
    state_path: str = "x_tracker_state.json"
    user_post_fields: str = "created_at,author_id,conversation_id,public_metrics"
    tweet_fields: str = "created_at,author_id,conversation_id,public_metrics"
    max_results_timeline: int = 50       # 5..100 per docs
    max_results_search: int = 100        # 10..100 for recent search typically


class XTracker:
    def __init__(self, cfg: XTrackerConfig):
        self.cfg = cfg
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {cfg.bearer_token}"})

        self._init_db()

    def _init_db(self) -> None:
        con = sqlite3.connect(self.cfg.db_path)
        cur = con.cursor()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS posts (
                id TEXT PRIMARY KEY,
                source TEXT NOT NULL,            -- "timeline" or "search"
                author_id TEXT,
                author_username TEXT,
                created_at TEXT,
                text TEXT,
                raw_json TEXT
            )
            """
        )
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                user_id TEXT NOT NULL
            )
            """
        )
        con.commit()
        con.close()

    def _load_state(self) -> dict:
        if not os.path.exists(self.cfg.state_path):
            return {"since_id_by_user_id": {}, "search_since_id": None}
        with open(self.cfg.state_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _save_state(self, state: dict) -> None:
        with open(self.cfg.state_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)

    def _request_json(self, method: str, url: str, params: Optional[dict] = None) -> dict:
        for attempt in range(0, 6):
            resp = self.session.request(method, url, params=params, timeout=30)
            if resp.status_code == 429:
                _sleep_backoff(attempt)
                continue
            resp.raise_for_status()
            return resp.json()
        raise RuntimeError("Too many 429 rate limit responses. Try reducing frequency/max_results.")

    def get_user_ids_bulk(self) -> Dict[str, str]:
        """
        Bulk lookup: GET /2/users/by?usernames=...
        Docs: "Get Users by usernames" quickstart exists; limit up to 100 usernames.
        """
        usernames = [u.lstrip("@") for u in self.cfg.usernames]
        url = f"{X_API_BASE}/users/by"
        params = {"usernames": ",".join(usernames), "user.fields": "username"}

        data = self._request_json("GET", url, params=params)
        users = data.get("data", [])

        mapping: Dict[str, str] = {}
        for u in users:
            mapping[u["username"].lower()] = u["id"]

        # Persist mapping
        con = sqlite3.connect(self.cfg.db_path)
        cur = con.cursor()
        for username in usernames:
            user_id = mapping.get(username.lower())
            if user_id:
                cur.execute(
                    "INSERT OR REPLACE INTO users (username, user_id) VALUES (?, ?)",
                    (username.lower(), user_id),
                )
        con.commit()
        con.close()

        missing = [u for u in usernames if u.lower() not in mapping]
        if missing:
            print(f"Warning: Could not resolve user IDs for: {missing}")

        return mapping

    def _get_username_for_id(self, user_id: str) -> Optional[str]:
        con = sqlite3.connect(self.cfg.db_path)
        cur = con.cursor()
        cur.execute("SELECT username FROM users WHERE user_id = ?", (user_id,))
        row = cur.fetchone()
        con.close()
        return row[0] if row else None

    def fetch_timeline_posts(self, user_id: str, since_id: Optional[str]) -> List[dict]:
        """
        Timeline: GET /2/users/{id}/posts
        """
        url = f"{X_API_BASE}/users/{user_id}/posts"
        params = {
            "max_results": self.cfg.max_results_timeline,
            "tweet.fields": self.cfg.user_post_fields,  # docs use tweet.fields for Post object fields
        }
        if since_id:
            params["since_id"] = since_id

        data = self._request_json("GET", url, params=params)
        return data.get("data", [])

    def fetch_recent_discussion(self, since_id: Optional[str]) -> List[dict]:
        """
        Recent Search: GET /2/tweets/search/recent
        """
        url = f"{X_API_BASE}/tweets/search/recent"
        params = {
            "query": self.cfg.discussion_query,
            "max_results": self.cfg.max_results_search,
            "tweet.fields": self.cfg.tweet_fields,
        }
        if since_id:
            params["since_id"] = since_id

        data = self._request_json("GET", url, params=params)
        return data.get("data", [])

    def _upsert_posts(self, posts: List[dict], source: str, author_username: Optional[str] = None) -> int:
        if not posts:
            return 0

        con = sqlite3.connect(self.cfg.db_path)
        cur = con.cursor()
        inserted = 0

        for p in posts:
            post_id = p.get("id")
            if not post_id:
                continue
            raw_json = json.dumps(p, ensure_ascii=False)

            cur.execute(
                """
                INSERT OR IGNORE INTO posts
                (id, source, author_id, author_username, created_at, text, raw_json)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    post_id,
                    source,
                    p.get("author_id"),
                    author_username,
                    p.get("created_at"),
                    p.get("text"),
                    raw_json,
                ),
            )
            if cur.rowcount == 1:
                inserted += 1

        con.commit()
        con.close()
        return inserted

    def run_once(self) -> None:
        """
        Run one collection pass. Schedule this with cron/GitHub Actions/etc.
        """
        state = self._load_state()

        # Ensure we have user IDs cached
        user_map = self.get_user_ids_bulk()
        user_ids = [uid for uid in user_map.values() if uid]

        # 1) Timelines for the 14 accounts
        total_new_timeline = 0
        for user_id in user_ids:
            since_id = state["since_id_by_user_id"].get(user_id)
            posts = self.fetch_timeline_posts(user_id=user_id, since_id=since_id)

            username = self._get_username_for_id(user_id)
            inserted = self._upsert_posts(posts, source="timeline", author_username=username)
            total_new_timeline += inserted

            new_since = _max_post_id(posts)
            if new_since:
                state["since_id_by_user_id"][user_id] = new_since

        # 2) General discussion via Recent Search
        search_since = state.get("search_since_id")
        discussion_posts = self.fetch_recent_discussion(since_id=search_since)
        total_new_search = self._upsert_posts(discussion_posts, source="search", author_username=None)

        new_search_since = _max_post_id(discussion_posts)
        if new_search_since:
            state["search_since_id"] = new_search_since

        self._save_state(state)

        print(
            f"Done. New timeline posts saved: {total_new_timeline}. "
            f"New discussion posts saved: {total_new_search}."
        )


def build_discussion_query(usernames: List[str]) -> str:
    """
    Build a conservative query for DC Council discussion:
    - keyword phrases
    - @mentions of council + members
    - excludes retweets to reduce volume (optional)
    """
    handles = [u.lstrip("@") for u in usernames]
    mention_terms = " OR ".join([f"@{h}" for h in handles])

    # You can tune keywords to reduce volume and cost.
    keyword_terms = '("DC Council" OR "Council of the District of Columbia" OR "DC City Council" OR "Council of DC")'

    # You can add lang:en or -is:reply if needed, but be careful not to miss relevant posts.
    return f"({keyword_terms} OR ({mention_terms})) -is:retweet lang:en"


if __name__ == "__main__":
    #bearer_token = os.environ.get("X_BEARER_TOKEN", "").strip()
    bearer_token = "AAAAAAAAAAAAAAAAAAAAAKvy7AEAAAAApLfuKEe9QGv6OlhnrBw3ngk1fM8%3DfKylKRecx4aQXn42qmpP6SFqjKY5pw6IRp9CGotpZVwHmGPInb"
    if not bearer_token:
        raise RuntimeError('Set env var "X_BEARER_TOKEN" to your X API Bearer Token.')

    usernames = [
        "CouncilofDC",
        "ChmnMendelson",
        "CMAnitaBondsDC",
        "RobertWhite_DC",
        "chenderson",
        "DoniCrawford",
        "BrianneKNadeau",
        "CMBrookePinto",
        "CMFrumin",
        "Janeese4DC",
        "ZacharyforWard5",
        "charlesallen",
        "WendellforWard7"
        #,"trayonwhite" # ward 8
    ]

    discussion_query = build_discussion_query(usernames)

    cfg = XTrackerConfig(
        bearer_token=bearer_token,
        usernames=usernames,
        discussion_query=discussion_query,
        db_path="x_tracker.sqlite",
        state_path="x_tracker_state.json",
    )

    tracker = XTracker(cfg)
    tracker.run_once()


Done. New timeline posts saved: 636. New discussion posts saved: 98.


In [None]:
from google.colab import files

files.download("x_tracker.sqlite")
files.download("x_tracker_state.json")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>