Skip to content

Commit

Permalink
Merge pull request #86 from 56quarters/check
Browse files Browse the repository at this point in the history
Create a `check` mc command for testing connections to a server
  • Loading branch information
56quarters committed Nov 18, 2023
2 parents 9fe8969 + 5e8dd71 commit 8f78707
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## v0.7.0 - unreleased

- Build binaries for Linux Musl libc target. #83
- Move "Bytes tx" under "Gets" and "Bytes rx" under "Sets". #84
- Create a `check` subcommand for `mc` to test connections to a server. #86

## v0.6.9 - 2023-10-11

Expand Down
129 changes: 118 additions & 11 deletions mtop/src/bin/mc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use clap::{Args, Parser, Subcommand, ValueHint};
use mtop::check::{Checker, MeasurementBundle};
use mtop_client::{MemcachedPool, Meta, PoolConfig, TLSConfig, Value};
use std::error;
use std::io;
use std::path::PathBuf;
use std::{env, process};
use std::time::Duration;
use std::{env, error, io, process};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::runtime::Handle;
use tracing::Level;
Expand Down Expand Up @@ -59,6 +59,23 @@ enum Action {
Keys(KeysCommand),
Set(SetCommand),
Touch(TouchCommand),
Check(CheckCommand),
}

/// Run health checks against the cache.
#[derive(Debug, Args)]
struct CheckCommand {
/// How long to run the checks for in seconds.
#[arg(long, default_value_t = 60)]
time_secs: u64,

/// Timeout for each portion of the check (DNS, connection, set, get) in seconds.
#[arg(long, default_value_t = 5)]
timeout_secs: u64,

/// How long to wait between each health check in milliseconds.
#[arg(long, default_value_t = 100)]
delay_millis: u64,
}

/// Delete an item in the cache.
Expand Down Expand Up @@ -153,6 +170,17 @@ async fn main() -> Result<(), Box<dyn error::Error + Send + Sync>> {
});

match opts.mode {
Action::Check(c) => {
let checker = Checker::new(
&pool,
Duration::from_millis(c.delay_millis),
Duration::from_secs(c.timeout_secs),
);
let results = checker.run(&opts.host, Duration::from_secs(c.time_secs)).await;
if let Err(e) = print_check_results(&results).await {
tracing::warn!(message = "error writing output", error = %e);
}
}
Action::Delete(c) => {
if let Err(e) = client.delete(c.key.clone()).await {
tracing::error!(message = "unable to delete item", key = c.key, host = opts.host, error = %e);
Expand Down Expand Up @@ -220,15 +248,94 @@ async fn print_data(val: &Value) -> io::Result<()> {

async fn print_keys(metas: &[Meta], show_details: bool) -> io::Result<()> {
let mut output = BufWriter::new(tokio::io::stdout());
for meta in metas {
let line = if show_details {
format!("{}\t{}\t{}\n", meta.key, meta.expires, meta.size)
} else {
format!("{}\n", meta.key)
};

output.write_all(line.as_bytes()).await?;

if show_details {
for meta in metas {
output
.write_all(format!("{}\t{}\t{}\n", meta.key, meta.expires, meta.size).as_bytes())
.await?;
}
} else {
for meta in metas {
output.write_all(format!("{}\n", meta.key).as_bytes()).await?;
}
}

output.flush().await
}

async fn print_check_results(results: &MeasurementBundle) -> io::Result<()> {
let mut output = BufWriter::new(tokio::io::stdout());

output
.write_all(
format!(
"type=min total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
results.total.min.as_secs_f64(),
results.dns.min.as_secs_f64(),
results.connections.min.as_secs_f64(),
results.sets.min.as_secs_f64(),
results.gets.min.as_secs_f64()
)
.as_bytes(),
)
.await?;

output
.write_all(
format!(
"type=max total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
results.total.max.as_secs_f64(),
results.dns.max.as_secs_f64(),
results.connections.max.as_secs_f64(),
results.sets.max.as_secs_f64(),
results.gets.max.as_secs_f64(),
)
.as_bytes(),
)
.await?;

output
.write_all(
format!(
"type=avg total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
results.total.avg.as_secs_f64(),
results.dns.avg.as_secs_f64(),
results.connections.avg.as_secs_f64(),
results.sets.avg.as_secs_f64(),
results.gets.avg.as_secs_f64()
)
.as_bytes(),
)
.await?;

output
.write_all(
format!(
"type=stddev total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
results.total.std_dev.as_secs_f64(),
results.dns.std_dev.as_secs_f64(),
results.connections.std_dev.as_secs_f64(),
results.sets.std_dev.as_secs_f64(),
results.gets.std_dev.as_secs_f64()
)
.as_bytes(),
)
.await?;

output
.write_all(
format!(
"type=failures total={} dns={} connection={} set={} get={}\n",
results.failures.total,
results.failures.dns,
results.failures.connections,
results.failures.sets,
results.failures.gets,
)
.as_bytes(),
)
.await?;

output.flush().await
}
Loading

0 comments on commit 8f78707

Please sign in to comment.