In [None]:
# Constants

# Relative path of the directory handed to records() as the source of the
# firehose records replayed below.
RAW_STREAM_DIR = "../data/stream-2023-07-01"

In [None]:
# === Prediction Tracking ===
# Both mappings are keyed by user DID. Each value is a list of
# (post URI, reason) pairs, where the reason is a short label such as
# "chronological-post", "thread-post" or "like" naming the rule that added it.

# What posts do we predict that each user has seen?
predicted_seen: dict[str, list[tuple[str, str]]] = {}

# What posts do we know that each user has seen based on their interactions?
true_seen: dict[str, list[tuple[str, str]]] = {}

# === Network Information Management ===


class UserInfo(t.TypedDict):
    """Per-user network state accumulated while replaying the firehose."""

    # URIs of the posts this user has authored so far
    posts: set[str]
    # DIDs of the users that have followed this user so far
    followers: set[str]


user_info: dict[str, UserInfo] = {}  # Relevant information about each user, keyed by DID


class PostInfo(t.TypedDict):
    """Link information stored for a single tracked post."""

    # URI of the post this one replies to, or None when it is not a reply.
    # Allows thread reconstruction on-the-fly.
    parent_uri: str | None
    # NOTE(review): only ever set to None in the visible code; presumably
    # intended for the URI of a quoted post -- confirm.
    subject_uri: str | None


post_info: dict[str, PostInfo] = {}  # Relevant information about every tracked post (roots and replies), keyed by URI
# URIs referenced by a reply, like or repost that are missing from post_info
deleted_posts = set[str]()

# === Helper functions ===


def gather_thread(uri: str) -> list[str]:
    """Gather the URIs of all ancestors of a post, from its parent up to the root.

    Follows the ``parent_uri`` links stored in ``post_info`` starting at
    ``uri``. The walk stops when it reaches a post without a parent (the
    thread root), when the next parent is missing from ``post_info`` (that
    URI is then added to ``deleted_posts``), or when the next parent has
    already been visited, so a malformed cyclic chain cannot loop forever.

    Args:
        uri: URI of the post whose ancestors are wanted. It must already be
            a key of ``post_info``.

    Returns:
        The ancestor URIs ordered from the direct parent towards the root.
        ``uri`` itself is never included; the list is empty for a post that
        has no tracked parent.

    Raises:
        KeyError: If ``uri`` is not a key of ``post_info``.
    """
    ancestors: list[str] = []
    visited: set[str] = {uri}  # Every URI walked so far, for cycle detection
    current = uri

    while True:
        parent_uri = post_info[current]["parent_uri"]

        # Found root
        if parent_uri is None:
            break

        # Broken thread (TODO: Could also return root here?)
        if parent_uri not in post_info:
            deleted_posts.add(parent_uri)
            break

        # Reply references are client-supplied, so a chain may point back at
        # itself; stop instead of appending the same URIs forever.
        if parent_uri in visited:
            break

        visited.add(parent_uri)
        ancestors.append(parent_uri)
        current = parent_uri

    return ancestors


def get_quoted_uri(post: Post) -> t.Optional[str]:
    """Return the URI of the record embedded (quoted) by ``post``, if any.

    Only the "app.bsky.embed.record" and "app.bsky.embed.recordWithMedia"
    embed types are recognised. A missing or empty embed, any other embed
    type, or an embed lacking one of the expected keys yields ``None``.
    """
    embed = post["embed"] if "embed" in post else None
    if not embed:
        return None

    try:
        embed_type = embed["$type"]
        if embed_type == "app.bsky.embed.record":
            return embed["record"]["uri"]
        if embed_type == "app.bsky.embed.recordWithMedia":
            # This embed type nests the record reference one level deeper
            return embed["record"]["record"]["uri"]
    except KeyError:
        # Incomplete embed: treat it as "no quote"
        pass

    return None


# === Firehose Iteration ===

END_DATE = "2023-04-01"

# Iterate through each historical record in Bluesky's firehose
for record in records(RAW_STREAM_DIR, end_date=END_DATE, log=True):
    # New profile identified
    if record["did"] not in user_info:
        user_info[record["did"]] = {"followers": set(), "posts": set()}
        predicted_seen[record["did"]] = []
        true_seen[record["did"]] = []

    match record["$type"]:
        case "app.bsky.feed.post":
            # Keep track of all of a user's posts
            user_info[record["did"]]["posts"].add(record["uri"])

            # A user's post will be served to all of their followers as of the time of that post
            for follower in user_info[record["did"]]["followers"]:
                predicted_seen[follower].append((record["uri"], "chronological-post"))

            # Keep track of all posts
            post_info[record["uri"]] = {
                "parent_uri": None,
                "subject_uri": None,
            }

            # If a user replied to a post, we know that user saw all parent posts in that thread
            if "reply" in record and record["reply"]:
                root_uri = record["reply"]["root"]["uri"]
                parent_uri = record["reply"]["parent"]["uri"]

                # If root of thread not already tracked, mark as deleted
                if root_uri not in post_info:
                    deleted_posts.add(root_uri)

                # Update post info with reply information
                post_info[record["uri"]]["parent_uri"] = parent_uri

                # Gather all parent URIs in the thread, including the root
                parent_uris = gather_thread(record["uri"])

                # A user will see all previous replies in their thread chain
                predicted_seen[record["did"]].extend(
                    [(uri, "thread-post") for uri in parent_uris]
                )
                true_seen[record["did"]].extend(
                    [(uri, "thread-post") for uri in parent_uris]
                )

                # All creators of parent posts will see this reply post
                for uri in parent_uris:
                    did = did_from_uri(uri)
                    predicted_seen[did].append((record["uri"], "replied-to"))
                    true_seen[did].append((record["uri"], "replied-to"))

            # TODO: The other people within that thread also probably saw that reply/quote

            # If a user quoted a post, we know they saw the subject post
            quoted_uri = get_quoted_uri(record)
            if quoted_uri:
                predicted_seen[record["did"]].append((quoted_uri, "quote-post"))

                # Creator of the quoted post will see this quote post
                quoted_did = did_from_uri(quoted_uri)
                predicted_seen[quoted_did].append((record["uri"], "quoted-to"))

        case "app.bsky.graph.follow":
            if record["subject"] not in user_info:  # Check if previously unseen user
                user_info[record["subject"]] = {"followers": set(), "posts": set()}
                predicted_seen[record["subject"]] = []
                true_seen[record["subject"]] = []

            # Keep track of all of a user's followers
            user_info[record["subject"]]["followers"].add(record["did"])

            # When a user follows another user, they see all of that user's previous posts
            predicted_seen[record["did"]].extend(
                [
                    (uid, "backfill-post")
                    for uid in user_info[record["subject"]]["posts"]
                ]
            )

        case "app.bsky.feed.like":
            subject_uri = record["subject"]["uri"]

            # If a user liked a post, we know they saw it
            true_seen[record["did"]].append((subject_uri, "like"))

            # Log if that post no longer exists
            if subject_uri not in post_info:
                deleted_posts.add(subject_uri)

        case "app.bsky.feed.repost":
            subject_uri = record["subject"]["uri"]

            # If a user reposted a post, we know they saw it
            true_seen[record["did"]].append((record["subject"]["uri"], "repost"))

            # Log if that post no longer exists
            if subject_uri not in post_info:
                deleted_posts.add(subject_uri)

            # A user's repost will be served to all of their followers at the time of that post
            for follower in user_info[record["did"]]["followers"]:
                predicted_seen[follower].append((record["subject"]["uri"], "repost"))

        case _:
            continue