Skip to content

v0.8.0

Choose a tag to compare

@alexjg alexjg released this 06 Mar 13:21
· 21 commits to main since this release

The main focus of this release is a new connection management API which
replaces the Repo::connect method with separate APIs for making outgoing
connections (referred to as a "dialer") and accepting incoming connections
(an "acceptor"). The payoff is that we can automatically handle reconnection.

A second, smaller feature is the addition of a RepoObserver to help with
monitoring running samod processes.

What follows is a quick guide to the new connections API; for more details on
breaking changes and other added features, see the "Added" and "Breaking
Changes" sections which follow the guide.

Updating to the New Connection API

Outgoing connections (previously Repo::connect(..., ConnDirection::Outgoing))

// Before
let conn = repo.connect(rx, tx, ConnDirection::Outgoing).unwrap();
conn.handshake_complete().await.unwrap();

// After
let handle = repo.dial(BackoffConfig::default(), Arc::new(my_dialer)).unwrap();

// or, for WebSocket URLs directly:
let handle = repo.dial_websocket(url, BackoffConfig::default()).unwrap();

Incoming connections (previously Repo::connect(..., ConnDirection::Incoming))

// Before
let conn = repo.connect(rx, tx, ConnDirection::Incoming).unwrap();

// After — set up an acceptor once, then hand transports to it as they arrive:
let acceptor = repo.make_acceptor(url).unwrap();

// From arbitrary streams/sinks:
acceptor.accept(Transport::new(rx_stream, tx_sink)).unwrap();

// From an axum WebSocket upgrade handler:
acceptor.accept_axum(socket).unwrap();

// From a raw tungstenite stream:
acceptor.accept_tungstenite(ws_stream).unwrap();

Observing connection events

// Dialer side — wait for the first successful connection:
let peer_info = handle.established().await?;

// Dialer side — stream every lifecycle event:
let mut events = handle.events();
while let Some(event) = events.next().await {
    match event {
        DialerEvent::Connected { peer_info } => { /* … */ }
        DialerEvent::Disconnected             => { /* … */ }
        DialerEvent::Reconnecting { attempt } => { /* … */ }
        DialerEvent::MaxRetriesReached        => { break; }
    }
}

// Acceptor side — react to clients connecting/disconnecting:
let mut events = acceptor.events();
while let Some(event) = events.next().await {
    match event {
        AcceptorEvent::ClientConnected    { connection_id, peer_info } => { /* … */ }
        AcceptorEvent::ClientDisconnected { connection_id }            => { /* … */ }
    }
}

Implementing a custom Dialer

use samod::{Dialer, Transport};
use url::Url;
use std::pin::Pin;

struct MyDialer { url: Url }

impl Dialer for MyDialer {
    fn url(&self) -> Url { self.url.clone() }

    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Transport, Box<dyn std::error::Error + Send + Sync>>> + Send>> {
        Box::pin(async move {
            // establish your transport here, then wrap it:
            Ok(Transport::new(my_rx_stream, my_tx_sink))
        })
    }
}

Implementing RuntimeHandle::sleep

If you have a custom RuntimeHandle, add the new required method:

fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send {
    tokio::time::sleep(duration) // or your runtime's equivalent
}

Added

  • Repo::dial — initiates an outgoing connection using a user-supplied
    Dialer implementation. Returns a DialerHandle immediately (non-blocking).
    The DialerHandle can be used to monitor the connection.
  • Repo::dial_websocket — convenience wrapper around Repo::dial for
    WebSocket connections; takes a Url and BackoffConfig.
  • Repo::acceptor — registers a URL as a listening address and returns an
    AcceptorHandle for accepting incoming connections on that URL.
  • Transport — a type-erased (Stream, Sink) pair returned by Dialer::connect
    and accepted by AcceptorHandle::accept.
  • RuntimeHandle::sleep — required new method on the RuntimeHandle trait;
    must return a future that completes after the given Duration. This powers
    back-off delays inside the reconnection logic.
  • samod::NeverAnnounce, an AnnouncePolicy which never announces any documents
    to peers
  • Add the native-tls feature to tungstenite and tokio-tungstenite when the
    tungstenite feature is enabled. This allows using TLS with WebSocket
    dialers.
  • The samod::tokio_io module which contains TcpDialer for connecting to
    servers over TCP and AcceptorHandle::accetp_tokio_io which implements
    the receiving end
  • samod::RepoObserver which is notified of events occurring in the Repo
    which may be of interest for monitoring (e.g. for producing throughput
    statistics on sync message processing)

Fixed

  • A bug where requests which were forwarded across peers who were configured
    to not announce documents would fail to resolve on the original requestor
  • Some interoperability bugs with the JS implementation
  • A bug where if a connection failed during establishment the io loop would
    crash, causing the whole repo to stop working

Breaking Changes

  • Repo::connect / Connection removed. The old unified connect method
    and the Connection / ConnDirection types have been removed entirely.
    Use Repo::dial (outgoing) and Repo::acceptor + AcceptorHandle::accept
    (incoming) instead.
  • RuntimeHandle::sleep is now required. Any custom RuntimeHandle
    implementation must add a sleep(duration: Duration) -> impl Future method.
  • Repo::connect_tungstenite / Repo::accept_axum / Repo::accept_tungstenite
    moved to AcceptorHandle.
    Call acceptor.accept_axum(socket) /
    acceptor.accept_tungstenite(ws) instead of the repo directly.
  • Repo::when_connected removed. Replace calls like
    repo.when_connected(peer_id).await with dialer_handle.established().await
    (dialer side) or listening on acceptor.events() for
    AcceptorEvent::ClientConnected (acceptor side). Connection IDs are now
    obtained from those futures/events rather than from when_connected.
  • load_local replaces load on LocalPool repos. RepoBuilder::load_local()
    must be used instead of RepoBuilder::load() when building a repo for a
    futures::executor::LocalPool runtime.