Skip to content

Firehose: Add command line flag for firehose cursor #15

@rudyfraser

Description

@rudyfraser

Right now the firehose always starts at the current stream but in situations where you want to catch up/backfill records you'd need to manually change the code to add a cursor parameter like so:

#[tokio::main]
async fn main() {
    match dotenvy::dotenv() {
        _ => (),
    };

    let default_subscriber_path =
        env::var("FEEDGEN_SUBSCRIPTION_ENDPOINT").unwrap_or("wss://bsky.social".into());
    let client = reqwest::Client::new();
    loop {
        match tokio_tungstenite::connect_async(
            Url::parse(
                format!(
                    "{}/xrpc/com.atproto.sync.subscribeRepos?cursor=1670190430", // Hardcoded cursor. Should be a command line flag.
                    default_subscriber_path
                )
                .as_str(),
            )
            .unwrap(),
        )
        .await
        {
            Ok((mut socket, _response)) => {
                println!("Connected to {default_subscriber_path:?}.");
                while let Some(Ok(Message::Binary(message))) = socket.next().await {
                    let client = client.clone();
                    tokio::spawn(async move {
                        process(message, &client).await;
                    });
                    thread::sleep(Duration::from_millis(8)); // Artificial delay that may be needed when backfilling records due to related issue
                }
            }
            Err(error) => {
                eprintln!("Error connecting to {default_subscriber_path:?}. Waiting to reconnect: {error:?}");
                thread::sleep(Duration::from_millis(500));
                continue;
            }
        }
    }
}

Instead of hardcoding a cursor param you should be able to use an optional command line flag. If there's no flag, it should default to not including a cursor param.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions