# Bluesky Repost Bot

This notebook provides a complete, self-contained Bluesky repost bot.

## Basic Features
- Search posts with specific tags/keywords
- Auto-repost with preset comments
- Monitor comments and auto-reply
- View real-time statistics

## 1. Install Dependencies

Run this cell to install all required packages:

In [2]:
!pip install atproto>=0.0.55 schedule>=1.2.0

zsh:1: 0.0.55 not found


## 2. Import Required Libraries

In [3]:
import os
import json
import logging
import sqlite3
import schedule
import time
from datetime import datetime
from typing import List, Optional
from atproto import Client, models
from getpass import getpass



## 3. Configure Logging

In [4]:
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

logger = logging.getLogger(__name__)
print("Logging system configured")

Logging system configured


## 4. Database Class Definition

In [5]:
class Database:
    def __init__(self, db_path: str = "posts.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize database tables"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Table for reposted posts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS reposted_posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                original_uri TEXT UNIQUE NOT NULL,
                original_author TEXT NOT NULL,
                repost_uri TEXT,
                reposted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Table for processed replies
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS processed_replies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                reply_uri TEXT UNIQUE NOT NULL,
                parent_post_uri TEXT NOT NULL,
                author TEXT NOT NULL,
                content TEXT,
                replied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        conn.commit()
        conn.close()

    def is_post_reposted(self, post_uri: str) -> bool:
        """Check if a post has already been reposted"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM reposted_posts WHERE original_uri = ?", (post_uri,))
        result = cursor.fetchone()
        conn.close()
        return result is not None

    def add_reposted_post(self, original_uri: str, original_author: str, repost_uri: Optional[str] = None):
        """Record a reposted post"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute(
                "INSERT INTO reposted_posts (original_uri, original_author, repost_uri) VALUES (?, ?, ?)",
                (original_uri, original_author, repost_uri)
            )
            conn.commit()
        except sqlite3.IntegrityError:
            pass
        finally:
            conn.close()

    def is_reply_processed(self, reply_uri: str) -> bool:
        """Check if a reply has already been processed"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM processed_replies WHERE reply_uri = ?", (reply_uri,))
        result = cursor.fetchone()
        conn.close()
        return result is not None

    def add_processed_reply(self, reply_uri: str, parent_post_uri: str, author: str, content: str):
        """Record a processed reply"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute(
                "INSERT INTO processed_replies (reply_uri, parent_post_uri, author, content) VALUES (?, ?, ?, ?)",
                (reply_uri, parent_post_uri, author, content)
            )
            conn.commit()
        except sqlite3.IntegrityError:
            pass
        finally:
            conn.close()

    def get_recent_reposts(self, limit: int = 10) -> List[dict]:
        """Get recent reposted posts"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT original_uri, original_author, repost_uri, reposted_at FROM reposted_posts ORDER BY reposted_at DESC LIMIT ?",
            (limit,)
        )
        results = cursor.fetchall()
        conn.close()

        return [
            {
                "original_uri": row[0],
                "original_author": row[1],
                "repost_uri": row[2],
                "reposted_at": row[3]
            }
            for row in results
        ]

print("Database class defined")

Database class defined


## 5. RepostManager Class Definition

In [6]:
class RepostManager:
    def __init__(self, client: Client, db: Database, config: dict):
        self.client = client
        self.db = db
        self.config = config

    def search_posts(self, query: str, limit: int = 25) -> List[dict]:
        """Search for posts matching the query"""
        try:
            response = self.client.app.bsky.feed.search_posts(params={'q': query, 'limit': limit})
            posts = []
            if hasattr(response, 'posts'):
                for post in response.posts:
                    posts.append({
                        'uri': post.uri,
                        'cid': post.cid,
                        'author': post.author.handle,
                        'text': post.record.text if hasattr(post.record, 'text') else '',
                        'created_at': post.record.created_at if hasattr(post.record, 'created_at') else None
                    })
            return posts
        except Exception as e:
            logger.error(f"Error searching posts: {e}")
            return []

    def should_repost(self, post: dict) -> bool:
        """Determine if a post should be reposted"""
        if self.db.is_post_reposted(post['uri']):
            return False

        text = post.get('text', '').lower()
        tags = self.config.get('search', {}).get('tags', [])
        for tag in tags:
            if tag.lower() in text:
                return True

        keywords = self.config.get('search', {}).get('keywords', [])
        for keyword in keywords:
            if keyword.lower() in text:
                return True

        return False

    def repost_with_comment(self, post: dict) -> Optional[str]:
        """Repost a post and add a preset comment"""
        try:
            repost_ref = models.create_strong_ref(
                models.ComAtprotoRepoStrongRef.Main(uri=post['uri'], cid=post['cid'])
            )

            repost_response = self.client.app.bsky.feed.repost.create(
                self.client.me.did,
                models.AppBskyFeedRepost.Record(
                    subject=repost_ref,
                    created_at=self.client.get_current_time_iso()
                )
            )

            logger.info(f"Reposted: {post['uri']}")

            preset_comment = self.config.get('repost', {}).get('preset_comment', '')
            if preset_comment:
                self.add_comment(post, preset_comment)

            repost_uri = f"at://{self.client.me.did}/app.bsky.feed.repost/{repost_response.cid}"
            self.db.add_reposted_post(post['uri'], post['author'], repost_uri)

            return repost_uri

        except Exception as e:
            logger.error(f"Error reposting: {e}")
            return None

    def add_comment(self, post: dict, comment_text: str):
        """Add a comment to a post"""
        try:
            reply_ref = models.AppBskyFeedPost.ReplyRef(
                parent=models.create_strong_ref(
                    models.ComAtprotoRepoStrongRef.Main(uri=post['uri'], cid=post['cid'])
                ),
                root=models.create_strong_ref(
                    models.ComAtprotoRepoStrongRef.Main(uri=post['uri'], cid=post['cid'])
                )
            )

            self.client.send_post(text=comment_text, reply_to=reply_ref)
            logger.info(f"Added comment to post: {post['uri']}")

        except Exception as e:
            logger.error(f"Error adding comment: {e}")

    def run_repost_cycle(self):
        """Run one cycle of searching and reposting"""
        logger.info("Starting repost cycle...")

        max_reposts = self.config.get('repost', {}).get('max_reposts_per_run', 5)
        repost_count = 0

        tags = self.config.get('search', {}).get('tags', [])
        keywords = self.config.get('search', {}).get('keywords', [])
        search_queries = tags + keywords

        for query in search_queries:
            if repost_count >= max_reposts:
                break

            logger.info(f"Searching for: {query}")
            posts = self.search_posts(query)

            for post in posts:
                if repost_count >= max_reposts:
                    break

                if self.should_repost(post):
                    logger.info(f"Found post to repost from @{post['author']}: {post['text'][:50]}...")
                    if self.repost_with_comment(post):
                        repost_count += 1

        logger.info(f"Repost cycle completed. Reposted {repost_count} posts.")

print("RepostManager class defined")

RepostManager class defined


## 6. ReplyManager Class Definition

In [7]:
class ReplyManager:
    def __init__(self, client: Client, db: Database, config: dict):
        self.client = client
        self.db = db
        self.config = config

    def get_notifications(self) -> List[dict]:
        """Get recent notifications (mentions and replies)"""
        try:
            response = self.client.app.bsky.notification.list_notifications(params={'limit': 50})
            notifications = []
            if hasattr(response, 'notifications'):
                for notif in response.notifications:
                    if notif.reason == 'reply':
                        notifications.append({
                            'uri': notif.uri,
                            'cid': notif.cid,
                            'author': notif.author.handle,
                            'record': notif.record,
                            'reason': notif.reason
                        })
            return notifications
        except Exception as e:
            logger.error(f"Error getting notifications: {e}")
            return []

    def get_post_replies(self, post_uri: str) -> List[dict]:
        """Get replies to a specific post"""
        try:
            response = self.client.app.bsky.feed.get_post_thread(params={'uri': post_uri})
            replies = []
            if hasattr(response, 'thread') and hasattr(response.thread, 'replies'):
                for reply in response.thread.replies:
                    if hasattr(reply, 'post'):
                        post = reply.post
                        replies.append({
                            'uri': post.uri,
                            'cid': post.cid,
                            'author': post.author.handle,
                            'text': post.record.text if hasattr(post.record, 'text') else '',
                            'created_at': post.record.created_at if hasattr(post.record, 'created_at') else None
                        })
            return replies
        except Exception as e:
            logger.error(f"Error getting post replies: {e}")
            return []

    def detect_keywords(self, text: str) -> Optional[str]:
        """Detect keywords in text and return appropriate response"""
        text_lower = text.lower()
        keyword_responses = self.config.get('auto_reply', {}).get('keyword_responses', {})

        for keyword, response in keyword_responses.items():
            if keyword.lower() in text_lower:
                return response

        return None

    def send_reply(self, parent_post: dict, reply_text: str) -> bool:
        """Send a reply to a post"""
        try:
            reply_ref = models.AppBskyFeedPost.ReplyRef(
                parent=models.create_strong_ref(
                    models.ComAtprotoRepoStrongRef.Main(uri=parent_post['uri'], cid=parent_post['cid'])
                ),
                root=models.create_strong_ref(
                    models.ComAtprotoRepoStrongRef.Main(uri=parent_post['uri'], cid=parent_post['cid'])
                )
            )

            self.client.send_post(text=reply_text, reply_to=reply_ref)
            logger.info(f"Sent reply to @{parent_post['author']}: {reply_text}")
            return True

        except Exception as e:
            logger.error(f"Error sending reply: {e}")
            return False

    def process_new_replies(self):
        """Process new replies and send auto-responses"""
        logger.info("Checking for new replies...")
        recent_reposts = self.db.get_recent_reposts(limit=20)
        reply_count = 0

        for repost in recent_reposts:
            if not repost['repost_uri']:
                continue

            replies = self.get_post_replies(repost['repost_uri'])

            for reply in replies:
                if self.db.is_reply_processed(reply['uri']):
                    continue

                if reply['author'] == self.client.me.handle:
                    continue

                response_text = self.detect_keywords(reply['text'])

                if response_text:
                    logger.info(f"Found keyword in reply from @{reply['author']}: {reply['text'][:50]}...")
                    if self.send_reply(reply, response_text):
                        reply_count += 1
                        self.db.add_processed_reply(reply['uri'], repost['repost_uri'], reply['author'], reply['text'])
                else:
                    default_response = self.config.get('auto_reply', {}).get('default_response')
                    if default_response:
                        if self.send_reply(reply, default_response):
                            reply_count += 1
                            self.db.add_processed_reply(reply['uri'], repost['repost_uri'], reply['author'], reply['text'])

        logger.info(f"Processed {reply_count} new replies.")

    def monitor_notifications(self):
        """Monitor notifications for replies and respond"""
        logger.info("Monitoring notifications...")
        notifications = self.get_notifications()
        reply_count = 0

        for notif in notifications:
            if self.db.is_reply_processed(notif['uri']):
                continue

            if notif['author'] == self.client.me.handle:
                continue

            text = ''
            if hasattr(notif['record'], 'text'):
                text = notif['record'].text

            response_text = self.detect_keywords(text)

            if not response_text:
                response_text = self.config.get('auto_reply', {}).get('default_response')

            if response_text:
                logger.info(f"Responding to notification from @{notif['author']}")
                if self.send_reply(notif, response_text):
                    reply_count += 1
                    self.db.add_processed_reply(notif['uri'], notif['uri'], notif['author'], text)

        logger.info(f"Responded to {reply_count} notifications.")

print("ReplyManager class defined")

ReplyManager class defined


## 7. Configuration

Set up your bot configuration here:

In [17]:
# Bot configuration
config = {
    "search": {
        "tags": ["#AI", "#Tech"],
        "keywords": ["Gemini", "ChatGPT"],
        "check_interval_minutes": 10
    },
    "repost": {
        "preset_comment": "This is an interesting post that I want to share with everyone!",
        "max_reposts_per_run": 1
    },
    "auto_reply": {
        "keyword_responses": {
            "thank": "You're welcome! Glad I could help.",
            "how to": "If you have any questions, feel free to ask anytime!",
            "ok": "Great! Is there anything else you need help with?",
            "help": "We're here to support you anytime!",
            "price": "Please check our official website for the latest pricing information."
        },
        "default_response": "Thank you for your comment! We'll get back to you soon."
    }
}

print("Configuration loaded:")
print(json.dumps(config, indent=2))

Configuration loaded:
{
  "search": {
    "tags": [
      "#AI",
      "#Tech"
    ],
    "keywords": [
      "Gemini",
      "ChatGPT"
    ],
    "check_interval_minutes": 10
  },
  "repost": {
    "preset_comment": "This is an interesting post that I want to share with everyone!",
    "max_reposts_per_run": 1
  },
  "auto_reply": {
    "keyword_responses": {
      "thank": "You're welcome! Glad I could help.",
      "how to": "If you have any questions, feel free to ask anytime!",
      "ok": "Great! Is there anything else you need help with?",
      "help": "We're here to support you anytime!",
      "price": "Please check our official website for the latest pricing information."
    },
    "default_response": "Thank you for your comment! We'll get back to you soon."
  }
}


## 8. Login to Bluesky

Enter your Bluesky credentials:

In [9]:
# Get credentials from user input
print("Enter your Bluesky credentials:")
bluesky_handle = input("Bluesky Handle (e.g., username.bsky.social): ")
bluesky_password = getpass("Bluesky Password: ")

# Login to Bluesky
print("\nLogging into Bluesky...")
client = Client()
client.login(bluesky_handle, bluesky_password)
print(f"Successfully logged in as: {client.me.handle}")

Enter your Bluesky credentials:

Logging into Bluesky...


2025-12-02 15:08:15,986 - httpx - INFO - HTTP Request: POST https://bsky.social/xrpc/com.atproto.server.createSession "HTTP/1.1 200 OK"
2025-12-02 15:08:16,130 - httpx - INFO - HTTP Request: GET https://cortinarius.us-west.host.bsky.network/xrpc/app.bsky.actor.getProfile?actor=ziwen00.bsky.social "HTTP/1.1 200 OK"


Successfully logged in as: ziwen00.bsky.social


## 9. Initialize Bot Components

In [10]:
# Initialize database
db = Database()
print("Database initialized")

# Initialize managers
repost_manager = RepostManager(client, db, config)
reply_manager = ReplyManager(client, db, config)
print("Bot components initialized successfully")

Database initialized
Bot components initialized successfully


## 10. Run Single Bot Cycle

Execute one complete cycle (search, repost, and reply):

In [19]:
def run_bot_cycle():
    """Run one complete bot cycle"""
    print("\n" + "="*60)
    print(f"Starting bot cycle - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*60 + "\n")
    
    try:
        print("Step 1: Search and repost posts")
        repost_manager.run_repost_cycle()
        
        print("\nStep 2: Process post comments")
        reply_manager.process_new_replies()
        
        print("\nStep 3: Monitor notifications")
        reply_manager.monitor_notifications()
        
    except Exception as e:
        print(f"Error: {e}")
        logger.error(f"Bot cycle error: {e}")
    
    print("\n" + "="*60)
    print("Bot cycle completed")
    print("="*60 + "\n")

# Run once
run_bot_cycle()

2025-12-02 15:14:18,428 - __main__ - INFO - Starting repost cycle...
2025-12-02 15:14:18,429 - __main__ - INFO - Searching for: #AI



Starting bot cycle - 2025-12-02 15:14:18

Step 1: Search and repost posts


2025-12-02 15:14:19,263 - httpx - INFO - HTTP Request: GET https://cortinarius.us-west.host.bsky.network/xrpc/app.bsky.feed.searchPosts?q=%23AI&limit=25&sort=latest "HTTP/1.1 200 OK"
2025-12-02 15:14:19,274 - __main__ - INFO - Found post to repost from @toppnews.bsky.social: 
#AI   #Singapore...
2025-12-02 15:14:19,325 - httpx - INFO - HTTP Request: POST https://cortinarius.us-west.host.bsky.network/xrpc/com.atproto.repo.createRecord "HTTP/1.1 200 OK"
2025-12-02 15:14:19,326 - __main__ - INFO - Reposted: at://did:plc:mhqnbglrgmwiuz3kgexbi7uf/app.bsky.feed.post/3m722ueho5w24
2025-12-02 15:14:19,377 - httpx - INFO - HTTP Request: POST https://cortinarius.us-west.host.bsky.network/xrpc/com.atproto.repo.createRecord "HTTP/1.1 200 OK"
2025-12-02 15:14:19,378 - __main__ - INFO - Added comment to post: at://did:plc:mhqnbglrgmwiuz3kgexbi7uf/app.bsky.feed.post/3m722ueho5w24
2025-12-02 15:14:19,380 - __main__ - INFO - Repost cycle completed. Reposted 1 posts.
2025-12-02 15:14:19,380 - __main__ -


Step 2: Process post comments


2025-12-02 15:14:19,609 - httpx - INFO - HTTP Request: GET https://cortinarius.us-west.host.bsky.network/xrpc/app.bsky.feed.getPostThread?uri=at%3A%2F%2Fdid%3Aplc%3A35kd23kwtfrjufg64qpwep4n%2Fapp.bsky.feed.repost%2Fbafyreif4lnu4k4y5ypcn6c7xewvsj6uyuvj6mi5sswg7zpkaq2svfufjty "HTTP/1.1 502 Bad Gateway"
2025-12-02 15:14:19,610 - __main__ - ERROR - Error getting post replies: Response(success=False, status_code=502, content=XrpcError(error='InternalServerError', message='Internal Server Error'), headers={'x-powered-by': 'Express', 'access-control-allow-origin': '*', 'cache-control': 'private', 'vary': 'Authorization, Accept-Encoding', 'ratelimit-limit': '3000', 'ratelimit-reset': '1764717498', 'ratelimit-remaining': '2905', 'ratelimit-policy': '3000;w=300', 'content-type': 'application/json; charset=utf-8', 'content-length': '65', 'etag': 'W/"41-PDrLQG9BpCN547AVB5jtYl3RBt8"', 'date': 'Tue, 02 Dec 2025 23:14:19 GMT', 'keep-alive': 'timeout=90', 'strict-transport-security': 'max-age=63072000


Step 3: Monitor notifications

Bot cycle completed



## 11. View Statistics

Check recent repost activity:

In [20]:
recent_reposts = db.get_recent_reposts(limit=10)

print(f"Recent {len(recent_reposts)} Repost Records:\n")

if recent_reposts:
    for i, repost in enumerate(recent_reposts, 1):
        print(f"{i}. Author: @{repost['original_author']}")
        print(f"   Time: {repost['reposted_at']}")
        print(f"   URI: {repost['original_uri'][:60]}...")
        print()
else:
    print("No repost records yet")

Recent 7 Repost Records:

1. Author: @toppnews.bsky.social
   Time: 2025-12-02 23:14:19
   URI: at://did:plc:mhqnbglrgmwiuz3kgexbi7uf/app.bsky.feed.post/3m7...

2. Author: @kataplas.bsky.social
   Time: 2025-12-02 23:14:11
   URI: at://did:plc:2n5d56c5my6474jbjhc3wkpq/app.bsky.feed.post/3m7...

3. Author: @rawdaddies.bsky.social
   Time: 2025-12-02 23:13:45
   URI: at://did:plc:cywclt6yf2td6g7rxqf7sglv/app.bsky.feed.post/3m7...

4. Author: @toppnews.bsky.social
   Time: 2025-12-02 23:13:19
   URI: at://did:plc:mhqnbglrgmwiuz3kgexbi7uf/app.bsky.feed.post/3m7...

5. Author: @rawdaddies.bsky.social
   Time: 2025-12-02 23:10:02
   URI: at://did:plc:cywclt6yf2td6g7rxqf7sglv/app.bsky.feed.post/3m7...

6. Author: @rawdaddies.bsky.social
   Time: 2025-12-02 23:09:10
   URI: at://did:plc:cywclt6yf2td6g7rxqf7sglv/app.bsky.feed.post/3m7...

7. Author: @uscrossier.bsky.social
   Time: 2025-11-25 01:09:23
   URI: at://did:plc:xt6iy76ispmzakypxeydga7b/app.bsky.feed.post/3m6...



## 12. Continuous Run (Optional)

Run the bot continuously with scheduled intervals.

Warning: After running this cell, the bot will run indefinitely. Click the stop button to stop execution.

In [None]:
# Configure scheduled task
interval_minutes = config.get('search', {}).get('check_interval_minutes', 10)

schedule.every(interval_minutes).minutes.do(run_bot_cycle)

print(f"Bot scheduled to run every {interval_minutes} minutes")
print("Bot is running...")
print("Click the stop button to stop the bot\n")

# Run once first
run_bot_cycle()

# Continuous run
try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("\nBot stopped")

## 13. Test Search Functionality (Optional)

Test searching for specific tags or keywords:

In [None]:
# Test searching
test_query = "#AI"

print(f"Searching for: {test_query}\n")
posts = repost_manager.search_posts(test_query, limit=5)

print(f"Found {len(posts)} posts:\n")

for i, post in enumerate(posts, 1):
    print(f"{i}. @{post['author']}")
    print(f"   Content: {post['text'][:100]}...")
    print(f"   URI: {post['uri']}")
    print()

## 14. Update Configuration (Optional)

Modify configuration without restart:

In [None]:
# Example: Modify search tags
config['search']['tags'] = ['#AI', '#tech', '#programming']

# Example: Modify preset comment
config['repost']['preset_comment'] = 'This is an awesome post!'

# Example: Add new keyword response
config['auto_reply']['keyword_responses']['hello'] = 'Hello! Nice to meet you!'

print("Configuration updated:")
print(json.dumps(config, indent=2))

## 15. Reset Database (Optional)

Warning: This will delete all history records!

In [None]:
# Uncomment the code below to reset the database

# if os.path.exists('posts.db'):
#     os.remove('posts.db')
#     db = Database()
#     print("Database reset")
# else:
#     print("Database file does not exist")

print("Uncomment the code above to reset the database")