# Simple Discord Channel Fetcher

Fetches all messages from a single Discord channel, including threads and reply hierarchies.

## Setup
1. Make sure your `.env` file defines `DISCORD_TOKEN` with your Discord bot token
2. Your bot must be a member of the server and have the Read Message History permission
3. Get the channel ID you want to fetch from

In [None]:
#|default_exp core

In [None]:
# Channel that the example cells in this notebook fetch from
CHANNEL_ID = 1369370266899185746  # Replace with your channel ID

In [None]:
#|export
import discord
import asyncio
import json
from datetime import datetime
import os, sys
from fastcore.utils import in_notebook
import typer
from typing_extensions import Annotated

In [None]:
import nest_asyncio
from dotenv import load_dotenv

In [None]:
# Load variables from a local .env file into the environment (the setup notes expect the bot token there)
load_dotenv()
# Patch asyncio to allow nested event loops -- presumably needed because the notebook
# already runs a loop when the example cells `await` the fetchers; TODO confirm
nest_asyncio.apply()

In [None]:
#|export
def prints(*args, **kwargs):
    "Like `print`, but always writes to stderr"
    print(*args, file=sys.stderr, **kwargs)

In [None]:
#|export
# Read the bot token once at import time and fail fast when it is missing or empty
DISCORD_TOKEN = os.environ.get('DISCORD_TOKEN')
if not DISCORD_TOKEN:
    raise ValueError("DISCORD_TOKEN env variable not found")

