Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ version = "0.0.0"
edition = "2024"

[workspace.dependencies]
code0-flow = { version = "0.0.18" }
tucana = { version = "0.0.39", features = ["aquila"] }
code0-flow = { version = "0.0.19" }
tucana = { version = "0.0.42", features = ["aquila"] }
serde_json = { version = "1.0.138" }
log = "0.4.27"
env_logger = "0.11.8"
regex = "1.11.1"
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
uuid = { version = "1.16.0", features = ["v4"] }
tonic = "0.14.0"
async-nats = "0.44.2"
async-nats = "0.45.0"
async-trait = "0.1.88"
anyhow = "1.0.98"
prost = "0.14.0"
Expand Down
10 changes: 8 additions & 2 deletions adapter/rest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ use tucana::shared::{Struct, ValidationFlow, Value};
#[tokio::main]
async fn main() {
let server = HttpServer { http_server: None };
let runner = ServerRunner::new(server).await.unwrap();
runner.serve().await.unwrap();
let runner = match ServerRunner::new(server).await {
Ok(runner) => runner,
Err(err) => panic!("Failed to create server runner: {:?}", err),
};
match runner.serve().await {
Ok(_) => (),
Err(err) => panic!("Failed to start server runner: {:?}", err),
};
}

struct HttpServer {
Expand Down
1 change: 1 addition & 0 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ uuid = { workspace = true }
prost = { workspace = true }
futures-lite = { workspace = true }
log = { workspace = true }
env_logger = {workspace = true}
8 changes: 8 additions & 0 deletions crates/base/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ pub struct AdapterConfig {
///
/// If true the Adapter will expose a grpc health service server.
pub with_health_service: bool,

/// Variant
///
/// The Variant of Draco. E.g. Http, Cron...
pub draco_variant: String,
}

impl AdapterConfig {
Expand Down Expand Up @@ -79,6 +84,8 @@ impl AdapterConfig {
let with_health_service =
code0_flow::flow_config::env_with_default("WITH_HEALTH_SERVICE", false);

let draco_variant =
code0_flow::flow_config::env_with_default("DRACO_VARIANT", String::from("None"));
Self {
environment,
nats_bucket,
Expand All @@ -89,6 +96,7 @@ impl AdapterConfig {
aquila_url,
definition_path,
with_health_service,
draco_variant,
}
}

Expand Down
103 changes: 73 additions & 30 deletions crates/base/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::{
store::AdapterStore,
traits::{LoadConfig, Server as AdapterServer},
};
use code0_flow::flow_definition::FlowUpdateService;
use code0_flow::flow_service::FlowUpdateService;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::signal;
use tonic::transport::Server;
use tonic_health::pb::health_server::HealthServer;

Expand All @@ -20,11 +20,14 @@ pub struct ServerContext<C: LoadConfig> {
pub struct ServerRunner<C: LoadConfig> {
context: ServerContext<C>,
server: Box<dyn AdapterServer<C>>,
shutdown_sender: broadcast::Sender<()>,
}

impl<C: LoadConfig> ServerRunner<C> {
pub async fn new<S: AdapterServer<C>>(server: S) -> anyhow::Result<Self> {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Debug)
.init();

code0_flow::flow_config::load_env_file();

let adapter_config = AdapterConfig::from_env();
Expand All @@ -41,17 +44,15 @@ impl<C: LoadConfig> ServerRunner<C> {
server_config: Arc::new(server_config),
};

let (shutdown_tx, _) = broadcast::channel(1);

Ok(Self {
context,
server: Box::new(server),
shutdown_sender: shutdown_tx,
})
}

pub async fn serve(mut self) -> anyhow::Result<()> {
pub async fn serve(self) -> anyhow::Result<()> {
let config = self.context.adapter_config.clone();
log::info!("Starting Draco Variant: {}", config.draco_variant);

if !config.is_static() {
let definition_service = FlowUpdateService::from_url(
Expand All @@ -61,42 +62,84 @@ impl<C: LoadConfig> ServerRunner<C> {
definition_service.send().await;
}

if config.with_health_service {
let health_task = if config.with_health_service {
let health_service =
code0_flow::flow_health::HealthService::new(config.nats_url.clone());
let address = format!("{}:{}", config.grpc_host, config.grpc_port).parse()?;

tokio::spawn(async move {
let _ = Server::builder()
.add_service(HealthServer::new(health_service))
.serve(address)
.await;
});

log::info!(
"Health server started at {}:{}",
"Health server starting at {}:{}",
config.grpc_host,
config.grpc_port
);
}

self.server.init(&self.context).await?;

let mut rx = self.shutdown_sender.subscribe();
let context = self.context;
let mut server = self.server;
Some(tokio::spawn(async move {
if let Err(err) = Server::builder()
.add_service(HealthServer::new(health_service))
.serve(address)
.await
{
log::error!("Health server error: {:?}", err);
} else {
log::info!("Health server stopped gracefully");
}
}))
} else {
None
};

let handle = tokio::spawn(async move {
tokio::select! {
result = server.run(&context) => result,
_ = rx.recv() => server.shutdown(&context).await,
let ServerRunner {
mut server,
context,
} = self;

// Init the adapter server (e.g. create underlying HTTP server)
server.init(&context).await?;
log::info!("Draco successfully initialized.");

match health_task {
Some(mut ht) => {
tokio::select! {
// Main adapter server loop finished on its own
res = server.run(&context) => {
log::warn!("Adapter server finished, shutting down");
ht.abort();
res?;
}

// Health server ended first
_ = &mut ht => {
log::warn!("Health server task finished, shutting down adapter");
server.shutdown(&context).await?;
}

// Ctrl+C / SIGINT
_ = signal::ctrl_c() => {
log::info!("Ctrl+C/Exit signal received, shutting down adapter");
server.shutdown(&context).await?;
ht.abort();
}
}
}
});

tokio::signal::ctrl_c().await?;
let _ = self.shutdown_sender.send(());
handle.await??;
None => {
tokio::select! {
// Adapter server loop ends on its own
res = server.run(&context) => {
log::warn!("Adapter server finished");
res?;
}

// Ctrl+C / SIGINT
_ = signal::ctrl_c() => {
log::info!("Ctrl+C/Exit signal received, shutting down adapter");
server.shutdown(&context).await?;
}
}
}
}

log::info!("Draco shutdown complete");
Ok(())
}
}
Loading