In [2]:
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, models, CAR

In [3]:
# Firehose client for the repo-commit stream; the handler cell below is passed to client.start().
client = FirehoseSubscribeReposClient()

In [7]:
def on_message_handler(message):
    """Firehose callback: print every newly created post and echo the text
    of any post that mentions 'coffee'.

    Parameters:
    - message: a raw firehose frame, as passed by ``client.start``.
    """
    commit = parse_subscribe_repos_message(message)
    # Non-commit frames are ignored.
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return

    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action != "create" or not op.cid:
            continue
        # `car.blocks.get` presumably returns None when the CID is not in this
        # commit's CAR; skip such ops instead of failing on `None['$type']`.
        data = car.blocks.get(op.cid)
        if not isinstance(data, dict) or data.get('$type') != 'app.bsky.feed.post':
            continue
        print(data)
        # Default to '' so a record without a 'text' key is skipped, not a KeyError.
        text = data.get('text', '')
        if 'coffee' in text:
            print(text)


client.start(on_message_handler)

{'text': '', '$type': 'app.bsky.feed.post', 'embed': {'$type': 'app.bsky.embed.images', 'images': [{'alt': 'A man in a crowd is looking judgmentally at the person clapping next to him.', 'image': {'ref': b'\x01U\x12 \xd7=\xa5\xfe\xcf\x14cq\x80\xe3\xe8\xda\xd9\x17\x1c\xbcw\x10\xda"\x7f\x87\xbe)V\xa1\x98`\xfa\x901\xf7', 'size': 365938, '$type': 'blob', 'mimeType': 'image/jpeg'}, 'aspectRatio': {'width': 1080, 'height': 645}}]}, 'langs': ['en'], 'reply': {'root': {'cid': 'bafyreiesfqaz46pcgrxnztfk3pz6x3i7ktxaasb6oynikdcbdkhdtmlqdq', 'uri': 'at://did:plc:gzporxpvxkz4l7xfvk7l5prj/app.bsky.feed.post/3lfzlp5uuvs2p'}, 'parent': {'cid': 'bafyreiesfqaz46pcgrxnztfk3pz6x3i7ktxaasb6oynikdcbdkhdtmlqdq', 'uri': 'at://did:plc:gzporxpvxkz4l7xfvk7l5prj/app.bsky.feed.post/3lfzlp5uuvs2p'}}, 'createdAt': '2025-01-18T16:19:43.005Z'}
{'text': 'Aaaaa thank you angel 🥺💕', '$type': 'app.bsky.feed.post', 'langs': ['en'], 'reply': {'root': {'cid': 'bafyreih622qe5kcs7txx5zg4jele6nemgocya43vfx7jq724vsn3bcfqyu', 'ur

KeyboardInterrupt: 

In [1]:
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, models, CAR
import time
from threading import Lock

class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    Tokens refill continuously at ``tokens_per_second`` up to ``capacity``.
    Each successful ``get_token`` call consumes one token.
    """

    def __init__(self, tokens_per_second, capacity=None):
        """
        Parameters:
        - tokens_per_second (float): refill rate; must be > 0.
        - capacity (float, optional): maximum number of stored tokens (burst size).
          Defaults to ``max(tokens_per_second, 1)``. Without the floor of 1, a
          rate below one token per second could never accumulate a whole token.

        Raises:
        - ValueError: if ``tokens_per_second`` is not positive or ``capacity`` < 1.
        """
        if tokens_per_second <= 0:
            raise ValueError("tokens_per_second must be positive")
        if capacity is None:
            capacity = max(tokens_per_second, 1)
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.tokens_per_second = tokens_per_second
        self.capacity = capacity
        # Start full, which permits an initial burst of `capacity` tokens.
        self.tokens = capacity
        # Monotonic clock: wall-clock jumps (NTP, manual changes) must not
        # drain or inflate the bucket.
        self.last_update = time.monotonic()
        self.lock = Lock()

    def get_token(self):
        """Try to consume one token without waiting.

        Returns True if a token was available (and consumed), else False.
        """
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.tokens = min(self.capacity,
                              self.tokens + elapsed * self.tokens_per_second)
            self.last_update = now

            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

# Throughput cap for the firehose handler: it takes one token per frame, and
# tokens refill at this rate. The bucket starts full, so an initial burst of
# this many frames passes immediately.
MESSAGES_PER_SECOND = 5
rate_limiter = TokenBucket(MESSAGES_PER_SECOND)

client = FirehoseSubscribeReposClient()

def on_message_handler(message):
    """Throttled firehose callback: print every newly created post and echo
    the text of any post that mentions 'coffee'.

    Parameters:
    - message: a raw firehose frame, as passed by ``client.start``.
    """
    # Wait for a token from the module-level limiter. Every frame consumes one,
    # including frames that turn out not to be commits.
    # NOTE(review): sleeping here runs inside the client's callback, so frames
    # presumably queue up while we wait; confirm that backpressure is acceptable.
    while not rate_limiter.get_token():
        time.sleep(0.1)

    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return

    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action != "create" or not op.cid:
            continue
        # `car.blocks.get` presumably returns None when the CID is not in this
        # commit's CAR; skip such ops instead of failing on `None['$type']`.
        data = car.blocks.get(op.cid)
        if not isinstance(data, dict) or data.get('$type') != 'app.bsky.feed.post':
            continue
        print(data)
        # Default to '' so a record without a 'text' key is skipped, not a KeyError.
        text = data.get('text', '')
        if 'coffee' in text:
            print(text)


client.start(on_message_handler)

{'text': 'Please use alt text 🙏 we do that here...', '$type': 'app.bsky.feed.post', 'langs': ['en'], 'reply': {'root': {'cid': 'bafyreia5ldqc6j3fxhsb2vylw63ng2hgvvil7vdellgyhlchnmvu7hlafu', 'uri': 'at://did:plc:pfna7vcb3qt73bkmbv7op4xu/app.bsky.feed.post/3lfzoum6it22i'}, 'parent': {'cid': 'bafyreia5ldqc6j3fxhsb2vylw63ng2hgvvil7vdellgyhlchnmvu7hlafu', 'uri': 'at://did:plc:pfna7vcb3qt73bkmbv7op4xu/app.bsky.feed.post/3lfzoum6it22i'}}, 'createdAt': '2025-01-18T16:55:42.948Z'}
{'text': '“you hate dictatorship? so you’re saying you hate FDR??” —an interaction you probably had, more than once even, in your sophomore year of college', '$type': 'app.bsky.feed.post', 'langs': ['en'], 'createdAt': '2025-01-18T16:55:41.403Z'}
{'text': 'i agree, it gets a bit muddled at the end', '$type': 'app.bsky.feed.post', 'langs': ['en'], 'reply': {'root': {'cid': 'bafyreihsz3tha672eyhnsllmewrplxued7ktgtapkwsiqblwvoktp6dg54', 'uri': 'at://did:plc:nvfposmpmhegtyvhbs75s3pw/app.bsky.feed.post/3lfzqpipjks2k'}, 'pa

KeyboardInterrupt: 

In [4]:
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, models, CAR

# Firehose client; the handler below stops it from inside its callback.
client = FirehoseSubscribeReposClient()

# Flag to indicate if we have received a post: set once the first post is
# printed, after which the handler returns immediately for every frame.
post_received = False

def on_message_handler(message):
    """Print the first newly created post seen on the firehose, then stop the client.

    Parameters:
    - message: a raw firehose frame, as passed by ``client.start``.
    """
    global post_received
    if post_received:
        return  # guard against frames still delivered after stop() was requested

    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return

    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action != "create" or not op.cid:
            continue
        # `car.blocks.get` presumably returns None when the CID is not in this
        # commit's CAR; skip such ops instead of failing on `None['$type']`.
        data = car.blocks.get(op.cid)
        if isinstance(data, dict) and data.get('$type') == 'app.bsky.feed.post':
            print(data)
            post_received = True  # stop processing any further frames
            client.stop()
            return


client.start(on_message_handler)

{'text': 'Hey, I can help out if needed. Im good with logos and Luna is good at Text Logos', '$type': 'app.bsky.feed.post', 'langs': ['en'], 'reply': {'root': {'cid': 'bafyreiarp75zgf4k7sxt7lukdpqxotsddwa2hudfnbf4metti77t72njuy', 'uri': 'at://did:plc:dax7uipbx5pgkq2zdrd5lkae/app.bsky.feed.post/3lfzhcpcwak2b'}, 'parent': {'cid': 'bafyreiarp75zgf4k7sxt7lukdpqxotsddwa2hudfnbf4metti77t72njuy', 'uri': 'at://did:plc:dax7uipbx5pgkq2zdrd5lkae/app.bsky.feed.post/3lfzhcpcwak2b'}}, 'createdAt': '2025-01-18T17:31:57.173Z'}


In [6]:
!curl http://localhost:8000/latest-post

{"post":{"text":"DON'T DRINK PERRIER bottled water\nBottled by Nestlé \n\nContaminated by chemicals antibacterial,\n\nThey knew & reported a fraudulent number/ over 3 trillion € of contaminated bottles sold","$type":"app.bsky.feed.post","langs":["en"],"createdAt":"2025-01-18T17:52:05.085Z"},"sentiment":"negative","insights":"Nestle's Perrier bottled water has been reported to be contaminated with chemicals, potentially affecting consumer trust and sales, and may lead to a decline in the company's stock price"}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   518  100   518    0     0   2519      0 --:--:-- --:--:-- --:--:--  2526


In [19]:
!curl http://localhost:8000/latest-post

{"post":{"text":"Overall billions to our economy while bs artist contributed 0","$type":"app.bsky.feed.post","langs":["en"],"reply":{"root":{"cid":"bafyreig6fdkzuja2uo3bxkl7z37arav7fcxl4hcs2wefgarlpye55fdr6m","uri":"at://did:plc:uqnyrpls22pqbicmdkwnlvli/app.bsky.feed.post/3lfx7qdj6ms2x"},"parent":{"cid":"bafyreig6fdkzuja2uo3bxkl7z37arav7fcxl4hcs2wefgarlpye55fdr6m","uri":"at://did:plc:uqnyrpls22pqbicmdkwnlvli/app.bsky.feed.post/3lfx7qdj6ms2x"}},"createdAt":"2025-01-18T18:08:56.169Z"},"sentiment":"negative","insights":"The post expresses frustration towards an individual or group perceived as not contributing to the economy, suggesting dissatisfaction with current economic conditions or policies."}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   705  100   705    0     0   3265      0 --:--:-- --:--:-- --:--:--  3263


In [22]:
import os
import json
import requests
from pathlib import Path

def _load_json_list(json_file: Path, json_file_path: str):
    """Return the list stored in ``json_file``.

    Returns [] when the file is missing, empty/invalid JSON, or not a list.
    Returns None when the file cannot be read at all.
    """
    if not json_file.exists():
        print(f"{json_file_path} does not exist. Creating a new file.")
        return []
    try:
        with json_file.open('r', encoding='utf-8') as file:
            try:
                existing_data = json.load(file)
            except json.JSONDecodeError:
                print(f"Warning: {json_file_path} is empty or not valid JSON. Initializing as a list.")
                return []
    except Exception as e:
        print(f"Error reading {json_file_path}: {e}")
        return None
    if not isinstance(existing_data, list):
        print(f"Warning: {json_file_path} does not contain a list. Reinitializing as a list.")
        return []
    return existing_data


def _write_json_atomic(json_file: Path, payload) -> None:
    """Write ``payload`` to ``json_file`` via a sibling temp file plus ``os.replace``.

    A crash mid-write therefore never leaves a truncated file behind.
    """
    tmp_path = json_file.with_name(json_file.name + ".tmp")
    try:
        with tmp_path.open('w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=4)
        os.replace(tmp_path, json_file)
    finally:
        # Only still present if the dump or replace failed.
        if tmp_path.exists():
            tmp_path.unlink()


def fetch_and_append_json(api_url: str, json_file_path: str, timeout: float = 10.0):
    """
    Fetches JSON data from the given API URL and appends it to the specified JSON file.

    The file holds a single JSON list; each successful call appends one element.
    Problems are reported with ``print`` and the function returns without raising.

    Parameters:
    - api_url (str): The API endpoint to fetch the JSON data from.
    - json_file_path (str): The path to the JSON file where data will be appended.
    - timeout (float): Seconds to wait for the server (default 10). Without a
      timeout, ``requests.get`` can block indefinitely.
    """
    # Step 1: Fetch JSON from the API endpoint.
    try:
        response = requests.get(api_url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from {api_url}: {e}")
        return

    if response.status_code == 404:
        print("No relevant posts found yet (404).")
        return
    if response.status_code != 200:
        print(f"Error: Received unexpected status code {response.status_code}")
        print(f"Response: {response.text}")
        return
    try:
        data = response.json()
    except ValueError:
        # json.JSONDecodeError, requests' JSONDecodeError and simplejson's
        # decode error all subclass ValueError.
        print("Error: The response is not valid JSON.")
        return
    print(f"Successfully fetched data from {api_url}")

    # Step 2: Load the existing list; missing or invalid files start a fresh list.
    json_file = Path(json_file_path)
    existing_data = _load_json_list(json_file, json_file_path)
    if existing_data is None:
        return

    # Step 3: Append and write back atomically.
    existing_data.append(data)
    try:
        _write_json_atomic(json_file, existing_data)
        print(f"Appended new data to {json_file_path}")
    except Exception as e:
        print(f"Error writing to {json_file_path}: {e}")

if __name__ == "__main__":
    import time

    # Define the API endpoint and the JSON file path
    API_URL = "http://localhost:8000/latest-post"
    JSON_FILE_PATH = "response.json"
    # Number of polls and the pause between them. Without a pause the loop
    # fires all requests back-to-back, and a "latest post" endpoint most likely
    # returns the same post each time, filling the file with duplicates.
    NUM_POLLS = 50
    POLL_INTERVAL_SECONDS = 1.0

    for poll in range(NUM_POLLS):
        fetch_and_append_json(API_URL, JSON_FILE_PATH)
        if poll < NUM_POLLS - 1:  # no need to wait after the final poll
            time.sleep(POLL_INTERVAL_SECONDS)

Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
Successfully fetched data from http://localhost:8000/latest-post
Appended new data to response.json
