From 8d638deea358c459ea147c0dd607a276492d0174 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 16 Oct 2025 21:28:34 +0200 Subject: [PATCH 1/2] dependencies: updated tucana and async-nats --- Cargo.lock | 79 ++++-------------------------------------------------- Cargo.toml | 4 +-- 2 files changed, 7 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5cb5892..02bb33a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,42 +67,6 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" -[[package]] -name = "async-nats" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f6da6d49a956424ca4e28fe93656f790d748b469eaccbc7488fec545315180" -dependencies = [ - "base64", - "bytes", - "futures", - "memchr", - "nkeys", - "nuid", - "once_cell", - "pin-project", - "portable-atomic", - "rand 0.8.5", - "regex", - "ring", - "rustls-native-certs", - "rustls-pemfile", - "rustls-webpki 0.102.8", - "serde", - "serde_json", - "serde_nanos", - "serde_repr", - "thiserror", - "time", - "tokio", - "tokio-rustls", - "tokio-util", - "tokio-websockets", - "tracing", - "tryhard", - "url", -] - [[package]] name = "async-nats" version = "0.44.2" @@ -267,7 +231,7 @@ checksum = "5c7b9d030a57501b5c6bcbd667c5241a69335421008b155b55145b4e66e9a325" dependencies = [ "serde", "serde_json", - "tucana 0.0.36", + "tucana", ] [[package]] @@ -276,7 +240,7 @@ version = "0.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568d497f7aa8cd0cb4e633e8f6e97330b3bbb8b49fcb7c2bdca703a4a9c870e8" dependencies = [ - "async-nats 0.44.2", + "async-nats", "async-trait", "code0-definition-reader", "dotenv", @@ -286,7 +250,7 @@ dependencies = [ "serde_json", "tonic", "tonic-health", - "tucana 0.0.36", + "tucana", ] [[package]] @@ -516,20 +480,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.31" @@ -537,7 +487,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -583,12 +532,9 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures-channel", "futures-core", - "futures-io", "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1667,7 +1613,7 @@ dependencies = [ name = "taurus" version = "0.1.0" dependencies = [ - "async-nats 0.42.0", + "async-nats", "base64", "code0-flow", "env_logger", @@ -1678,7 +1624,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana 0.0.33", + "tucana", ] [[package]] @@ -1996,21 +1942,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tucana" -version = "0.0.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "811247bdfb777b65329d9f8e8ff9e2d405261ef626e149997f4fca57bfe106d4" -dependencies = [ - "prost", - "prost-types", - "serde", - "serde_json", - "tonic", - "tonic-prost", - "tonic-prost-build", -] - [[package]] name = "tucana" version = "0.0.36" diff --git a/Cargo.toml b/Cargo.toml index 7554e10..742f933 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,14 +5,14 @@ edition = "2024" [dependencies] code0-flow = { version = "0.0.17" } -tucana = { version = "0.0.33" } +tucana = { version = "0.0.36" } tokio = { version = "1.44.1", features = ["rt-multi-thread"] } log = "0.4.27" futures-lite = "2.6.0" rand = "0.9.1" base64 = "0.22.1" env_logger = "0.11.8" -async-nats = "0.42.0" +async-nats = "0.44.2" prost = "0.14.1" tonic-health = "0.14.1" tonic = "0.14.1" From 5d586eae6d59b7c51d7876df4c19b44f1b046ca9 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 16 Oct 2025 21:43:31 +0200 Subject: [PATCH 2/2] feat: updated next node handling --- src/main.rs | 78 +++++++++++++++++++++++------------------------------ 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/src/main.rs b/src/main.rs index 205b013..4b4ad7b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,13 +12,15 @@ use error::RuntimeError; use futures_lite::StreamExt; use log::error; use prost::Message; -use tonic_health::pb::health_server::HealthServer; use registry::FunctionStore; +use std::collections::HashMap; +use tonic_health::pb::health_server::HealthServer; use tucana::shared::value::Kind; use tucana::shared::{ExecutionFlow, ListValue, NodeFunction, Value}; fn handle_node_function( function: NodeFunction, + node_functions: &HashMap, store: &FunctionStore, context: &mut Context, ) -> Result { @@ -65,46 +67,26 @@ fn handle_node_function( } // Its another function, that result is a direct parameter to this function - tucana::shared::node_value::Value::NodeFunctions(another_node_function) => { + tucana::shared::node_value::Value::NodeFunctionId(another_node_function) => { // As this is another new indent, a new context will be opened - context.next_context(); - let function_result: Vec<_> = another_node_function - .functions - .into_iter() - .map(|f| handle_node_function(f, &store, context)) - .collect(); - - let mut collected = Vec::new(); - for res in &function_result { - if let Ok(v) = res { - collected.push(v.clone()); - } - } - let list = Value { - kind: Some(Kind::ListValue(ListValue { values: collected })), + let function_result = match node_functions.get(&another_node_function) { + Some(function_result) => { + context.next_context(); + handle_node_function(function_result.clone(), node_functions, store, context ) + }, + None => { + todo!("Handle node not found. This should normally not happen") + } }; - let is_faulty = function_result.iter().any(|res| res.is_err()); let entry = ContextEntry::new( - Result::Ok(list.clone()), + Result::Ok(function_result.clone()?), parameter_collection.clone(), ); context.write_to_current_context(entry); - - match !is_faulty { - true => { - // Add the value back to the main parameter - parameter_collection.push(list.clone()); - } - false => { - todo!( - "Reqired function that holds the paramter failed in execution" - ) - } - } } } } @@ -117,14 +99,17 @@ fn handle_node_function( context.write_to_current_context(entry); // Check if there is a next node, if not then this was the last one - match function.next_node { - Some(ref next_node_function) => { - let next = (**next_node_function).clone(); + match function.next_node_id { + Some(next_node_function_id) => { + let node = match node_functions.get(&next_node_function_id) { + Some(node) => node, + None => todo!("Handle node not found. This should normally not happen"), + }; // Increment the context node! context.next_node(); - return handle_node_function(next, store, context); + return handle_node_function(node.clone(), node_functions, store, context); } None => { if context.is_end() { @@ -142,22 +127,28 @@ fn handle_node_function( fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Option { let mut context = Context::new(); - if let Some(node) = flow.starting_node { - match handle_node_function(node, store, &mut context) { + let node_functions: HashMap = flow + .node_functions + .into_iter() + .map(|node| return (node.database_id, node)) + .collect(); + + if let Some(node) = node_functions.get(&flow.starting_node_id) { + return match handle_node_function(node.clone(), &node_functions, store, &mut context) { Ok(result) => { println!( "Execution completed successfully: The value is {:?}", result ); - return Some(result); + Some(result) } Err(runtime_error) => { println!("Runtime Error: {:?}", runtime_error); - return None; + None } - } - }; - return None; + }; + } + None } #[tokio::main] @@ -180,8 +171,7 @@ async fn main() { }; if config.with_health_service { - let health_service = - code0_flow::flow_health::HealthService::new(config.nats_url.clone()); + let health_service = code0_flow::flow_health::HealthService::new(config.nats_url.clone()); let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() { Ok(address) => address, Err(err) => {