Add a default 5 second timeout to mtop network operations #90

Merged 1 commit on Dec 16, 2023
Changes from all commits
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog

## v0.7.1 - unreleased

- Add default 5 second timeout to network operations done by `mtop`. #90

## v0.7.0 - 2023-11-28

- Build binaries for Linux Musl libc target. #83
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions mtop-client/Cargo.toml
@@ -11,6 +11,7 @@ keywords = ["top", "memcached"]
edition = "2021"

[dependencies]
pin-project-lite = "0.2.13"
rustls-pemfile = "1.0.2"
rustls-webpki = "0.101.2"
tokio = { version = "1.14.0", features = ["full"] }
11 changes: 11 additions & 0 deletions mtop-client/src/core.rs
@@ -5,6 +5,7 @@ use std::fmt;
use std::io;
use std::ops::Deref;
use std::str::FromStr;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Lines};

#[derive(Debug, Default, PartialEq, Clone)]
@@ -529,6 +530,16 @@ impl MtopError {
}
}

pub fn timeout<D>(t: Duration, operation: D) -> MtopError
where
D: fmt::Display,
{
MtopError {
kind: ErrorKind::IO,
repr: ErrorRepr::Message(format!("operation {} timed out after {:?}", operation, t)),
}
}

pub fn kind(&self) -> ErrorKind {
self.kind
}
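For reference, a minimal sketch of how the new constructor behaves. The operation label "client.stats" is just an illustrative value; `MtopError` and `ErrorKind` are the re-exports shown in `lib.rs` below:

```rust
use std::time::Duration;
use mtop_client::{ErrorKind, MtopError};

fn main() {
    // Build the error the same way the Timed future does when a deadline elapses.
    let err = MtopError::timeout(Duration::from_secs(5), "client.stats");
    // Timeouts surface as IO errors whose message names the operation and duration.
    assert!(matches!(err.kind(), ErrorKind::IO));
    // Prints something like: operation client.stats timed out after 5s
    println!("{}", err);
}
```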
2 changes: 2 additions & 0 deletions mtop-client/src/lib.rs
@@ -1,8 +1,10 @@
mod core;
mod pool;
mod timeout;

pub use crate::core::{
ErrorKind, Memcached, Meta, MtopError, ProtocolError, ProtocolErrorKind, Slab, SlabItem, SlabItems, Slabs, Stats,
Value,
};
pub use crate::pool::{MemcachedPool, PoolConfig, PooledMemcached, TLSConfig};
pub use crate::timeout::{Timed, Timeout};
58 changes: 58 additions & 0 deletions mtop-client/src/timeout.rs
@@ -0,0 +1,58 @@
use crate::MtopError;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

/// `Timeout` can be used to add a timeout to any future emitted by mtop.
pub trait Timeout: Sized {
fn timeout<S>(self, t: Duration, operation: S) -> Timed<Self>
where
S: Into<String>;
}

impl<F, V> Timeout for F
where
F: Future<Output = Result<V, MtopError>>,
{
fn timeout<S>(self, t: Duration, operation: S) -> Timed<F>
where
S: Into<String>,
{
Timed {
operation: operation.into(),
time: t,
inner: tokio::time::timeout(t, self),
}
}
}

pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Timed<T> {
operation: String,
time: Duration,
#[pin]
inner: tokio::time::Timeout<T>,
}
}

impl<F, V> Future for Timed<F>
where
F: Future<Output = Result<V, MtopError>>,
{
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// Poll the inner Timeout future and unwrap one layer of Result it adds,
// converting a timeout into a specific form of MtopError.
this.inner.poll(cx).map(|res| match res {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(_e) => Err(MtopError::timeout(*this.time, this.operation)),
})
}
}
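Taken together, this gives any future returning `Result<_, MtopError>` a `.timeout()` combinator. A minimal usage sketch, with a made-up `fetch_stats` future standing in for the real client calls (`pool.get`, `client.stats`, and so on), assuming tokio with the `macros` and `rt` features:

```rust
use mtop_client::{MtopError, Timeout};
use std::time::Duration;

// Hypothetical async operation; any future with Output = Result<V, MtopError> works.
async fn fetch_stats() -> Result<u64, MtopError> {
    Ok(42)
}

#[tokio::main]
async fn main() -> Result<(), MtopError> {
    // If fetch_stats takes longer than 5 seconds, the await yields
    // Err(MtopError::timeout(..)) naming the "fetch_stats" operation.
    let stats = fetch_stats().timeout(Duration::from_secs(5), "fetch_stats").await?;
    println!("stats: {}", stats);
    Ok(())
}
```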
75 changes: 55 additions & 20 deletions mtop/src/bin/mtop.rs
@@ -1,11 +1,12 @@
use clap::{Parser, ValueHint};
use mtop::queue::{BlockingStatsQueue, StatsQueue};
use mtop_client::{MemcachedPool, MtopError, PoolConfig, SlabItems, Slabs, Stats, TLSConfig};
use mtop_client::{MemcachedPool, MtopError, PoolConfig, SlabItems, Slabs, Stats, TLSConfig, Timeout};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{env, error, process};
use tokio::net::lookup_host;
use tokio::net::ToSocketAddrs;
use tokio::runtime::Handle;
use tokio::task;
use tracing::instrument::WithSubscriber;
@@ -15,6 +16,7 @@ const DEFAULT_LOG_LEVEL: Level = Level::INFO;
// Update interval of more than a second to minimize the chance that stats returned by the
// memcached server have the exact same "time" value (which has one-second granularity).
const DEFAULT_STATS_INTERVAL: Duration = Duration::from_millis(1073);
const DEFAULT_TIMEOUT_SECS: u64 = 5;
const NUM_MEASUREMENTS: usize = 10;
const DNS_HOST_PREFIX: &str = "dns+";

@@ -27,6 +29,10 @@ struct MtopConfig {
#[arg(long, default_value_t = DEFAULT_LOG_LEVEL)]
log_level: Level,

/// Timeout for connecting to Memcached and fetching statistics, in seconds.
#[arg(long, default_value_t = DEFAULT_TIMEOUT_SECS)]
timeout_secs: u64,

/// File to log errors to since they cannot be logged to the console. If the path is not
/// writable, mtop will not start.
/// [default: $TEMP/mtop/mtop.log]
@@ -84,6 +90,7 @@ async fn main() -> Result<(), Box<dyn error::Error + Send + Sync>> {
process::exit(1);
});

let timeout = Duration::from_secs(opts.timeout_secs);
let measurements = Arc::new(StatsQueue::new(NUM_MEASUREMENTS));
let pool = MemcachedPool::new(
Handle::current(),
@@ -105,14 +112,14 @@ async fn main() -> Result<(), Box<dyn error::Error + Send + Sync>> {
});

// Do DNS lookups on any "dns+" hostnames to expand them to multiple IPs based on A records.
let hosts = expand_hosts(&opts.hosts).await.unwrap_or_else(|e| {
let hosts = expand_hosts(&opts.hosts, timeout).await.unwrap_or_else(|e| {
tracing::error!(message = "unable to resolve host names", hosts = ?opts.hosts, error = %e);
process::exit(1);
});

// Run the initial connection to each server once in the main thread to make bad hostnames
// easier to spot.
let update_task = UpdateTask::new(&hosts, pool, measurements.clone(), Handle::current());
let update_task = UpdateTask::new(&hosts, pool, measurements.clone(), timeout, Handle::current());
update_task.connect().await.unwrap_or_else(|e| {
tracing::error!(message = "unable to connect to memcached servers", hosts = ?opts.hosts, error = %e);
process::exit(1);
@@ -125,7 +132,7 @@ async fn main() -> Result<(), Box<dyn error::Error + Send + Sync>> {
let _ = interval.tick().await;
update_task
.update()
.instrument(tracing::span!(Level::DEBUG, "periodic.update"))
.instrument(tracing::span!(Level::INFO, "periodic.update"))
.await;
}
}
@@ -160,13 +167,17 @@ async fn main() -> Result<(), Box<dyn error::Error + Send + Sync>> {
Ok(())
}

async fn expand_hosts(hosts: &[String]) -> Result<Vec<String>, MtopError> {
async fn expand_hosts(hosts: &[String], timeout: Duration) -> Result<Vec<String>, MtopError> {
let mut out = Vec::with_capacity(hosts.len());

for host in hosts {
if host.starts_with(DNS_HOST_PREFIX) {
let name = host.trim_start_matches(DNS_HOST_PREFIX);
for addr in lookup_host(name).await? {
for addr in lookup_host(name)
.timeout(timeout, "lookup_host")
.instrument(tracing::span!(Level::INFO, "lookup_host"))
.await?
{
out.push(addr.to_string());
}
} else {
@@ -178,61 +189,85 @@ async fn expand_hosts(hosts: &[String]) -> Result<Vec<String>, MtopError> {
Ok(out)
}

async fn lookup_host<T>(host: T) -> Result<impl Iterator<Item = SocketAddr>, MtopError>
where
T: ToSocketAddrs,
{
// This function only exists to translate io::Error into MtopError so that .timeout() can be called on the returned future.
Ok(tokio::net::lookup_host(host).await?)
}

#[derive(Debug)]
pub struct UpdateTask {
struct UpdateTask {
hosts: Vec<String>,
pool: Arc<MemcachedPool>,
queue: Arc<StatsQueue>,
timeout: Duration,
handle: Handle,
}

impl UpdateTask {
pub fn new(hosts: &[String], pool: MemcachedPool, queue: Arc<StatsQueue>, handle: Handle) -> Self {
fn new(hosts: &[String], pool: MemcachedPool, queue: Arc<StatsQueue>, timeout: Duration, handle: Handle) -> Self {
UpdateTask {
hosts: Vec::from(hosts),
pool: Arc::new(pool),
queue,
timeout,
handle,
}
}

pub async fn connect(&self) -> Result<(), MtopError> {
async fn connect(&self) -> Result<(), MtopError> {
for host in self.hosts.iter() {
let mut client = self.pool.get(host).await?;
client.ping().await?;
let client = self
.pool
.get(host)
.timeout(self.timeout, "client.connect")
.instrument(tracing::span!(Level::INFO, "client.connect"))
.await?;
self.pool.put(client).await;
}

Ok(())
}

async fn update_host(host: String, pool: Arc<MemcachedPool>) -> Result<(Stats, Slabs, SlabItems), MtopError> {
async fn update_host(
host: String,
pool: Arc<MemcachedPool>,
timeout: Duration,
) -> Result<(Stats, Slabs, SlabItems), MtopError> {
let mut client = pool
.get(&host)
.instrument(tracing::span!(Level::DEBUG, "client.connect"))
.timeout(timeout, "client.connect")
.instrument(tracing::span!(Level::INFO, "client.connect"))
.await?;
let stats = client
.stats()
.instrument(tracing::span!(Level::DEBUG, "client.stats"))
.timeout(timeout, "client.stats")
.instrument(tracing::span!(Level::INFO, "client.stats"))
.await?;
let slabs = client
.slabs()
.instrument(tracing::span!(Level::DEBUG, "client.slabs"))
.timeout(timeout, "client.slabs")
.instrument(tracing::span!(Level::INFO, "client.slabs"))
.await?;
let items = client
.items()
.instrument(tracing::span!(Level::DEBUG, "client.items"))
.timeout(timeout, "client.items")
.instrument(tracing::span!(Level::INFO, "client.items"))
.await?;

pool.put(client).await;
Ok((stats, slabs, items))
}

pub async fn update(&self) {
async fn update(&self) {
let mut tasks = Vec::with_capacity(self.hosts.len());
for host in self.hosts.iter() {
tasks.push((
host,
self.handle.spawn(Self::update_host(host.clone(), self.pool.clone())),
self.handle
.spawn(Self::update_host(host.clone(), self.pool.clone(), self.timeout)),
));
}

@@ -243,7 +278,7 @@ impl UpdateTask {
Ok(Ok((stats, slabs, items))) => {
self.queue
.insert(host.clone(), stats, slabs, items)
.instrument(tracing::span!(Level::DEBUG, "queue.insert"))
.instrument(tracing::span!(Level::INFO, "queue.insert"))
.await;
}
}
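To tie the pieces together, here is a standalone sketch of the `dns+` expansion path from `expand_hosts` above. The host names and the `main` wrapper are invented for illustration; the `lookup_host` shim mirrors the one in the binary, translating `io::Error` into `MtopError` so the lookup future can be combined with `.timeout()`:

```rust
use mtop_client::{MtopError, Timeout};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::ToSocketAddrs;

const DNS_HOST_PREFIX: &str = "dns+";

// Translate io::Error into MtopError so the future can be wrapped with .timeout().
async fn lookup_host<T>(host: T) -> Result<impl Iterator<Item = SocketAddr>, MtopError>
where
    T: ToSocketAddrs,
{
    Ok(tokio::net::lookup_host(host).await?)
}

#[tokio::main]
async fn main() -> Result<(), MtopError> {
    // Hypothetical inputs: one "dns+" host to expand via A records, one literal address.
    let hosts = vec!["dns+cache.example.com:11211".to_owned(), "localhost:11211".to_owned()];
    let timeout = Duration::from_secs(5);
    let mut out = Vec::new();

    for host in &hosts {
        if let Some(name) = host.strip_prefix(DNS_HOST_PREFIX) {
            // Each resolved address becomes its own entry, same as expand_hosts.
            for addr in lookup_host(name).timeout(timeout, "lookup_host").await? {
                out.push(addr.to_string());
            }
        } else {
            out.push(host.clone());
        }
    }

    println!("{:?}", out);
    Ok(())
}
```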