In [None]:
#|export
async def fetch_channel_complete_history(channel_id, limit=None, save_to_file=True, print_summary=True):
    """
    Fetch complete history from a single Discord channel including threads and replies

    A short-lived client is started, all data is collected inside `on_ready`,
    and the client is closed again before this coroutine returns.

    Args:
        channel_id (int): The Discord channel ID
        limit (int, optional): Max messages to fetch from the channel and from
            each thread. None = all messages
        save_to_file (bool): Whether to save results to a timestamped JSON file
            in the current working directory
        print_summary (bool): Whether to print progress and summary information
            (written to stderr via `prints`)

    Returns:
        dict | None: Channel data with the keys "channel_info", "messages" and
        "threads" (threads keyed by thread ID). None when the channel could not
        be resolved or the client failed to start. If an error interrupts the
        fetch, whatever was collected up to that point is returned.
    """

    # Set up Discord client
    intents = discord.Intents.default()
    intents.message_content = True
    intents.guilds = True
    intents.guild_messages = True
    client = discord.Client(intents=intents)

    result = None
    # IDs of the fetched channel messages; used to link archived threads to their starter message
    main_message_ids = set()

    def format_message(message):
        """Format a single message (including reply details) as a plain dict"""
        msg_data = {
            "id": str(message.id),
            "author": {
                "name": str(message.author),
                "id": str(message.author.id),
                "display_name": getattr(message.author, 'display_name', str(message.author))
            },
            "content": message.content,
            "timestamp": message.created_at.isoformat(),
            "edited_timestamp": message.edited_at.isoformat() if message.edited_at else None,
            "attachments": [
                {
                    "url": att.url,
                    "filename": att.filename,
                    "content_type": getattr(att, 'content_type', None)
                } for att in message.attachments
            ],
            "embeds": [
                {
                    "title": embed.title,
                    "description": embed.description,
                    "url": embed.url
                } for embed in message.embeds
            ],
            "reactions": [
                {
                    "emoji": str(reaction.emoji),
                    "count": reaction.count
                } for reaction in message.reactions
            ],
            "mentions": [
                {
                    "name": str(user),
                    "id": str(user.id)
                } for user in message.mentions
            ],
            "pinned": message.pinned,
            "reply_to": None  # Filled in below if this is a reply
        }

        reference = message.reference
        if reference and reference.message_id:
            msg_data["reply_to"] = {
                "message_id": str(reference.message_id),
                "channel_id": str(reference.channel_id)
            }

            # `resolved` is a DeletedReferencedMessage placeholder (no content or
            # author) when the replied-to message was deleted, so only copy the
            # details from a real Message.
            ref_msg = reference.resolved
            if isinstance(ref_msg, discord.Message):
                msg_data["reply_to"]["referenced_content"] = ref_msg.content
                msg_data["reply_to"]["referenced_author"] = str(ref_msg.author)

        return msg_data

    @client.event
    async def on_ready():
        nonlocal result
        prints(f'Connected as {client.user}')

        try:
            # Get the channel
            channel = client.get_channel(channel_id)
            if not channel:
                prints(f"Channel {channel_id} not found or no access")
                await client.close()
                return

            if print_summary: prints(f"Fetching from #{channel.name} in {channel.guild.name}")

            # Initialize result structure
            result = {
                "channel_info": {
                    "id": str(channel.id),
                    "name": channel.name,
                    "topic": channel.topic,
                    "guild_name": channel.guild.name,
                    "guild_id": str(channel.guild.id)
                },
                "messages": [],
                "threads": {}
            }

            # Fetch all main channel messages
            if print_summary: prints("Fetching main channel messages...")
            message_count = 0

            async for message in channel.history(limit=limit, oldest_first=True):
                msg_data = format_message(message)
                result["messages"].append(msg_data)
                main_message_ids.add(msg_data["id"])
                message_count += 1

                # Check for thread on this message
                if hasattr(message, 'thread') and message.thread:
                    await fetch_thread_messages(message.thread, msg_data)

            if print_summary: prints(f"Fetched {message_count} main messages")

            # Fetch archived threads (best effort: a failure here keeps what was already fetched)
            try:
                async for thread in channel.archived_threads(limit=100):
                    if str(thread.id) not in result["threads"]:
                        await fetch_thread_messages(thread)
            except Exception as e:
                prints(f"Could not fetch archived threads: {e}")

            if print_summary: prints(f"Total threads found: {len(result['threads'])}")

            # Print summary if requested
            if print_summary and result:
                prints(f"\n=== Channel: #{result['channel_info']['name']} ===")
                prints(f"Guild: {result['channel_info']['guild_name']}")
                prints(f"Total messages: {len(result['messages'])}")
                prints(f"Total threads: {len(result['threads'])}")

                # Show thread summary
                for thread_id, thread_info in result['threads'].items():
                    prints(f"  Thread '{thread_info['name']}': {len(thread_info['messages'])} messages")

            # Save to file if requested
            if save_to_file and result:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"discord_channel_{result['channel_info']['name']}_{timestamp}.json"

                with open(filename, 'w', encoding='utf-8') as f:
                    json.dump(result, f, indent=2, ensure_ascii=False)

                if print_summary:
                    prints(f"\nComplete channel data saved to: {filename}")
                    prints(f"File size: {os.path.getsize(filename) / 1024:.1f} KB")

        except Exception as e:
            prints(f"Error: {e}")

        finally:
            # Closing the client is what lets `client.start` below return
            if not client.is_closed(): await client.close()
            await asyncio.sleep(0.1)  # Give time for cleanup

    async def fetch_thread_messages(thread, parent_message=None):
        """Fetch all messages from a thread and store them in `result["threads"]`"""
        if print_summary: prints(f"  Fetching thread: {thread.name}")

        thread_id = str(thread.id)
        if parent_message:
            parent_id = str(parent_message["id"])
        elif thread_id in main_message_ids:
            # Archived threads arrive without their starter message. A thread
            # created from a message shares that message's ID, so the link can
            # be restored whenever the starter message was fetched.
            parent_id = thread_id
        else:
            parent_id = None

        thread_data = {
            "id": thread_id,
            "name": thread.name,
            "parent_message_id": parent_id,
            "archived": getattr(thread, 'archived', False),
            "messages": []
        }

        try:
            async for thread_msg in thread.history(limit=limit, oldest_first=True):
                thread_data["messages"].append(format_message(thread_msg))

            result["threads"][thread_id] = thread_data
            if print_summary: prints(f"    Found {len(thread_data['messages'])} thread messages")

        except discord.Forbidden:
            if print_summary: prints(f"    No permission to read thread {thread.name}")
        except Exception as e:
            if print_summary: prints(f"    Error reading thread {thread.name}: {e}")

    # Start the client; this returns once on_ready has closed the connection
    try: await client.start(DISCORD_TOKEN)
    except Exception as e: prints(f"Failed to start client: {e}")
    finally:
        if not client.is_closed(): await client.close()
    await asyncio.sleep(0.5)
    return result

You can use it like this:

In [None]:
# Fetch and save to a JSON file, with progress/summary output suppressed
channel_data = await fetch_channel_complete_history(CHANNEL_ID, print_summary=False)
len(channel_data['messages'])

Connected as hamml#3190


176

##### User

