From 6dd10065b77a7f37bc106d30c28f447724cd5bba Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 27 Nov 2025 17:04:32 +0100 Subject: [PATCH 1/4] dependencies: updated async-nats, code0-flow and tucana --- Cargo.lock | 67 +++++++++++++++++++++++++++--------------- Cargo.toml | 6 ++-- crates/base/Cargo.toml | 1 + 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e8d366..89b90e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,9 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "async-nats" -version = "0.44.2" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f834a80c3ab6109b9c8f5ca6661a578cf31e088e831b6ce07c6b23cca04f6742" +checksum = "86dde77d8a733a9dbaf865a9eb65c72e09c88f3d14d3dd0d2aecf511920ee4fe" dependencies = [ "base64", "bytes", @@ -180,6 +180,7 @@ dependencies = [ "async-nats", "async-trait", "code0-flow", + "env_logger", "futures-lite", "log", "prost", @@ -256,34 +257,24 @@ dependencies = [ "num-traits", ] -[[package]] -name = "code0-definition-reader" -version = "0.0.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3e3a3aec6ffc35ac360113d8b96206c38961afef56b0c5632cb77cd2f4429b9" -dependencies = [ - "serde", - "serde_json", - "tucana", -] - [[package]] name = "code0-flow" -version = "0.0.18" +version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90356cc27d5de4c9870a29163e50e6be77ebeb408220f8057d7d8ed57f2a0fb" +checksum = "bd517223799dc011207b599f7e2a818d95eb53e20dbdfc54d01f47e1c0e6baba" dependencies = [ "async-nats", "async-trait", - "code0-definition-reader", "dotenv", "futures-core", "log", "regex", + "serde", "serde_json", "tonic", "tonic-health", "tucana", + "walkdir", ] [[package]] @@ -1496,6 +1487,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.27" @@ -1536,9 +1536,9 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "serde" -version = "1.0.222" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aab69e3f5be1836a1fe0aca0b286e5a5b38f262d6c9e8acd2247818751fcc8fb" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ "serde_core", "serde_derive", @@ -1546,18 +1546,18 @@ dependencies = [ [[package]] name = "serde_core" -version = "1.0.222" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f8ebec5eea07db7df9342aa712db2138f019d9ab3454a60a680579a6f841b80" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.222" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5f61630fe26d0ff555e6c37dc445ab2f15871c8a11ace3cf471b3195d3e4f49" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -2048,9 +2048,9 @@ dependencies = [ [[package]] name = "tucana" -version = "0.0.39" +version = "0.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e6300e0f85e9352904cace7b285366a10995dbf690c6ed8fa022cd57fa8607b" +checksum = "22ab56226ccbbda9b2bd7505d4296712a2e0757254fbe601c30785ae9e2d09e6" dependencies = [ "pbjson", "pbjson-build", @@ -2136,6 +2136,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -2236,6 +2246,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-link" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 622a623..6427567 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,8 @@ version = "0.0.0" edition = "2024" [workspace.dependencies] -code0-flow = { version = "0.0.18" } -tucana = { version = "0.0.39", features = ["aquila"] } +code0-flow = { version = "0.0.19" } +tucana = { version = "0.0.42", features = ["aquila"] } serde_json = { version = "1.0.138" } log = "0.4.27" env_logger = "0.11.8" @@ -16,7 +16,7 @@ regex = "1.11.1" tokio = { version = "1.44.1", features = ["rt-multi-thread"] } uuid = { version = "1.16.0", features = ["v4"] } tonic = "0.14.0" -async-nats = "0.44.2" +async-nats = "0.45.0" async-trait = "0.1.88" anyhow = "1.0.98" prost = "0.14.0" diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index 0ba2d79..8b965ce 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -16,3 +16,4 @@ uuid = { workspace = true } prost = { workspace = true } futures-lite = { workspace = true } log = { workspace = true } +env_logger = {workspace = true} \ No newline at end of file From 5c7609150ccb1eca816597bce822cd62ce5aad51 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 27 Nov 2025 17:04:43 +0100 Subject: [PATCH 2/4] feat: added draco variant env variable --- crates/base/src/config.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/base/src/config.rs b/crates/base/src/config.rs index 27ed1f0..37a9bc7 100644 --- a/crates/base/src/config.rs +++ b/crates/base/src/config.rs @@ -51,6 +51,11 @@ pub struct AdapterConfig { /// /// If true the Adapter will expose a grpc health service server. pub with_health_service: bool, + + /// Variant + /// + /// The Variant of Draco. E.g. Http, Cron... + pub draco_variant: String, } impl AdapterConfig { @@ -79,6 +84,8 @@ impl AdapterConfig { let with_health_service = code0_flow::flow_config::env_with_default("WITH_HEALTH_SERVICE", false); + let draco_variant = + code0_flow::flow_config::env_with_default("DRACO_VARIANT", String::from("None")); Self { environment, nats_bucket, @@ -89,6 +96,7 @@ impl AdapterConfig { aquila_url, definition_path, with_health_service, + draco_variant, } } From 0d21d7b465a640bf2008c5f105492d6ee5fe3fd9 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 27 Nov 2025 17:05:02 +0100 Subject: [PATCH 3/4] feat: added graceful and propper shutdown --- adapter/rest/src/main.rs | 10 +++- crates/base/src/runner.rs | 103 +++++++++++++++++++++++++++----------- crates/base/src/store.rs | 12 +++-- 3 files changed, 90 insertions(+), 35 deletions(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 31eec45..44852b0 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -16,8 +16,14 @@ use tucana::shared::{Struct, ValidationFlow, Value}; #[tokio::main] async fn main() { let server = HttpServer { http_server: None }; - let runner = ServerRunner::new(server).await.unwrap(); - runner.serve().await.unwrap(); + let runner = match ServerRunner::new(server).await { + Ok(runner) => runner, + Err(err) => panic!("Failed to create server runner: {:?}", err), + }; + match runner.serve().await { + Ok(_) => (), + Err(err) => panic!("Failed to start server runner: {:?}", err), + }; } struct HttpServer { diff --git a/crates/base/src/runner.rs b/crates/base/src/runner.rs index 38020a6..8014d82 100644 --- a/crates/base/src/runner.rs +++ b/crates/base/src/runner.rs @@ -3,9 +3,9 @@ use crate::{ store::AdapterStore, traits::{LoadConfig, Server as AdapterServer}, }; -use code0_flow::flow_definition::FlowUpdateService; use std::sync::Arc; -use tokio::sync::broadcast; +use code0_flow::flow_service::FlowUpdateService; +use tokio::signal; use tonic::transport::Server; use tonic_health::pb::health_server::HealthServer; @@ -20,11 +20,14 @@ pub struct ServerContext { pub struct ServerRunner { context: ServerContext, server: Box>, - shutdown_sender: broadcast::Sender<()>, } impl ServerRunner { pub async fn new>(server: S) -> anyhow::Result { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Debug) + .init(); + code0_flow::flow_config::load_env_file(); let adapter_config = AdapterConfig::from_env(); @@ -41,17 +44,15 @@ impl ServerRunner { server_config: Arc::new(server_config), }; - let (shutdown_tx, _) = broadcast::channel(1); - Ok(Self { context, server: Box::new(server), - shutdown_sender: shutdown_tx, }) } - pub async fn serve(mut self) -> anyhow::Result<()> { + pub async fn serve(self) -> anyhow::Result<()> { let config = self.context.adapter_config.clone(); + log::info!("Starting Draco Variant: {}", config.draco_variant); if !config.is_static() { let definition_service = FlowUpdateService::from_url( @@ -61,42 +62,84 @@ impl ServerRunner { definition_service.send().await; } - if config.with_health_service { + let health_task = if config.with_health_service { let health_service = code0_flow::flow_health::HealthService::new(config.nats_url.clone()); let address = format!("{}:{}", config.grpc_host, config.grpc_port).parse()?; - tokio::spawn(async move { - let _ = Server::builder() - .add_service(HealthServer::new(health_service)) - .serve(address) - .await; - }); - log::info!( - "Health server started at {}:{}", + "Health server starting at {}:{}", config.grpc_host, config.grpc_port ); - } - - self.server.init(&self.context).await?; - let mut rx = self.shutdown_sender.subscribe(); - let context = self.context; - let mut server = self.server; + Some(tokio::spawn(async move { + if let Err(err) = Server::builder() + .add_service(HealthServer::new(health_service)) + .serve(address) + .await + { + log::error!("Health server error: {:?}", err); + } else { + log::info!("Health server stopped gracefully"); + } + })) + } else { + None + }; - let handle = tokio::spawn(async move { - tokio::select! { - result = server.run(&context) => result, - _ = rx.recv() => server.shutdown(&context).await, + let ServerRunner { + mut server, + context, + } = self; + + // Init the adapter server (e.g. create underlying HTTP server) + server.init(&context).await?; + log::info!("Draco successfully initialized."); + + match health_task { + Some(mut ht) => { + tokio::select! { + // Main adapter server loop finished on its own + res = server.run(&context) => { + log::warn!("Adapter server finished, shutting down"); + ht.abort(); + res?; + } + + // Health server ended first + _ = &mut ht => { + log::warn!("Health server task finished, shutting down adapter"); + server.shutdown(&context).await?; + } + + // Ctrl+C / SIGINT + _ = signal::ctrl_c() => { + log::info!("Ctrl+C/Exit signal received, shutting down adapter"); + server.shutdown(&context).await?; + ht.abort(); + } + } } - }); - tokio::signal::ctrl_c().await?; - let _ = self.shutdown_sender.send(()); - handle.await??; + None => { + tokio::select! { + // Adapter server loop ends on its own + res = server.run(&context) => { + log::warn!("Adapter server finished"); + res?; + } + + // Ctrl+C / SIGINT + _ = signal::ctrl_c() => { + log::info!("Ctrl+C/Exit signal received, shutting down adapter"); + server.shutdown(&context).await?; + } + } + } + } + log::info!("Draco shutdown complete"); Ok(()) } } diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 872e440..879bdf8 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -19,7 +19,10 @@ pub enum FlowIdentifyResult { impl AdapterStore { pub async fn from_url(url: String, bucket: String) -> Self { let client = match async_nats::connect(url).await { - Ok(client) => client, + Ok(client) => { + log::info!("Successfully connected to NATS"); + client + }, Err(err) => panic!("Failed to connect to NATS server: {:?}", err), }; @@ -33,13 +36,16 @@ impl AdapterStore { .await { Ok(_) => { - log::info!("Successfully created NATS bucket"); + log::info!("Successfully created NATS bucket/bucket already exists"); } Err(err) => panic!("Failed to create NATS bucket: {:?}", err), } let kv = match stream.get_key_value(bucket).await { - Ok(kv) => kv, + Ok(kv) => { + log::info!("Successfully got NATS bucket"); + kv + }, Err(err) => panic!("Failed to get key-value store: {}", err), }; From 981220b76b6b1a2ba989450e11f919204d14aeee Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 27 Nov 2025 17:08:01 +0100 Subject: [PATCH 4/4] ref: cargo clippy --- crates/base/src/runner.rs | 2 +- crates/base/src/store.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/base/src/runner.rs b/crates/base/src/runner.rs index 8014d82..d4c82f1 100644 --- a/crates/base/src/runner.rs +++ b/crates/base/src/runner.rs @@ -3,8 +3,8 @@ use crate::{ store::AdapterStore, traits::{LoadConfig, Server as AdapterServer}, }; -use std::sync::Arc; use code0_flow::flow_service::FlowUpdateService; +use std::sync::Arc; use tokio::signal; use tonic::transport::Server; use tonic_health::pb::health_server::HealthServer; diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 879bdf8..380d85e 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -22,7 +22,7 @@ impl AdapterStore { Ok(client) => { log::info!("Successfully connected to NATS"); client - }, + } Err(err) => panic!("Failed to connect to NATS server: {:?}", err), }; @@ -45,7 +45,7 @@ impl AdapterStore { Ok(kv) => { log::info!("Successfully got NATS bucket"); kv - }, + } Err(err) => panic!("Failed to get key-value store: {}", err), };