Skip to content

Commit

Permalink
First pass at metrics support
Browse files Browse the repository at this point in the history
We use the [`metrics` facade](https://docs.rs/metrics) to emit metrics
from both the connector and S3 client. The connector has a metrics
aggregator that currently just prints data to a `tracing` `info!`
message every 5 seconds. We do a little bit of work to automatically
instrument FUSE requests with their latency, but other than that,
everything is manual.

Closes #8.
  • Loading branch information
jamesbornholt committed Oct 31, 2022
1 parent 4c161eb commit 54b5b89
Show file tree
Hide file tree
Showing 14 changed files with 608 additions and 13 deletions.
53 changes: 53 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions s3-client/Cargo.toml
Expand Up @@ -14,6 +14,7 @@ auto_impl = "1.0.1"
futures = { version = "0.3.24", features = ["thread-pool"] }
libc = "0.2.126"
libc-stdhandle = "0.1.0"
metrics = "0.20.1"
pin-project = "1.0.12"
static_assertions = "1.1.0"
thiserror = "1.0.34"
Expand Down
8 changes: 8 additions & 0 deletions s3-client/src/s3_client.rs
Expand Up @@ -146,6 +146,7 @@ impl S3Client {
let request_id = Arc::new(Mutex::new(None));
let request_id_clone = Arc::clone(&request_id);
let start_time = Instant::now();
let mut first_body_part = true;

let mut options = MetaRequestOptions::new();
options
Expand All @@ -159,6 +160,13 @@ impl S3Client {
.on_body(move |range_start, data| {
let _guard = span_body.enter();

if first_body_part {
first_body_part = false;
let latency = start_time.elapsed().as_micros() as f64;
let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
metrics::histogram!("s3.first_byte_latency_us", latency, "op" => op);
}

trace!(
start = range_start,
length = data.len(),
Expand Down
4 changes: 3 additions & 1 deletion s3-client/src/s3_client/get_object.rs
Expand Up @@ -28,7 +28,9 @@ impl S3Client {
range: Option<Range<u64>>,
) -> Result<GetObjectRequest, S3RequestError<GetObjectError>> {
let span = request_span!(self, "get_object");
span.in_scope(|| debug!(?key, ?range, "new request"));
span.in_scope(
|| debug!(?key, ?range, size=?range.as_ref().map(|range| range.end - range.start), "new request"),
);

let mut message = Message::new_request(&mut Allocator::default()).unwrap();

Expand Down
3 changes: 3 additions & 0 deletions s3-file-connector/Cargo.toml
Expand Up @@ -16,7 +16,10 @@ bytes = "1.2.1"
clap = { version = "3.2.12", features = ["derive"] }
ctrlc = "3.2.3"
futures = "0.3.24"
hdrhistogram = { version = "7.5.2", default-features = false }
libc = "0.2.126"
metrics = "0.20.1"
once_cell = "1.16.0"
supports-color = "1.3.0"
thiserror = "1.0.34"
tracing = { version = "0.1.35", default-features = false, features = ["std", "log"] }
Expand Down
16 changes: 13 additions & 3 deletions s3-file-connector/src/fuse.rs
Expand Up @@ -23,6 +23,7 @@ impl<Client: ObjectClient + Send + Sync + 'static> S3FuseFilesystem<Client> {

#[async_trait]
impl<Client: ObjectClient + Send + Sync + 'static> Filesystem for S3FuseFilesystem<Client> {
#[instrument(level = "debug", skip_all)]
async fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> {
self.fs.init(config).await
}
Expand Down Expand Up @@ -63,17 +64,21 @@ impl<Client: ObjectClient + Send + Sync + 'static> Filesystem for S3FuseFilesyst
lock: Option<u64>,
reply: ReplyData,
) {
let mut bytes_sent = 0;

struct Replied(());

struct ReplyRead {
struct ReplyRead<'a> {
inner: fuser::ReplyData,
bytes_sent: &'a mut usize,
}

impl ReadReplier for ReplyRead {
impl ReadReplier for ReplyRead<'_> {
type Replied = Replied;

fn data(self, data: &[u8]) -> Replied {
self.inner.data(data);
*self.bytes_sent = data.len();
Replied(())
}

Expand All @@ -83,9 +88,14 @@ impl<Client: ObjectClient + Send + Sync + 'static> Filesystem for S3FuseFilesyst
}
}

let replier = ReplyRead { inner: reply };
let replier = ReplyRead {
inner: reply,
bytes_sent: &mut bytes_sent,
};
self.fs.read(ino, fh, offset, size, flags, lock, replier).await;
// return value of read is proof a reply was sent

metrics::counter!("fuse.bytes_read", bytes_sent as u64);
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent))]
Expand Down
1 change: 1 addition & 0 deletions s3-file-connector/src/lib.rs
@@ -1,6 +1,7 @@
pub mod fs;
pub mod fuse;
mod inode;
pub mod metrics;
pub mod prefetch;

pub use fs::{S3Filesystem, S3FilesystemConfig};
Expand Down
15 changes: 13 additions & 2 deletions s3-file-connector/src/main.rs
Expand Up @@ -8,6 +8,10 @@ use s3_client::{HeadBucketError, S3Client, S3ClientConfig, S3RequestError};

use s3_file_connector::fs::S3FilesystemConfig;
use s3_file_connector::fuse::S3FuseFilesystem;
use s3_file_connector::metrics::{metrics_tracing_span_layer, MetricsSink};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;

fn init_tracing_subscriber() {
RustLogAdapter::try_init().expect("unable to install CRT log adapter");
Expand All @@ -18,9 +22,13 @@ fn init_tracing_subscriber() {
std::env::set_var("RUST_LOG", "info,awscrt=off,fuser=error");
}

tracing_subscriber::fmt()
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(supports_color::on(supports_color::Stream::Stdout).is_some())
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.with_filter(tracing_subscriber::filter::EnvFilter::from_default_env());

tracing_subscriber::registry()
.with(fmt_layer)
.with(metrics_tracing_span_layer())
.init();
}

Expand Down Expand Up @@ -78,6 +86,9 @@ fn main() -> anyhow::Result<()> {
throughput_target_gbps,
part_size: args.part_size.map(|t| t as usize),
};

let _metrics = MetricsSink::init();

let client = create_client_for_bucket(&args.bucket_name, &args.region, client_config)
.context("Failed to create S3 client")?;

Expand Down

0 comments on commit 54b5b89

Please sign in to comment.