In [None]:
#|export
def _simplify_channel_data(channel_data, save_to_file=True, print_summary=True):
    """
    Simplify channel data to reduce tokens while preserving conversation structure
    - Remove timestamps, IDs, and metadata
    - Combine thread messages and replies into a single 'replies' array
    - Keep only essential information: author names, content, and conversation structure

    Every channel message that is not a direct reply to another fetched message
    starts one conversation. Its 'replies' list holds the direct replies in
    channel order, each followed by the non-empty messages of a thread started
    from that reply, and finally the non-empty messages of the thread started
    from the main message itself. Only direct replies are grouped: a reply to a
    reply starts its own conversation. Threads whose parent message is not in
    `channel_data['messages']` are left out.

    Args:
        channel_data (dict): Data with 'channel_info', 'messages' and 'threads'
            keys, as returned by `fetch_channel_complete_history`
        save_to_file (bool): Whether to write the result to a timestamped JSON
            file in the current working directory
        print_summary (bool): Whether to print a short preview to stderr

    Returns:
        dict: {"channel": <name>, "conversations": [{"main_message", "replies"}, ...]}

    Raises:
        Exception: If `channel_data` is empty or None
    """

    if not channel_data: raise Exception('No channel data provided')

    messages = channel_data['messages']

    def entry(m):
        "Reduce a full message dict to author name and content"
        return {"author": m['author']['name'], "content": m['content']}

    # Index direct replies by the ID of the message they answer, so grouping is a
    # single pass instead of rescanning every message for every conversation
    replies_by_parent = {}
    for m in messages:
        ref = m['reply_to']
        if ref: replies_by_parent.setdefault(ref['message_id'], []).append(m)

    # Index threads by parent message ID; if several share a parent, the first one wins
    thread_by_parent = {}
    for thread_data in channel_data['threads'].values():
        thread_by_parent.setdefault(thread_data['parent_message_id'], thread_data)

    def thread_entries(message_id):
        "Non-empty messages of the thread started from `message_id`, if there is one"
        thread_data = thread_by_parent.get(message_id)
        if thread_data is None: return []
        return [entry(t) for t in thread_data['messages'] if t['content'].strip()]

    simplified = {
        "channel": channel_data['channel_info']['name'],
        "conversations": []
    }

    # Group messages into conversations
    processed_messages = set()

    for msg in messages:
        if msg['id'] in processed_messages:
            continue

        replies = []
        for reply_msg in replies_by_parent.get(msg['id'], ()):
            replies.append(entry(reply_msg))
            # The reply is never visited as a conversation root, so a thread
            # started from it has to be picked up here or it would be lost
            replies.extend(thread_entries(reply_msg['id']))
            processed_messages.add(reply_msg['id'])

        # Add thread messages as replies too
        replies.extend(thread_entries(msg['id']))

        # Only add conversations that have content (not just empty messages)
        if msg['content'].strip() or replies:
            simplified["conversations"].append({
                "main_message": entry(msg),
                "replies": replies
            })

        processed_messages.add(msg['id'])

    # Print summary if requested
    if print_summary:
        prints(f"=== Simplified Channel: {simplified['channel']} ===")
        prints(f"Total conversations: {len(simplified['conversations'])}")

        # Show first few conversations with replies
        for i, conv in enumerate(simplified['conversations'][:3]):
            prints(f"\n--- Conversation {i+1} ---")
            prints(f"Main: {conv['main_message']['author']}")
            prints(f"  {conv['main_message']['content'][:100]}...")

            if conv['replies']:
                prints(f"Replies ({len(conv['replies'])}):")
                for reply in conv['replies'][:3]:
                    prints(f"  → {reply['author']}: {reply['content'][:60]}...")

    # Save to file if requested
    if save_to_file:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        simple_filename = f"discord_simplified_{simplified['channel']}_{timestamp}.json"

        with open(simple_filename, 'w', encoding='utf-8') as f:
            json.dump(simplified, f, indent=2, ensure_ascii=False)

        if print_summary:
            simple_size = os.path.getsize(simple_filename) / 1024
            prints(f"\nSimplified data saved to: {simple_filename}")
            prints(f"File size: {simple_size:.1f} KB")

    return simplified

In [None]:
#|export
async def fetch_discord_msgs(
    channel_id: int,
    limit: int = None,
    save_original: bool = True,  # Write the complete channel data to a JSON file
    save_simplified: bool = True,  # Write the simplified data to a JSON file
    print_summary: bool = True,
    emit_to_stdout: bool = False,  # Print the simplified JSON to stdout
):
    """
    Fetch all messages from a Discord channel including threads and reply hierarchies.
    
    This tool fetches complete Discord channel history and can save both detailed
    and simplified versions of the data. The simplified version is ideal for LLMs.
    
    Requirements:
    - Discord bot token in DISCORD_TOKEN env variable
    - Bot must have Read Message History permissions
    - Bot must be a member of the server
    
    Args:
        channel_id: Discord channel ID to fetch messages from
        limit: Maximum number of messages to fetch (None = all messages)
        save_original: Save complete channel data with all metadata
        save_simplified: Save simplified conversation-focused data
        print_summary: Print progress and a summary of fetched data to stderr.
            Ignored while `emit_to_stdout` is set.
        emit_to_stdout: Output simplified JSON data to stdout
    
    Returns:
        tuple: (original_data, simplified_data), or (None, None) when the fetch
        fails or an exception is raised along the way
    """
    # Progress output is switched off whenever JSON is being emitted to stdout
    print_flag = print_summary and not emit_to_stdout
    
    try:
        # Fetch complete channel data
        if print_flag: prints("Fetching complete channel data...")
        original_data = await fetch_channel_complete_history(
            channel_id, 
            limit=limit, 
            save_to_file=save_original, 
            print_summary=print_flag
        )
        
        if not original_data:
            # Failures are always reported (on stderr), like the except branch
            # below, so a piped run that produces no JSON is not silent
            prints("❌ Failed to fetch channel data")
            return None, None
        
        # Simplify the data
        simplified_data = _simplify_channel_data(
            original_data, 
            save_to_file=save_simplified, 
            print_summary=print_flag
        )
        
        # Emit to stdout if requested
        if emit_to_stdout and simplified_data:
            print(json.dumps(simplified_data, indent=2, ensure_ascii=False))
        
        if print_flag:
            prints(f"\n✅ Successfully fetched channel data!")
            if save_original:
                prints(f"📄 Original data: {len(original_data['messages'])} messages, {len(original_data['threads'])} threads")
            if save_simplified:
                prints(f"💬 Simplified data: {len(simplified_data['conversations'])} conversations")
            
        return original_data, simplified_data
        
    except Exception as e:
        prints(f"❌ Error: {e}")
        return None, None

