Skip to content

Commit

Permalink
Refactor API to allow user to handle reconnection loop (#20)
Browse files Browse the repository at this point in the history
* Refactor

* Update README.md

* Improve documentation

* Bump tracing-core

* Re-order imports

* Rework connection fn signature
  • Loading branch information
hlbarber committed Apr 18, 2022
1 parent dee8eed commit b34d815
Show file tree
Hide file tree
Showing 11 changed files with 450 additions and 339 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ thiserror = "1"
tokio = { version = "1", features = ["io-util", "net", "time"] }
tokio-rustls = { version = "0.23", optional = true }
tokio-util = { version = "0.6", features = ["codec", "net"] }
tracing = "0.1"
tracing-core = "0.1"
tracing-core = "0.1.24"
tracing-futures = "0.2"
tracing-subscriber = "0.3"

Expand Down
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Add this to your `Cargo.toml`:
tracing-gelf = "0.6"
```

The current `tracing-gelf` requires Rust 1.53 or later.

### TCP Logging

```rust
Expand All @@ -27,27 +29,27 @@ async fn main() {
// Graylog address
let address = "127.0.0.1:12201";

// Start tracing
let bg_task = Logger::builder().init_tcp(address).unwrap();
// Initialize subscriber
let conn_handle = Logger::builder().init_tcp(address).unwrap();

// Spawn background task
// Any futures executor can be used
tokio::spawn(bg_task);
tokio::spawn(conn_handle.connect());

// Send a log to Graylog
tracing::info!(message = "oooh, what's in here?");

// Create a span
let span = tracing::info_span!("cave");
span.in_scope(|| {
// Log inside a span
let test = tracing::info_span!("deeper in cave", smell = "damp");
test.in_scope(|| {
// Send a log to Graylog, inside a nested span
tracing::warn!(message = "oh god, it's dark in here");
})
});

// Log a structured log
// Send a log to Graylog
tracing::error!(message = "i'm glad to be out", spook_lvl = 3, ruck_sack = ?["glasses", "inhaler", "large bat"]);
}

Expand Down
40 changes: 40 additions & 0 deletions examples/reconnect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::time::Duration;

use tokio::time::sleep;
use tracing_gelf::Logger;

#[tokio::main]
async fn main() {
// Graylog address
let address = "127.0.0.1:12201";

// Initialize subscriber, returning a connection handle
let mut conn_handle = Logger::builder().init_tcp(address).unwrap();

// Reconnection loop
let reconnect = async move {
loop {
// Attempt to connect
let errors = conn_handle.connect().await;

// Process errors
for (socket, error) in errors.0 {
// Perhaps log errors to an active layer
tracing::error!(%socket, %error);
}

// Don't attempt reconnect immediately
sleep(Duration::from_secs(5)).await;
}
};

// Spawn background task
// Any futures executor can be used
tokio::spawn(reconnect);

// Send a log to Graylog
tracing::info!("one day");

// Don't exit
loop {}
}
12 changes: 6 additions & 6 deletions examples/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ async fn main() {
// Graylog address
let address = "127.0.0.1:12201";

// Start tracing
let bg_task = Logger::builder().init_tcp(address).unwrap();
// Initialize subscriber, returning a connection handle
let mut conn_handle = Logger::builder().init_tcp(address).unwrap();

// Spawn background task
// Spawn background task, this will connect and then forward messages to Graylog
// Any futures executor can be used
tokio::spawn(bg_task);
tokio::spawn(async move { conn_handle.connect().await });

// Send a log to Graylog
tracing::info!(message = "oooh, what's in here?");

// Create a span
let span = tracing::info_span!("cave");
span.in_scope(|| {
// Log inside a span
let test = tracing::info_span!("deeper in cave", smell = "damp");
test.in_scope(|| {
// Send a log to Graylog, inside a nested span
tracing::warn!(message = "oh god, it's dark in here");
})
});

// Log a structured log
// Send a log to Graylog
tracing::error!(message = "i'm glad to be out", spook_lvl = 3, ruck_sack = ?["glasses", "inhaler", "large bat"]);

// Don't exit
Expand Down
14 changes: 7 additions & 7 deletions examples/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@ async fn main() {
// Graylog address
let address = "127.0.0.1:12202";

// Start tracing
let bg_task = Logger::builder().init_udp(address).unwrap();
// Initialize subscriber, returning a connection handle
let mut conn_handle = Logger::builder().init_udp(address).unwrap();

// Spawn background task
// Spawn background task, this will connect and then forward messages to Graylog
// Any futures executor can be used
tokio::spawn(bg_task);
tokio::spawn(async move { conn_handle.connect().await });

// Send a log to Graylog
tracing::info!(message = "our dreams feel real while we're in them");

// Create a span
let span = tracing::info_span!("level 1");
span.in_scope(|| {
// Log inside a span
// Send a log to Graylog, inside a span
tracing::warn!(message = "we need to go deeper", music = "hans zimmer");

// Create an nested span
let inner_span = tracing::info_span!("level 5");
inner_span.in_scope(|| {
// Log inside nested span
// Send a log to Graylog, inside a nested span
tracing::error!(message = "you killed me");
});
});

// Log a structured log
// Send a log to Graylog
tracing::info!(message = "he's out", spinning_top = true);

// Don't exit
Expand Down
117 changes: 117 additions & 0 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
mod tcp;
mod udp;

use std::{io, net::SocketAddr};

use bytes::Bytes;
use futures_channel::mpsc;
use tokio::net::{lookup_host, ToSocketAddrs};
use tracing_core::subscriber::NoSubscriber;
use tracing_futures::WithSubscriber;

pub use tcp::*;
pub use udp::*;

/// A sequence of [errors](std::io::Error) which occurred during a connection attempt.
///
/// These are paired with a [`SocketAddr`] because the connection attempt might fail multiple times
/// during DNS resolution.
#[derive(Debug)]
pub struct ConnectionErrors(pub Vec<(SocketAddr, io::Error)>);

/// Provides an interface for connecting (and reconnecting) to Graylog. Without an established
/// connection logs will not be sent. Messages logged without an established connection will sit in
/// the buffer until they can be drained.
#[derive(Debug)]
#[must_use]
pub struct ConnectionHandle<A, Conn> {
pub(crate) addr: A,
pub(crate) receiver: mpsc::Receiver<Bytes>,
pub(crate) conn: Conn,
}

impl<A, Conn> ConnectionHandle<A, Conn> {
/// Returns the connection address.
pub fn address(&self) -> &A {
&self.addr
}
}

impl<A> ConnectionHandle<A, TcpConnection>
where
A: ToSocketAddrs,
{
/// Connects to Graylog via TCP using the address provided.
///
/// This will perform DNS resolution and attempt to connect to each [`SocketAddr`] provided.
pub async fn connect(&mut self) -> ConnectionErrors {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&self.addr).await.into_iter().flatten();

// Loop through the IP addresses that the hostname resolved to
let mut errors = Vec::new();
for addr in addrs {
let fut = self
.conn
.handle(addr, &mut self.receiver)
.with_subscriber(NoSubscriber::default());
if let Err(err) = fut.await {
errors.push((addr, err));
}
}
ConnectionErrors(errors)
}
}

#[cfg(feature = "rustls-tls")]
impl<A> ConnectionHandle<A, TlsConnection>
where
A: ToSocketAddrs,
{
/// Connects to Graylog via TLS using the address provided.
///
/// This will perform DNS resolution and attempt to connect to each [`SocketAddr`] provided.
pub async fn connect(&mut self) -> ConnectionErrors {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&self.addr).await.into_iter().flatten();

// Loop through the IP addresses that the hostname resolved to
let mut errors = Vec::new();
for addr in addrs {
let fut = self
.conn
.handle(addr, &mut self.receiver)
.with_subscriber(NoSubscriber::default());
if let Err(err) = fut.await {
errors.push((addr, err));
}
}
ConnectionErrors(errors)
}
}

impl<A> ConnectionHandle<A, UdpConnection>
where
A: ToSocketAddrs,
{
/// Connects to Graylog via UDP using the address provided.
///
/// This will perform DNS resolution and attempt to connect to each [`SocketAddr`] provided.
pub async fn connect(&mut self) -> ConnectionErrors {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&self.addr).await.into_iter().flatten();

// Loop through the IP addresses that the hostname resolved to
let mut errors = Vec::new();
for addr in addrs {
let fut = self
.conn
.handle(addr, &mut self.receiver)
.with_subscriber(NoSubscriber::default());
if let Err(err) = fut.await {
errors.push((addr, err));
}
}
ConnectionErrors(errors)
}
}
84 changes: 84 additions & 0 deletions src/connection/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::{future::Future, net::SocketAddr};

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use tokio::{io, net::TcpStream};
use tokio_util::codec::{BytesCodec, FramedWrite};

/// Handle TCP connection, generic over TCP/TLS via `F`.
async fn handle_tcp<F, R, S, I>(
addr: SocketAddr,
f: F,
receiver: &mut S,
) -> Result<(), std::io::Error>
where
S: Stream<Item = Bytes>,
S: Unpin,
I: io::AsyncRead + io::AsyncWrite + Send + Unpin,
F: FnOnce(TcpStream) -> R,
R: Future<Output = Result<I, std::io::Error>> + Send,
{
let tcp = TcpStream::connect(addr).await?;
let wrapped = (f)(tcp).await?;
let (_, writer) = io::split(wrapped);

// Writer
let sink = FramedWrite::new(writer, BytesCodec::new());
receiver.map(Ok).forward(sink).await?;

Ok(())
}

/// A TCP connection to Graylog.
#[derive(Debug)]
pub struct TcpConnection;

impl TcpConnection {
pub(super) async fn handle<S>(
&self,
addr: SocketAddr,
receiver: &mut S,
) -> Result<(), std::io::Error>
where
S: Stream<Item = Bytes> + Unpin,
{
let wrapper = |tcp_stream| async { Ok(tcp_stream) };
handle_tcp(addr, wrapper, receiver).await
}
}

/// A TLS connection to Graylog.
#[cfg(feature = "rustls-tls")]
pub struct TlsConnection {
pub(crate) server_name: tokio_rustls::rustls::ServerName,
pub(crate) client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
}

#[cfg(feature = "rustls-tls")]
impl TlsConnection {
pub(super) async fn handle<S>(
&self,
addr: SocketAddr,
receiver: &mut S,
) -> Result<(), std::io::Error>
where
S: Stream<Item = Bytes> + Unpin,
{
let wrapper = move |tcp_stream| {
let server_name = self.server_name.clone();
let config = tokio_rustls::TlsConnector::from(self.client_config.clone());

config.connect(server_name, tcp_stream)
};
handle_tcp(addr, wrapper, receiver).await
}
}

#[cfg(feature = "rustls-tls")]
impl std::fmt::Debug for TlsConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TlsConnection")
.field("server_name", &self.server_name)
.finish_non_exhaustive()
}
}
Loading

0 comments on commit b34d815

Please sign in to comment.