Skip to content

Commit

Permalink
Support environment variables. (#36)
Browse files Browse the repository at this point in the history
Use the package.metadata section in Cargo.toml to provide
environment variables.

Signed-off-by: David Calavera <david.calavera@gmail.com>
  • Loading branch information
calavera committed Mar 28, 2022
1 parent 0931b8a commit 92424a2
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ opentelemetry = { version = "0.17.0", features = ["trace", "rt-tokio"] }
opentelemetry-aws = "0.5.0"
reqwest = { version = "0.11.10", default-features = false, features = ["rustls-tls"] }
rustc_version = "0.4.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
strum = "0.24.0" # For String -> Enum macros for usage with the CLI.
strum_macros = "0.24.0"
thiserror = "1.0.30"
Expand Down
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,47 @@ The start subcommand emulates the AWS Lambda control plane API. Run this command
cargo lambda start
```

The function is not compiled until the first time that you try to execute. See the [invoke](#invoke) command to learn how to execute a function. Cargo will run the command `cargo run --bin FUNCTION_NAME` to try to compile the function. `FUNCTION_NAME` can be either the name of the package if the package has only one binary, or the binary name in the `[[bin]]` section if the package includes more than one binary.

### Start - Environment variables

If you need to set environment variables for your function to run, you can specify them in the metadata section of your Cargo.toml file.

Use the section `package.metadata.lambda.env` to set global variables that will applied to all functions in your package:

```toml
[package]
name = "basic-lambda"

[package.metadata.lambda.env]
RUST_LOG = "debug"
MY_CUSTOM_ENV_VARIABLE = "custom value"
```

If you have more than one function in the same package, and you want to set specific variables for each one of them, you can use a section named after each one of the binaries in your package, `package.metadata.lambda.bin.BINARY_NAME`:

```toml
[package]
name = "lambda-project"

[package.metadata.lambda.env]
RUST_LOG = "debug"

[[package.metadata.lambda.bin.get-product]]
GET_PRODUCT_ENV_VARIABLE = "custom value"

[[package.metadata.lambda.bin.add-product]]
ADD_PRODUCT_ENV_VARIABLE = "custom value"

[[bin]]
name = "get-product"
path = "src/bin/get-product.rs"

[[bin]]
name = "add-product"
path = "src/bin/add-product.rs"
```

### Invoke

The invoke subcomand helps you send requests to the control plane emulator. To use this subcommand, you have to provide the name of the Lambda function that you want to invoke, and the payload that you want to send. When the control plane emulator receives the request, it will compile your Lambda function and handle your request.
Expand Down
21 changes: 4 additions & 17 deletions src/build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::zig;
use crate::{metadata, zig};
use cargo_zigbuild::Build as ZigBuild;
use clap::{Args, ValueHint};
use miette::{IntoDiagnostic, Result, WrapErr};
use std::{
collections::HashSet,
io::Write,
path::{Path, PathBuf},
};
Expand Down Expand Up @@ -58,23 +57,11 @@ impl Build {
.manifest_path
.as_deref()
.unwrap_or_else(|| Path::new("Cargo.toml"));
let mut metadata_cmd = cargo_metadata::MetadataCommand::new();
metadata_cmd.no_deps();
metadata_cmd.manifest_path(&manifest_path);
let metadata = metadata_cmd.exec().into_diagnostic()?;

let mut binaries: HashSet<String> = HashSet::new();
for pkg in metadata.packages {
for target in pkg.targets {
if target.kind.iter().any(|s| s == "bin") {
binaries.insert(target.name);
}
}
}
let binaries = metadata::binary_packages(manifest_path.to_path_buf())?;

if !self.build.bin.is_empty() {
for name in &self.build.bin {
if !binaries.contains(name) {
if !binaries.contains_key(name) {
return Err(miette::miette!(
"binary target is missing from this project: {}",
name
Expand Down Expand Up @@ -129,7 +116,7 @@ impl Build {
};

let base = target_dir.join(final_target).join(profile);
for name in &binaries {
for name in binaries.keys() {
let binary = base.join(name);
if binary.exists() {
let bootstrap_dir = lambda_dir.join(name);
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use clap::{Parser, Subcommand};
use miette::{miette, Result};
mod build;
mod invoke;
mod metadata;
mod progress;
mod start;
mod zig;
Expand Down
73 changes: 73 additions & 0 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use cargo_metadata::Package;
use miette::{IntoDiagnostic, Result, WrapErr};
use serde::Deserialize;
use std::{collections::HashMap, path::PathBuf};

#[derive(Default, Deserialize)]
#[non_exhaustive]
pub(crate) struct Metadata {
#[serde(default)]
pub lambda: LambdaMetadata,
}

#[derive(Clone, Default, Deserialize)]
#[non_exhaustive]
pub(crate) struct LambdaMetadata {
#[serde(default)]
pub env: HashMap<String, String>,
#[serde(default)]
pub bin: HashMap<String, PackageMetadata>,
}

#[derive(Clone, Default, Deserialize)]
#[non_exhaustive]
pub(crate) struct PackageMetadata {
#[serde(default)]
pub env: HashMap<String, String>,
}

pub(crate) fn binary_packages(manifest_path: PathBuf) -> Result<HashMap<String, Package>> {
let mut metadata_cmd = cargo_metadata::MetadataCommand::new();
metadata_cmd.no_deps();
metadata_cmd.manifest_path(manifest_path);
let metadata = metadata_cmd.exec().into_diagnostic()?;

let mut binaries = HashMap::new();
for pkg in metadata.packages {
let mut bin_name = None;
for target in &pkg.targets {
if target.kind.iter().any(|s| s == "bin") {
bin_name = Some(target.name.clone());
break;
}
}
if let Some(name) = bin_name {
binaries.insert(name, pkg);
}
}

Ok(binaries)
}

pub(crate) fn function_metadata(
manifest_path: PathBuf,
name: &str,
) -> Result<Option<PackageMetadata>> {
let binaries = binary_packages(manifest_path)?;
let package = match binaries.get(name) {
None => return Ok(None),
Some(p) => p,
};

let metadata: Metadata = serde_json::from_value(package.metadata.clone())
.into_diagnostic()
.wrap_err("invalid lambda metadata in Cargo.toml file")?;

let mut env = HashMap::new();
env.extend(metadata.lambda.env);
if let Some(bin) = metadata.lambda.bin.get(name) {
env.extend(bin.env.clone());
}

Ok(Some(PackageMetadata { env }))
}
16 changes: 12 additions & 4 deletions src/start/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use axum::{
routing::{get, post},
Router,
};
use clap::Args;
use clap::{Args, ValueHint};
use miette::{IntoDiagnostic, Result, WrapErr};
use opentelemetry::{
global,
trace::{TraceContextExt, Tracer},
Context, KeyValue,
};
use std::{collections::HashMap, net::SocketAddr, process::Stdio};
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, process::Stdio};
use tokio::{
process::Command,
sync::{mpsc::Sender, oneshot},
Expand Down Expand Up @@ -48,9 +48,15 @@ pub struct Start {
/// Address port where users send invoke requests
#[clap(short = 'p', long, default_value = "9000")]
invoke_port: u16,

/// Print OpenTelemetry traces after each function invocation
#[clap(long)]
print_traces: bool,

/// Path to Cargo.toml
#[clap(long, value_name = "PATH", parse(from_os_str), value_hint = ValueHint::FilePath)]
#[clap(default_value = "Cargo.toml")]
pub manifest_path: PathBuf,
}

impl Start {
Expand All @@ -69,10 +75,11 @@ impl Start {

let port = self.invoke_port;
let print_traces = self.print_traces;
let manifest_path = self.manifest_path.clone();

Toplevel::new()
.start("Lambda server", move |s| {
start_server(s, port, print_traces)
start_server(s, port, print_traces, manifest_path)
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
Expand All @@ -85,14 +92,15 @@ async fn start_server(
subsys: SubsystemHandle,
invoke_port: u16,
print_traces: bool,
manifest_path: PathBuf,
) -> Result<(), axum::Error> {
init_tracing(print_traces);

let addr = SocketAddr::from(([127, 0, 0, 1], invoke_port));
let server_addr = format!("http://{addr}");

let req_cache = RequestCache::new(server_addr);
let req_tx = init_scheduler(&subsys, req_cache.clone()).await;
let req_tx = init_scheduler(&subsys, req_cache.clone(), manifest_path).await;
let resp_cache = ResponseCache::new();
let x_request_id = HeaderName::from_static("lambda-runtime-aws-request-id");

Expand Down
35 changes: 27 additions & 8 deletions src/start/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::start::requests::{InvokeRequest, ServerError};
use crate::{
metadata::{self, PackageMetadata},
start::requests::{InvokeRequest, ServerError},
};
use axum::{body::Body, response::Response};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
path::PathBuf,
sync::Arc,
};
use tokio::{
Expand Down Expand Up @@ -115,12 +119,13 @@ impl ResponseCache {
pub(crate) async fn init_scheduler(
subsys: &SubsystemHandle,
req_cache: RequestCache,
manifest_path: PathBuf,
) -> Sender<InvokeRequest> {
let (req_tx, req_rx) = mpsc::channel::<InvokeRequest>(100);

subsys.start("lambda scheduler", move |s| async {
start_scheduler(s, req_cache, req_rx).await;
Ok::<(), std::convert::Infallible>(())
start_scheduler(s, req_cache, manifest_path, req_rx).await;
Ok::<_, std::convert::Infallible>(())
});

req_tx
Expand All @@ -129,6 +134,7 @@ pub(crate) async fn init_scheduler(
async fn start_scheduler(
subsys: SubsystemHandle,
req_cache: RequestCache,
manifest_path: PathBuf,
mut req_rx: Receiver<InvokeRequest>,
) {
let (gc_tx, mut gc_rx) = mpsc::channel::<String>(10);
Expand All @@ -140,14 +146,15 @@ async fn start_scheduler(
let name = name.clone();
let api = api.clone();
let gc_tx = gc_tx.clone();
subsys.start("lambda runtime", |s| start_function(s, name, api, gc_tx));
let pb = manifest_path.clone();
subsys.start("lambda runtime", |s| start_function(s, name, api, pb, gc_tx));
}
},
Some(gc) = gc_rx.recv() => {
req_cache.clean(&gc).await;
},
_ = subsys.on_shutdown_requested() => {
info!("terminating Lambda scheduler");
info!("terminating lambda scheduler");
return;
},

Expand All @@ -159,17 +166,29 @@ async fn start_function(
subsys: SubsystemHandle,
name: String,
runtime_api: String,
manifest_path: PathBuf,
gc_tx: Sender<String>,
) -> Result<(), ServerError> {
info!(function = ?name, "starting lambda function");

let meta = match metadata::function_metadata(manifest_path, &name) {
Err(e) => {
error!(error = %e, "ignoring invalid function metadata");
PackageMetadata::default()
}
Ok(m) => m.unwrap_or_default(),
};

let mut child = Command::new("cargo")
.args(["watch", "--", "cargo", "run", "--bin", &name])
.env("RUST_LOG", std::env::var("RUST_LOG").unwrap_or_default())
.env("AWS_LAMBDA_RUNTIME_API", &runtime_api)
.env("AWS_LAMBDA_FUNCTION_NAME", &name)
.env("AWS_LAMBDA_FUNCTION_VERSION", "1")
.env("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "4096")
// Variables above the following call can be updated by variables in the metadata
.envs(meta.env)
// Variables below cannot be updated by variables in the metadata
.env("AWS_LAMBDA_RUNTIME_API", &runtime_api)
.env("AWS_LAMBDA_FUNCTION_NAME", &name)
.spawn()
.map_err(ServerError::SpawnCommand)?;

Expand All @@ -180,7 +199,7 @@ async fn start_function(
}
},
_ = subsys.on_shutdown_requested() => {
info!(function = ?name, "terminating Lambda function");
info!(function = ?name, "terminating lambda function");
let _ = child.kill().await;
}
}
Expand Down

0 comments on commit 92424a2

Please sign in to comment.