In [None]:
#|export
def fetch_discord_msgs_cli(
    channel_id: Annotated[int, typer.Argument(help="Discord channel ID to fetch messages from")],
    limit: Annotated[int, typer.Option(help="Maximum number of messages to fetch")] = None,
    verbose: Annotated[bool, typer.Option("--verbose", help="Show detailed logs and a summary on stderr (only takes effect with --save-to-files)")] = False,
    save_to_files: Annotated[bool, typer.Option("--save-to-files", help="Save both original and simplified data to files instead of printing JSON to stdout")] = False,
):
    """
    Fetch all messages from a Discord channel including threads and reply hierarchies.
    
    By default, outputs simplified conversation data as JSON to stdout (suitable for piping).
    Use --save-to-files to save both original and simplified data to files instead;
    add --verbose to also get progress logs and a summary on stderr.
    
    Examples:
        # Output simplified JSON to stdout
        fetch_discord_msgs 1234567890123456789
        
        # Save files and show summary
        fetch_discord_msgs 1234567890123456789 --save-to-files --verbose
        
        # Pipe simplified data to another tool
        fetch_discord_msgs 1234567890123456789 | jq '.conversations[0]'
    
    Args:
        channel_id: Discord channel ID to fetch messages from
        limit: Maximum number of messages to fetch (None = all messages)
        verbose: Show progress logs and a summary; these are suppressed while
            JSON is written to stdout, so it only matters with save_to_files
        save_to_files: Save both original and simplified data to files instead
            of writing the simplified JSON to stdout

    Returns:
        tuple: The (original_data, simplified_data) pair from `fetch_discord_msgs`
    """
    return asyncio.run(fetch_discord_msgs(
        channel_id=channel_id,
        limit=limit,
        save_original=save_to_files,
        save_simplified=save_to_files,
        print_summary=verbose,
        emit_to_stdout=not save_to_files
    ))


def main():
    "Entry point: run `fetch_discord_msgs_cli` through `typer.run`"
    typer.run(fetch_discord_msgs_cli)

In [None]:
# Fetch and simplify in one call
original, simplified = await fetch_discord_msgs(CHANNEL_ID, print_summary=True)

Fetching complete channel data...
Connected as hamml#3190
Fetching from #lesson-3-implementing-effective-evaluations in AI Evals For Engineers & Technical PMs
Fetching main channel messages...
  Fetching thread: question -- when we fix the prompt, and
    Found 2 thread messages
  Fetching thread: For the second sql constraint error, why
    Found 2 thread messages
  Fetching thread: question - how do we assess that we
    Found 7 thread messages
  Fetching thread: @Hamel @sh_reya But, how do we even say
    Found 3 thread messages
  Fetching thread: @Hamel @sh_reya
    Found 3 thread messages
  Fetching thread: where do ML models like NLI, etc fall
    Found 2 thread messages
  Fetching thread: What are your thoughts on measuring
    Found 2 thread messages
  Fetching thread: Can you explain what you meant by using
    Found 3 thread messages
  Fetching thread: Would you recommend using LLM-as-judge
    Found 3 thread messages
  Fetching thread: Is it okay to have a related LLM (
    

In [None]:
# from contextpack import ctx_fastcore
# fcdos = ctx_fastcore.fc_llms_ctx.get()
# from httpx import get
# script_docs = get('https://raw.githubusercontent.com/AnswerDotAI/fastcore/0fad28b8a20e437c11d70aa697659fc675656864/fastcore/script.py').text