Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve extensions API support #593

Merged
merged 2 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 16 additions & 13 deletions lambda-extension/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ where
)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to initialize the logs api"));
let err = format!("unable to initialize the logs api: {}", res.status());
return Err(ExtensionError::boxed(err));
}
trace!("Registered extension with Logs API");
}
Expand Down Expand Up @@ -288,7 +289,8 @@ where
)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to initialize the telemetry api"));
let err = format!("unable to initialize the telemetry api: {}", res.status());
return Err(ExtensionError::boxed(err));
}
trace!("Registered extension with Telemetry API");
}
Expand Down Expand Up @@ -317,30 +319,30 @@ where

let ep = match ep.ready().await {
Ok(ep) => ep,
Err(error) => {
println!("Inner service is not ready: {:?}", error);
Err(err) => {
println!("Inner service is not ready: {err:?}");
let req = if is_invoke {
requests::init_error(extension_id, &error.to_string(), None)?
requests::init_error(extension_id, &err.to_string(), None)?
} else {
requests::exit_error(extension_id, &error.to_string(), None)?
requests::exit_error(extension_id, &err.to_string(), None)?
};

client.call(req).await?;
return Err(error.into());
return Err(err.into());
}
};

let res = ep.call(event).await;
if let Err(error) = res {
println!("{:?}", error);
if let Err(err) = res {
println!("{err:?}");
let req = if is_invoke {
requests::init_error(extension_id, &error.to_string(), None)?
requests::init_error(extension_id, &err.to_string(), None)?
} else {
requests::exit_error(extension_id, &error.to_string(), None)?
requests::exit_error(extension_id, &err.to_string(), None)?
};

client.call(req).await?;
return Err(error.into());
return Err(err.into());
}
}
Ok(())
Expand Down Expand Up @@ -422,7 +424,8 @@ async fn register<'a>(
let req = requests::register_request(&name, events)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to register the extension"));
let err = format!("unable to register the extension: {}", res.status());
return Err(ExtensionError::boxed(err));
}

let header = res
Expand Down
2 changes: 1 addition & 1 deletion lambda-extension/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ where
let mut service = service.lock().await;
match service.call(logs).await {
Ok(_) => (),
Err(err) => println!("{:?}", err),
Err(err) => println!("{err:?}"),
}
}

Expand Down
20 changes: 12 additions & 8 deletions lambda-extension/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use serde::Serialize;
const EXTENSION_NAME_HEADER: &str = "Lambda-Extension-Name";
pub(crate) const EXTENSION_ID_HEADER: &str = "Lambda-Extension-Identifier";
const EXTENSION_ERROR_TYPE_HEADER: &str = "Lambda-Extension-Function-Error-Type";
const CONTENT_TYPE_HEADER_NAME: &str = "Content-Type";
const CONTENT_TYPE_HEADER_VALUE: &str = "application/json";

pub(crate) fn next_event_request(extension_id: &str) -> Result<Request<Body>, Error> {
let req = build_request()
Expand All @@ -24,6 +26,7 @@ pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result<
.method(Method::POST)
.uri("/2020-01-01/extension/register")
.header(EXTENSION_NAME_HEADER, extension_name)
.header(CONTENT_TYPE_HEADER_NAME, CONTENT_TYPE_HEADER_VALUE)
.body(Body::from(serde_json::to_string(&events)?))?;

Ok(req)
Expand Down Expand Up @@ -65,14 +68,15 @@ pub(crate) fn subscribe_request(
"buffering": buffering.unwrap_or_default(),
"destination": {
"protocol": "HTTP",
"URI": format!("http://sandbox.localdomain:{}", port_number),
"URI": format!("http://sandbox.localdomain:{port_number}"),
}
});

let req = build_request()
.method(Method::PUT)
.uri(api.uri())
.header(EXTENSION_ID_HEADER, extension_id)
.header(CONTENT_TYPE_HEADER_NAME, CONTENT_TYPE_HEADER_VALUE)
.body(Body::from(serde_json::to_string(&data)?))?;

