From 9d3ed3039e6ec4db79e74abed87444515883688c Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 10:32:40 -0400 Subject: [PATCH 1/8] feat: Add hello route --- bottlecap/src/traces/hello_agent.rs | 58 +++++++++++++++++++++++++++++ bottlecap/src/traces/mod.rs | 1 + 2 files changed, 59 insertions(+) create mode 100644 bottlecap/src/traces/hello_agent.rs diff --git a/bottlecap/src/traces/hello_agent.rs b/bottlecap/src/traces/hello_agent.rs new file mode 100644 index 000000000..5ed5adb6c --- /dev/null +++ b/bottlecap/src/traces/hello_agent.rs @@ -0,0 +1,58 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +// TODO(Astuyve): Deprecate. +// older clients require the 127.0.0.1:8126/lambda/hello route +// to identify the presence of the extension. + +use hyper::service::{make_service_fn, service_fn}; +use hyper::{http, Body, Method, Request, Response, Server, StatusCode}; +use std::convert::Infallible; +use std::net::SocketAddr; +use tracing::error; +use serde_json::json; + +const HELLO_PATH: &str = "/lambda/hello"; +const AGENT_PORT: usize = 8126; + +pub async fn start_handler() -> Result<(), Box> { + let make_svc = make_service_fn(move |_| { + let service = service_fn(move |req| { + hello_handler( + req, + ) + }); + + async move { Ok::<_, Infallible>(service) } + }); + + let port = u16::try_from(AGENT_PORT).expect("AGENT_PORT is too large"); + let addr = SocketAddr::from(([127, 0, 0, 1], port)); + let server_builder = Server::try_bind(&addr)?; + + let server = server_builder.serve(make_svc); + + // start hyper http server + if let Err(e) = server.await { + error!("Server error: {e}"); + return Err(e.into()); + } + + Ok(()) + +} + +async fn hello_handler(req: Request) -> http::Result> { + match (req.method(), req.uri().path()) { + (&Method::GET, HELLO_PATH) => { + Response::builder() + .status(200) + .body(Body::from(json!({}).to_string())) + }, + _ => { + let mut not_found = Response::default(); + *not_found.status_mut() = StatusCode::NOT_FOUND; + Ok(not_found) + } + } +} diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 8545fbe40..8fb342e1d 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -6,3 +6,4 @@ pub mod stats_processor; pub mod trace_agent; pub mod trace_flusher; pub mod trace_processor; +pub mod hello_agent; From 720bb42da13b60bf51cff2dea8afd7303ae3265a Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 10:34:10 -0400 Subject: [PATCH 2/8] feat: call it --- bottlecap/src/bin/bottlecap/main.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 3e813a494..d6b98b310 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -41,6 +41,7 @@ use bottlecap::{ stats_processor, trace_agent, trace_flusher::{self, TraceFlusher}, trace_processor, + hello_agent, }, DOGSTATSD_PORT, EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES, EXTENSION_HOST, EXTENSION_ID_HEADER, EXTENSION_NAME, EXTENSION_NAME_HEADER, EXTENSION_ROUTE, @@ -313,6 +314,15 @@ async fn extension_loop_active( error!("Error starting trace agent: {e:?}"); } }); + + // TODO(astuyve): deprioritize this task after the first request + tokio::spawn(async move { + let res = hello_agent::start_handler().await; + if let Err(e) = res { + error!("Error starting hello agent: {e:?}"); + } + }); + let lambda_enhanced_metrics = enhanced_metrics::new(Arc::clone(&metrics_aggr)); let dogstatsd_cancel_token = start_dogstatsd(event_bus.get_sender_copy(), &metrics_aggr).await; From 0c05c0cd496f8f8fc3e638672fdd38d120213c5f Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 10:45:36 -0400 Subject: [PATCH 3/8] feat: fmt --- bottlecap/src/bin/bottlecap/main.rs | 2 +- bottlecap/src/traces/hello_agent.rs | 17 +++++------------ bottlecap/src/traces/mod.rs | 2 +- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index d6b98b310..02dc9e432 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -37,11 +37,11 @@ use bottlecap::{ listener::TelemetryListener, }, traces::{ + hello_agent, stats_flusher::{self, StatsFlusher}, stats_processor, trace_agent, trace_flusher::{self, TraceFlusher}, trace_processor, - hello_agent, }, DOGSTATSD_PORT, EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES, EXTENSION_HOST, EXTENSION_ID_HEADER, EXTENSION_NAME, EXTENSION_NAME_HEADER, EXTENSION_ROUTE, diff --git a/bottlecap/src/traces/hello_agent.rs b/bottlecap/src/traces/hello_agent.rs index 5ed5adb6c..f4a2c6937 100644 --- a/bottlecap/src/traces/hello_agent.rs +++ b/bottlecap/src/traces/hello_agent.rs @@ -7,21 +7,17 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{http, Body, Method, Request, Response, Server, StatusCode}; +use serde_json::json; use std::convert::Infallible; use std::net::SocketAddr; use tracing::error; -use serde_json::json; const HELLO_PATH: &str = "/lambda/hello"; const AGENT_PORT: usize = 8126; pub async fn start_handler() -> Result<(), Box> { let make_svc = make_service_fn(move |_| { - let service = service_fn(move |req| { - hello_handler( - req, - ) - }); + let service = service_fn(move |req| hello_handler(req)); async move { Ok::<_, Infallible>(service) } }); @@ -39,16 +35,13 @@ pub async fn start_handler() -> Result<(), Box> { } Ok(()) - } async fn hello_handler(req: Request) -> http::Result> { match (req.method(), req.uri().path()) { - (&Method::GET, HELLO_PATH) => { - Response::builder() - .status(200) - .body(Body::from(json!({}).to_string())) - }, + (&Method::GET, HELLO_PATH) => Response::builder() + .status(200) + .body(Body::from(json!({}).to_string())), _ => { let mut not_found = Response::default(); *not_found.status_mut() = StatusCode::NOT_FOUND; diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 8fb342e1d..b70d26a83 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -1,9 +1,9 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +pub mod hello_agent; pub mod stats_flusher; pub mod stats_processor; pub mod trace_agent; pub mod trace_flusher; pub mod trace_processor; -pub mod hello_agent; From df6cbc1b8e6c36396b76ecc5a9b703960980974d Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 11:13:40 -0400 Subject: [PATCH 4/8] fix: clippy --- bottlecap/src/traces/hello_agent.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/bottlecap/src/traces/hello_agent.rs b/bottlecap/src/traces/hello_agent.rs index f4a2c6937..dca60989c 100644 --- a/bottlecap/src/traces/hello_agent.rs +++ b/bottlecap/src/traces/hello_agent.rs @@ -17,7 +17,7 @@ const AGENT_PORT: usize = 8126; pub async fn start_handler() -> Result<(), Box> { let make_svc = make_service_fn(move |_| { - let service = service_fn(move |req| hello_handler(req)); + let service = service_fn(hello_handler); async move { Ok::<_, Infallible>(service) } }); @@ -38,14 +38,13 @@ pub async fn start_handler() -> Result<(), Box> { } async fn hello_handler(req: Request) -> http::Result> { - match (req.method(), req.uri().path()) { - (&Method::GET, HELLO_PATH) => Response::builder() + if let (&Method::GET, HELLO_PATH) = (req.method(), req.uri().path()) { + Response::builder() .status(200) - .body(Body::from(json!({}).to_string())), - _ => { - let mut not_found = Response::default(); - *not_found.status_mut() = StatusCode::NOT_FOUND; - Ok(not_found) - } + .body(Body::from(json!({}).to_string())) + } else { + let mut not_found = Response::default(); + *not_found.status_mut() = StatusCode::NOT_FOUND; + Ok(not_found) } } From 246e45b82d10fdc32cb5de8cbdec58348a359278 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 12:11:15 -0400 Subject: [PATCH 5/8] fix: 8124 not 8126 for hello --- bottlecap/src/traces/hello_agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/hello_agent.rs b/bottlecap/src/traces/hello_agent.rs index dca60989c..fc5b956d3 100644 --- a/bottlecap/src/traces/hello_agent.rs +++ b/bottlecap/src/traces/hello_agent.rs @@ -13,7 +13,7 @@ use std::net::SocketAddr; use tracing::error; const HELLO_PATH: &str = "/lambda/hello"; -const AGENT_PORT: usize = 8126; +const AGENT_PORT: usize = 8124; pub async fn start_handler() -> Result<(), Box> { let make_svc = make_service_fn(move |_| { From 55ae4dd1c740f863b2133278dcec3041b1abc761 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 13:07:48 -0400 Subject: [PATCH 6/8] move telemetry listener temporarily --- bottlecap/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lib.rs b/bottlecap/src/lib.rs index 7b1c658e9..326a84a46 100644 --- a/bottlecap/src/lib.rs +++ b/bottlecap/src/lib.rs @@ -44,7 +44,10 @@ pub const LAMBDA_RUNTIME_SLUG: &str = "lambda"; pub const DOGSTATSD_PORT: u16 = 8185; pub const TELEMETRY_SUBSCRIPTION_ROUTE: &str = "2022-07-01/telemetry"; -pub const TELEMETRY_PORT: u16 = 8124; +// todo(astuyve) should be 8124 on /lambda/logs but +// telemetry is implemented on a raw socket now and +// does not multiplex routes on the same port. +pub const TELEMETRY_PORT: u16 = 8999; /// Return the base URL for the lambda runtime API /// From e0f4d71c856c38d66d0053a74646ca38e39d1dfb Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 13:45:46 -0400 Subject: [PATCH 7/8] feat: Fix an issue where dogstatsd port is wrong. Drop enhanced invocation metric from clients because we always set it --- bottlecap/src/lib.rs | 2 +- bottlecap/src/metrics/dogstatsd.rs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lib.rs b/bottlecap/src/lib.rs index 326a84a46..da2da3b71 100644 --- a/bottlecap/src/lib.rs +++ b/bottlecap/src/lib.rs @@ -41,7 +41,7 @@ pub const EXTENSION_ROUTE: &str = "2020-01-01/extension"; pub const LAMBDA_RUNTIME_SLUG: &str = "lambda"; // todo: make sure we can override those with environment variables -pub const DOGSTATSD_PORT: u16 = 8185; +pub const DOGSTATSD_PORT: u16 = 8125; pub const TELEMETRY_SUBSCRIPTION_ROUTE: &str = "2022-07-01/telemetry"; // todo(astuyve) should be 8124 on /lambda/logs but diff --git a/bottlecap/src/metrics/dogstatsd.rs b/bottlecap/src/metrics/dogstatsd.rs index 1d2965168..125734d90 100644 --- a/bottlecap/src/metrics/dogstatsd.rs +++ b/bottlecap/src/metrics/dogstatsd.rs @@ -65,6 +65,10 @@ impl DogStatsD { continue; } }; + if parsed_metric.name == "aws.lambda.enhanced.invocations" { + debug!("dropping invocation metric from layer, as it's set by agent"); + continue; + } let first_value = match parsed_metric.first_value() { Ok(val) => val, Err(e) => { @@ -85,6 +89,7 @@ impl DogStatsD { // Don't publish until after validation and adding metric_event to buff let _ = self.event_bus.send(Event::Metric(metric_event)).await; // todo check the result if self.cancel_token.is_cancelled() { + debug!("closing dogstatsd listener"); break; } } From b3c4d7f6ecd051fd29ef7a46b3b79d833bccfa39 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 9 Jul 2024 14:02:25 -0400 Subject: [PATCH 8/8] fix: fmt --- bottlecap/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lib.rs b/bottlecap/src/lib.rs index da2da3b71..bedbbe246 100644 --- a/bottlecap/src/lib.rs +++ b/bottlecap/src/lib.rs @@ -44,7 +44,7 @@ pub const LAMBDA_RUNTIME_SLUG: &str = "lambda"; pub const DOGSTATSD_PORT: u16 = 8125; pub const TELEMETRY_SUBSCRIPTION_ROUTE: &str = "2022-07-01/telemetry"; -// todo(astuyve) should be 8124 on /lambda/logs but +// todo(astuyve) should be 8124 on /lambda/logs but // telemetry is implemented on a raw socket now and // does not multiplex routes on the same port. pub const TELEMETRY_PORT: u16 = 8999;