In [3]:
pip install atproto -q


You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install ace_tools_open -q


You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [5]:
from atproto import CAR, models, FirehoseSubscribeReposClient, parse_subscribe_repos_message
import json
import threading

In [10]:


client = FirehoseSubscribeReposClient()

# Run budget: the handler asks the client to stop once this many messages were seen
MAX_MESSAGES = 10000  # Limit messages for testing
message_count = 0  # incremented for every incoming message

# One buffer per collected CREATE record collection
Posts, Likes, Follows, Reposts = [], [], [], []

# Buffers for #identity / #account frames
Identity, Account = [], []

def stop_firehose():
    """Report that the message budget is used up and shut the Firehose client down.

    on_message_handler starts this on a separate thread once MAX_MESSAGES is reached.
    """
    limit_notice = "Reached message limit. Stopping Firehose."
    print(limit_notice)
    client.stop()

def extract_subject_info(subject):
    """Return ``(cid, uri)`` for a record's ``subject`` reference.

    Accepts either an object exposing ``cid`` and ``uri`` attributes (e.g. a
    decoded record model) or a plain dict with those keys. In both cases the
    CID is normalised to ``str`` so the result is JSON-serialisable. A missing
    CID stays ``None`` instead of becoming the string ``"None"``.

    Args:
        subject: the record's ``subject`` value; may be anything, including None.

    Returns:
        tuple: ``(cid_or_None, uri_or_None)``; ``(None, None)`` for unsupported input.
    """
    if hasattr(subject, "cid") and hasattr(subject, "uri"):  # model-like object
        cid, uri = subject.cid, subject.uri
    elif isinstance(subject, dict):  # already a plain mapping
        cid, uri = subject.get("cid"), subject.get("uri")
    else:
        return None, None  # Fallback if structure is unexpected
    return (None if cid is None else str(cid)), uri

def on_message_handler(message) -> None:
    """Parse one Firehose frame and file its contents into the module-level lists.

    Every incoming frame increments ``message_count``.

    For ``#commit`` frames, each ``create`` operation whose record block
    decodes is routed by collection into Posts, Likes, Reposts or Follows.

    Frames whose header type is ``#identity`` / ``#account`` are appended
    whole to Identity / Account.

    Any exception is printed and swallowed so one bad frame does not end the
    stream. Once ``message_count`` reaches ``MAX_MESSAGES``, a single
    background thread is started to run ``stop_firehose``.

    Args:
        message: raw frame delivered by ``FirehoseSubscribeReposClient.start``.
    """
    global message_count

    message_count += 1  # Count all incoming messages

    try:
        commit = parse_subscribe_repos_message(message)

        if isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            if not commit.blocks:
                print(f"Commit {commit.seq} has no blocks. Skipping.")
                return

            car = CAR.from_bytes(commit.blocks)

            for op in commit.ops:
                # Only CREATE operations are collected; without a CID there is no record block to look up.
                if op.action != "create" or not op.cid:
                    continue

                record_raw_data = car.blocks.get(op.cid)
                if not record_raw_data:
                    continue

                record = models.get_or_create(record_raw_data, strict=False)
                if not record:
                    continue

                # The collection name (e.g. "app.bsky.feed.post") is the first path segment
                event_type = op.path.split("/")[0]

                # Instance attributes of the decoded record, read with .get() below
                record_data = record.__dict__

                if event_type == "app.bsky.feed.post":
                    Posts.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "text": record_data.get("text"),
                        # `or []` also covers a langs attribute that is present but None
                        "langs": record_data.get("langs") or [],
                        "cid": str(op.cid),
                        "uri": f"at://{commit.repo}/{op.path}",
                    })
                elif event_type == "app.bsky.feed.like":
                    liked_post_cid, liked_post_uri = extract_subject_info(record_data.get("subject"))
                    Likes.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "liked_post_cid": liked_post_cid,
                        "liked_post_uri": liked_post_uri,
                    })
                elif event_type == "app.bsky.feed.repost":
                    reposted_post_cid, reposted_post_uri = extract_subject_info(record_data.get("subject"))
                    Reposts.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "reposted_post_cid": reposted_post_cid,
                        "reposted_post_uri": reposted_post_uri,
                    })
                elif event_type == "app.bsky.graph.follow":
                    subject = record_data.get("subject")
                    followed_user = subject if isinstance(subject, str) else None
                    Follows.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "followed_user": followed_user,
                    })

            print(f"Processed Commit {commit.seq} from {commit.repo}")

        elif hasattr(message, "header") and hasattr(message.header, "t"):
            # Non-commit frames: dispatch on the frame header type
            event_type = message.header.t.lower()

            if event_type == "#identity":
                Identity.append(message)
            elif event_type == "#account":
                Account.append(message)

            print(f"Stored {event_type} event.")

    except Exception as e:
        print(f"Error processing message: {e}")

    finally:
        # stop_firehose runs on its own thread, so further frames can reach this
        # handler before the client has actually stopped. Flag the request so only
        # one stop thread is started. The check sits in `finally` so it also runs
        # after the early `return` above.
        if message_count >= MAX_MESSAGES and not getattr(on_message_handler, "stop_requested", False):
            on_message_handler.stop_requested = True
            threading.Thread(target=stop_firehose).start()

# Start Firehose stream; expected to return once stop_firehose() has stopped the client
client.start(on_message_handler)

# Final summary: one line per buffer, same text as individual print() calls
print("\nFinal Message Summary")
print(f"Total Messages Processed: {message_count}")
print("\n".join(
    f"{label}: {len(bucket)}"
    for label, bucket in (
        ("Posts", Posts),
        ("Likes", Likes),
        ("Reposts", Reposts),
        ("Follows", Follows),
        ("Identity Events", Identity),
        ("Account Events", Account),
    )
))


Processed Commit 6718935011 from did:plc:nkb5vkykx2j3yyqrgjbhex2b
Processed Commit 6718935012 from did:plc:i5s2v4vqeasfo3l6kwhxbsf7
Processed Commit 6718935013 from did:plc:f3wz2hvwvnrjdj4dcoc5yhg5
Processed Commit 6718935014 from did:plc:ltxgexryghndj2h2olmdurya
Processed Commit 6718935015 from did:plc:dx6ziei64l4dityfdi6rlvbu
Processed Commit 6718935016 from did:plc:s77ps4tohfop3ickx35aqjlq
Processed Commit 6718935017 from did:plc:frqfpsooh6ng27hazaosgxhm
Processed Commit 6718935018 from did:plc:gnztayfapmxn3iberckrerh6
Processed Commit 6718935019 from did:plc:5hy7bdaqnyq3x6wt7o5jg3wk
Processed Commit 6718935020 from did:plc:4am7ahlxc2ekgfrvbm4vf5ow
Processed Commit 6718935021 from did:plc:vs5sxzohmb5lkjt22vtbf4vk
Processed Commit 6718935022 from did:plc:ktqaf6h64rjc3n47xprefjpy
Processed Commit 6718935023 from did:plc:adptiffoptc4uynqllsnyg6t
Processed Commit 6718935024 from did:plc:lnrikeemd2ne6jdtlnwf6erm
Processed Commit 6718935025 from did:plc:egmqwgpqdqkurfoiuumjywvz
Processed 

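## The code below increments the message counter for every successfully processed message and funnels its records into one or more lists (a single commit can contain several operations). Any list that reaches `SAVE_THRESHOLD` entries is saved to a JSON backup file: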

In [29]:
import threading
import json
import os
from datetime import datetime


client = FirehoseSubscribeReposClient()

# Run budget and batching
MAX_MESSAGES = 100000  # stop is requested once this many messages have been counted
SAVE_THRESHOLD = 5000  # entries per list written to one backup file

message_count = 0  # counts only messages that stored at least one entry

# One buffer per collected CREATE record collection
Posts, Likes, Follows, Reposts = [], [], [], []

# Buffers for #identity / #account events
Identity, Account = [], []

def stop_firehose():
    """Report that the message budget is used up and shut the Firehose client down.

    on_message_handler starts this on a separate thread once MAX_MESSAGES is reached.
    """
    limit_notice = "Reached message limit. Stopping Firehose."
    print(limit_notice)
    client.stop()

def extract_subject_info(subject):
    """Return ``(cid, uri)`` for a record's ``subject`` reference.

    Accepts either an object exposing ``cid`` and ``uri`` attributes (e.g. a
    decoded record model) or a plain dict with those keys. In both cases the
    CID is normalised to ``str`` so the result is JSON-serialisable. A missing
    CID stays ``None`` instead of becoming the string ``"None"``.

    Args:
        subject: the record's ``subject`` value; may be anything, including None.

    Returns:
        tuple: ``(cid_or_None, uri_or_None)``; ``(None, None)`` for unsupported input.
    """
    if hasattr(subject, "cid") and hasattr(subject, "uri"):  # model-like object
        cid, uri = subject.cid, subject.uri
    elif isinstance(subject, dict):  # already a plain mapping
        cid, uri = subject.get("cid"), subject.get("uri")
    else:
        return None, None  # Fallback if structure is unexpected
    return (None if cid is None else str(cid)), uri


def save_list_to_disk(list_name, data_list, threshold=None):
    """Write the oldest batch of ``data_list`` to a JSON file and drop it from memory.

    Nothing happens until the list holds at least ``threshold`` entries
    (``SAVE_THRESHOLD`` when omitted). At that point the first ``threshold``
    entries are written as one JSON array to
    ``<list_name lower-cased>_backup_<timestamp>.json`` in the current working
    directory, and then removed from ``data_list`` in place. If that filename
    already exists, a numeric suffix is added so no earlier batch is ever
    overwritten.

    Args:
        list_name: label used in the filename (lower-cased) and the log line.
        data_list: buffer to drain; mutated in place.
        threshold: batch size; defaults to the module-level ``SAVE_THRESHOLD``.
            A value below 1 disables saving.
    """
    batch_size = SAVE_THRESHOLD if threshold is None else threshold
    if batch_size < 1 or len(data_list) < batch_size:
        return

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    stem = f"{list_name.lower()}_backup_{timestamp}"
    filename = f"{stem}.json"
    # The timestamp only has one-second resolution. Two batches of the same list
    # saved within one second (small thresholds, multi-op commits) would
    # otherwise share a name, and the second would overwrite the first.
    duplicate = 1
    while os.path.exists(filename):
        filename = f"{stem}_{duplicate}.json"
        duplicate += 1

    with open(filename, "w", encoding="utf-8") as file:
        # default=str writes any value json cannot encode natively via str() instead of aborting the dump
        json.dump(data_list[:batch_size], file, ensure_ascii=False, default=str)
        file.write("\n")  # Newline separator (in case appending becomes relevant)

    print(f"Saved {batch_size} {list_name} entries to {filename}")

    # Clear the saved portion from memory only after the write succeeded
    del data_list[:batch_size]

def on_message_handler(message) -> None:
    """Parse one Firehose frame, file its contents into the module-level lists, and batch them to disk.

    For ``#commit`` frames, each ``create`` operation whose record block
    decodes is routed by collection into exactly one of Posts, Likes, Reposts
    or Follows. A commit with several operations can feed several lists.

    Frames whose header type is ``#identity`` / ``#account`` store their
    ``body`` in Identity / Account.

    ``message_count`` is incremented only for frames that stored at least one
    entry; after such a frame, any list that has reached ``SAVE_THRESHOLD`` is
    written out via ``save_list_to_disk``.

    Any exception is printed and swallowed so one bad frame does not end the
    stream. Once ``message_count`` reaches ``MAX_MESSAGES``, a single
    background thread is started to run ``stop_firehose``.

    Args:
        message: raw frame delivered by ``FirehoseSubscribeReposClient.start``.
    """
    global message_count

    processed = False  # Set once this frame has stored at least one entry

    try:
        commit = parse_subscribe_repos_message(message)

        if isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            if not commit.blocks:
                print(f"Commit {commit.seq} has no blocks. Skipping.")
                return

            car = CAR.from_bytes(commit.blocks)

            for op in commit.ops:
                # Only CREATE operations are collected; without a CID there is no record block to look up.
                if op.action != "create" or not op.cid:
                    continue

                record_raw_data = car.blocks.get(op.cid)
                if not record_raw_data:
                    continue

                record = models.get_or_create(record_raw_data, strict=False)
                if not record:
                    continue

                # The collection name (e.g. "app.bsky.feed.post") is the first path segment
                event_type = op.path.split("/")[0]

                # Instance attributes of the decoded record, read with .get() below
                record_data = record.__dict__

                # The collections are mutually exclusive: each op lands in at most one list
                if event_type == "app.bsky.feed.post":
                    Posts.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "text": record_data.get("text"),
                        # `or []` also covers a langs attribute that is present but None
                        "langs": record_data.get("langs") or [],
                        "cid": str(op.cid),
                        "uri": f"at://{commit.repo}/{op.path}",
                    })
                    processed = True
                elif event_type == "app.bsky.feed.like":
                    liked_post_cid, liked_post_uri = extract_subject_info(record_data.get("subject"))
                    Likes.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "liked_post_cid": liked_post_cid,
                        "liked_post_uri": liked_post_uri,
                    })
                    processed = True
                elif event_type == "app.bsky.feed.repost":
                    reposted_post_cid, reposted_post_uri = extract_subject_info(record_data.get("subject"))
                    Reposts.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "reposted_post_cid": reposted_post_cid,
                        "reposted_post_uri": reposted_post_uri,
                    })
                    processed = True
                elif event_type == "app.bsky.graph.follow":
                    subject = record_data.get("subject")
                    followed_user = subject if isinstance(subject, str) else None
                    Follows.append({
                        "repo": commit.repo,
                        "timestamp": commit.time,
                        "seq": commit.seq,
                        "followed_user": followed_user,
                    })
                    processed = True

            if processed:
                message_count += 1  # Only increment if at least one valid operation was processed

                # Save only the lists that have reached the threshold
                save_list_to_disk("Posts", Posts)
                save_list_to_disk("Likes", Likes)
                save_list_to_disk("Reposts", Reposts)
                save_list_to_disk("Follows", Follows)

            print(f"Processed Commit {commit.seq} from {commit.repo}")

        elif hasattr(message, "header") and hasattr(message.header, "t"):
            # Non-commit frames: dispatch on the frame header type
            event_type = message.header.t.lower()

            if event_type == "#identity":
                Identity.append(getattr(message, "body", {}))  # Keep only the frame's body payload
                processed = True
            elif event_type == "#account":
                Account.append(getattr(message, "body", {}))  # Keep only the frame's body payload
                processed = True

            if processed:
                message_count += 1  # Increment message count for identity/account messages

                # Save Identity and Account lists only if they reach the threshold
                save_list_to_disk("Identity", Identity)
                save_list_to_disk("Account", Account)

            print(f"Stored {event_type}     event.")

    except Exception as e:
        print(f"Error processing message: {e}")

    finally:
        # stop_firehose runs on its own thread, so further frames can reach this
        # handler before the client has actually stopped. Flag the request so only
        # one stop thread is started. The check sits in `finally` so it also runs
        # after the early `return` above.
        if message_count >= MAX_MESSAGES and not getattr(on_message_handler, "stop_requested", False):
            on_message_handler.stop_requested = True
            threading.Thread(target=stop_firehose).start()

# Every buffer this cell collects, paired with the label used in filenames and logs
SAVE_TARGETS = (
    ("Posts", Posts),
    ("Likes", Likes),
    ("Reposts", Reposts),
    ("Follows", Follows),
    ("Identity", Identity),
    ("Account", Account),
)

# Start Firehose stream. client.start() keeps this cell running until
# stop_firehose() stops the client. The finally clause makes sure collected
# data is still saved and summarised if the kernel is interrupted instead.
try:
    client.start(on_message_handler)
finally:
    # Final save of any list that has reached SAVE_THRESHOLD
    for list_name, data_list in SAVE_TARGETS:
        save_list_to_disk(list_name, data_list)

    # save_list_to_disk never writes a partial batch. Without this explicit flush,
    # up to SAVE_THRESHOLD - 1 entries per list would never reach disk. The
    # entries stay in memory afterwards so the inspection cells below still work.
    for list_name, data_list in SAVE_TARGETS:
        if not data_list:
            continue
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = f"{list_name.lower()}_final_{timestamp}.json"
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(data_list, file, ensure_ascii=False, default=str)
            file.write("\n")
        print(f"Saved {len(data_list)} {list_name} entries to {filename}")

    # Print final summary (list sizes are the entries not covered by a full batch file)
    print("\nFinal Message Summary")
    print(f"Total Messages Processed: {message_count}")
    print(f"Posts: {len(Posts)}")
    print(f"Likes: {len(Likes)}")
    print(f"Reposts: {len(Reposts)}")
    print(f"Follows: {len(Follows)}")
    print(f"Identity Events: {len(Identity)}")
    print(f"Account Events: {len(Account)}")


Processed Commit 6898806747 from did:plc:vjykxf2zk63tbvtao3ilfwne
Processed Commit 6898806748 from did:plc:rpzg2qtlgwfyvlzvkqldtqg2
Processed Commit 6898806749 from did:plc:3u5gch5qgeec3domlk3w5o2h
Processed Commit 6898806750 from did:plc:rfv7bwesp5mqh5jcm2z7dqof
Processed Commit 6898806751 from did:plc:2jy4qpisukkoal7tc3zz6jsv
Processed Commit 6898806752 from did:plc:xr73m4p6qeeogxtzgu6qf4n4
Processed Commit 6898806753 from did:plc:o4yhf3bt6dmxabxcy4kvd4hz
Processed Commit 6898806754 from did:plc:75ko4brlm6w7fmv4n3zsso6s
Processed Commit 6898806755 from did:plc:qdsbdw6hozaukj3qddoti5zd
Processed Commit 6898806756 from did:plc:xzpj6b3rmemedg6hcmxiw7ki
Saved 5 Likes entries to likes_backup_2025-03-23_21-03-14.json
Processed Commit 6898806757 from did:plc:n25jxikqgw33tuvjifvxhfe7
Processed Commit 6898806758 from did:plc:z664n5cc5nkgmx7q3cbotymb
Processed Commit 6898806759 from did:plc:ufcxhplgksrexkypp32sgm74
Saved 5 Follows entries to follows_backup_2025-03-23_21-03-14.json
Processed Co

KeyboardInterrupt: 

In [27]:
# First five stored #account entries: MessageFrame objects if filled by the first collector cell, body dicts if by the batching cell
Account[0:5]

[MessageFrame(header=MessageFrameHeader(op=1, t='#account'), body={'did': 'did:plc:l2yjpdoxl7ql6n74disemp5m', 'seq': 6894438669, 'time': '2025-03-23T18:07:13.887Z', 'active': True}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#account'), body={'did': 'did:plc:wmsqywyhbeezhsfwh3neacgn', 'seq': 6894440764, 'time': '2025-03-23T18:07:17.247Z', 'active': True}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#account'), body={'did': 'did:plc:3z23g7tyt5pgvgkroqgcz37c', 'seq': 6894442017, 'time': '2025-03-23T18:07:19.252Z', 'active': True}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#account'), body={'did': 'did:plc:2ypcigjcncyhcno2ba5xi5s7', 'seq': 6894442891, 'time': '2025-03-23T18:07:20.571Z', 'active': True}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#account'), body={'did': 'did:plc:emnqt7snpygwkvqyskqevecu', 'seq': 6894443845, 'time': '2025-03-23T18:07:22.185Z', 'active': True})]

In [22]:
# First five stored #identity entries: MessageFrame objects if filled by the first collector cell, body dicts if by the batching cell
Identity[0:5]

[MessageFrame(header=MessageFrameHeader(op=1, t='#identity'), body={'did': 'did:plc:74o5azqknr3n7xcgdgc2zdnm', 'seq': 6802998433, 'time': '2025-03-21T16:14:42.110Z', 'handle': 'callito77.bsky.social'}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#identity'), body={'did': 'did:plc:65fpi6aworvd5h4p7yfqpxgl', 'seq': 6802999924, 'time': '2025-03-21T16:14:44.451Z', 'handle': 'dirtysnapple.bsky.social'}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#identity'), body={'did': 'did:plc:myvmpqkctnd62xjxbintc3bx', 'seq': 6803000530, 'time': '2025-03-21T16:14:45.447Z', 'handle': 'tracyy005.bsky.social'}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#identity'), body={'did': 'did:plc:iahonh3in55r6fhayzujwf3r', 'seq': 6803003288, 'time': '2025-03-21T16:14:49.597Z', 'handle': 'pantasticnow.bsky.social'}),
 MessageFrame(header=MessageFrameHeader(op=1, t='#identity'), body={'did': 'did:plc:qcr77gvksocv4ksq4oghfl5o', 'seq': 6803003976, 'time': '2025-03-21T16:14:50.487Z', 'handle': 'aymen

In [19]:
# First five collected repost entries (dicts built by on_message_handler)
Reposts[0:5]

[{'repo': 'did:plc:phrw5zajj66xj5pdhnaayswp',
  'timestamp': '2025-03-21T15:57:11.314Z',
  'seq': 6802305117,
  'reposted_post_cid': 'bafyreiaz5iyzavypgti5rs3rytvdrgjiugh7bwi3yscdfrvidqh4r7o3fq',
  'reposted_post_uri': 'at://did:plc:ktzz5bviimev46rwf7ftw2fi/app.bsky.feed.post/3lkt7aaijek2k'},
 {'repo': 'did:plc:7p3imdezdmykdysvkycy7pdc',
  'timestamp': '2025-03-21T15:57:11.314Z',
  'seq': 6802305118,
  'reposted_post_cid': 'bafyreidxeduxzcd5tbiaewoqmevcot46vgeljjzjtdmasofjlb6jglsmku',
  'reposted_post_uri': 'at://did:plc:rjr6nfdzrfngjgc34jjyty6a/app.bsky.feed.post/3lkvdpb72kc2g'},
 {'repo': 'did:plc:43qrovmvd3b5ww6o6lkuce3v',
  'timestamp': '2025-03-21T15:57:11.397Z',
  'seq': 6802305175,
  'reposted_post_cid': 'bafyreiejja23ayqq5xrfiznc3oi4kdk6ijcbugazh75a24ysdje5qeom44',
  'reposted_post_uri': 'at://did:plc:dqygof75u7irrrkewbaxt23w/app.bsky.feed.post/3lkuubshmlk25'},
 {'repo': 'did:plc:ghpaw6sm7x2jnvcdp6mz7eo5',
  'timestamp': '2025-03-21T15:57:11.401Z',
  'seq': 6802305177,
  'repos