Ok(req)
Expand All @@ -91,30 +95,30 @@ pub struct ErrorRequest<'a> {
}

/// Create a new init error request to send to the Extensions API
pub fn init_error<'a>(
pub fn init_error(
extension_id: &str,
error_type: &str,
request: Option<ErrorRequest<'a>>,
request: Option<ErrorRequest<'_>>,
) -> Result<Request<Body>, Error> {
error_request("init", extension_id, error_type, request)
}

/// Create a new exit error request to send to the Extensions API
pub fn exit_error<'a>(
pub fn exit_error(
extension_id: &str,
error_type: &str,
request: Option<ErrorRequest<'a>>,
request: Option<ErrorRequest<'_>>,
) -> Result<Request<Body>, Error> {
error_request("exit", extension_id, error_type, request)
}

fn error_request<'a>(
fn error_request(
error_type: &str,
extension_id: &str,
error_str: &str,
request: Option<ErrorRequest<'a>>,
request: Option<ErrorRequest<'_>>,
) -> Result<Request<Body>, Error> {
let uri = format!("/2020-01-01/extension/{}/error", error_type);
let uri = format!("/2020-01-01/extension/{error_type}/error");

let body = match request {
None => Body::empty(),
Expand Down
2 changes: 1 addition & 1 deletion lambda-extension/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ where
let mut service = service.lock().await;
match service.call(telemetry).await {
Ok(_) => (),
Err(err) => println!("{:?}", err),
Err(err) => println!("{err:?}"),
}
}

Expand Down
5 changes: 2 additions & 3 deletions lambda-http/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ pub enum PayloadError {
impl fmt::Display for PayloadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PayloadError::Json(json) => writeln!(f, "failed to parse payload from application/json {}", json),
PayloadError::Json(json) => writeln!(f, "failed to parse payload from application/json {json}"),
PayloadError::WwwFormUrlEncoded(form) => writeln!(
f,
"failed to parse payload from application/x-www-form-urlencoded {}",
form
"failed to parse payload from application/x-www-form-urlencoded {form}"
),
}
}
Expand Down
6 changes: 3 additions & 3 deletions lambda-http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ fn apigw_path_with_stage(stage: &Option<String>, path: &str) -> String {
match stage {
None => path.into(),
Some(stage) if stage == "$default" => path.into(),
Some(stage) => format!("/{}{}", stage, path),
Some(stage) => format!("/{stage}{path}"),
}
}

Expand Down Expand Up @@ -418,15 +418,15 @@ fn build_request_uri(
) -> String {
let mut url = match host {
None => {
let rel_url = Url::parse(&format!("http://localhost{}", path)).unwrap();
let rel_url = Url::parse(&format!("http://localhost{path}")).unwrap();
rel_url.path().to_string()
}
Some(host) => {
let scheme = headers
.get(x_forwarded_proto())
.and_then(|s| s.to_str().ok())
.unwrap_or("https");
let url = format!("{}://{}{}", scheme, host, path);
let url = format!("{scheme}://{host}{path}");
Url::parse(&url).unwrap().to_string()
}
};
Expand Down
2 changes: 1 addition & 1 deletion lambda-runtime-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ where
(scheme, authority, base_path)
};
let path = parts.uri.path_and_query().expect("PathAndQuery not found");
let pq: PathAndQuery = format!("{}{}", base_path, path).parse().expect("PathAndQuery invalid");
let pq: PathAndQuery = format!("{base_path}{path}").parse().expect("PathAndQuery invalid");

let uri = Uri::builder()
.scheme(scheme.as_ref())
Expand Down
4 changes: 2 additions & 2 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ where
error!("{:?}", err);
let error_type = type_name_of_val(&err);
let msg = if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
format!("Lambda panicked: {msg}")
} else {
"Lambda panicked".to_string()
};
Expand Down Expand Up @@ -268,7 +268,7 @@ where
{
error!("{:?}", err); // logs the error in CloudWatch
let error_type = type_name_of_val(&err);
let msg = format!("{}", err);
let msg = format!("{err}");

EventErrorRequest::new(request_id, error_type, &msg).into_req()
}
Expand Down