Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 5 additions & 74 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ edition = "2024"

[dependencies]
code0-flow = { version = "0.0.17" }
tucana = { version = "0.0.33" }
tucana = { version = "0.0.36" }
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
log = "0.4.27"
futures-lite = "2.6.0"
rand = "0.9.1"
base64 = "0.22.1"
env_logger = "0.11.8"
async-nats = "0.42.0"
async-nats = "0.44.2"
prost = "0.14.1"
tonic-health = "0.14.1"
tonic = "0.14.1"
78 changes: 34 additions & 44 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use error::RuntimeError;
use futures_lite::StreamExt;
use log::error;
use prost::Message;
use tonic_health::pb::health_server::HealthServer;
use registry::FunctionStore;
use std::collections::HashMap;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::value::Kind;
use tucana::shared::{ExecutionFlow, ListValue, NodeFunction, Value};

fn handle_node_function(
function: NodeFunction,
node_functions: &HashMap<i64, NodeFunction>,
store: &FunctionStore,
context: &mut Context,
) -> Result<Value, RuntimeError> {
Expand Down Expand Up @@ -65,46 +67,26 @@ fn handle_node_function(
}

// Its another function, that result is a direct parameter to this function
tucana::shared::node_value::Value::NodeFunctions(another_node_function) => {
tucana::shared::node_value::Value::NodeFunctionId(another_node_function) => {
// As this is another new indent, a new context will be opened
context.next_context();
let function_result: Vec<_> = another_node_function
.functions
.into_iter()
.map(|f| handle_node_function(f, &store, context))
.collect();

let mut collected = Vec::new();
for res in &function_result {
if let Ok(v) = res {
collected.push(v.clone());
}
}

let list = Value {
kind: Some(Kind::ListValue(ListValue { values: collected })),
let function_result = match node_functions.get(&another_node_function) {
Some(function_result) => {
context.next_context();
handle_node_function(function_result.clone(), node_functions, store, context )
},
None => {
todo!("Handle node not found. This should normally not happen")
}
};

let is_faulty = function_result.iter().any(|res| res.is_err());

let entry = ContextEntry::new(
Result::Ok(list.clone()),
Result::Ok(function_result.clone()?),
parameter_collection.clone(),
);

context.write_to_current_context(entry);

match !is_faulty {
true => {
// Add the value back to the main parameter
parameter_collection.push(list.clone());
}
false => {
todo!(
"Reqired function that holds the paramter failed in execution"
)
}
}
}
}
}
Expand All @@ -117,14 +99,17 @@ fn handle_node_function(
context.write_to_current_context(entry);

// Check if there is a next node, if not then this was the last one
match function.next_node {
Some(ref next_node_function) => {
let next = (**next_node_function).clone();
match function.next_node_id {
Some(next_node_function_id) => {
let node = match node_functions.get(&next_node_function_id) {
Some(node) => node,
None => todo!("Handle node not found. This should normally not happen"),
};

// Increment the context node!
context.next_node();

return handle_node_function(next, store, context);
return handle_node_function(node.clone(), node_functions, store, context);
}
None => {
if context.is_end() {
Expand All @@ -142,22 +127,28 @@ fn handle_node_function(
fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Option<Value> {
let mut context = Context::new();

if let Some(node) = flow.starting_node {
match handle_node_function(node, store, &mut context) {
let node_functions: HashMap<i64, NodeFunction> = flow
.node_functions
.into_iter()
.map(|node| return (node.database_id, node))
.collect();

if let Some(node) = node_functions.get(&flow.starting_node_id) {
return match handle_node_function(node.clone(), &node_functions, store, &mut context) {
Ok(result) => {
println!(
"Execution completed successfully: The value is {:?}",
result
);
return Some(result);
Some(result)
}
Err(runtime_error) => {
println!("Runtime Error: {:?}", runtime_error);
return None;
None
}
}
};
return None;
};
}
None
}

#[tokio::main]
Expand All @@ -180,8 +171,7 @@ async fn main() {
};

if config.with_health_service {
let health_service =
code0_flow::flow_health::HealthService::new(config.nats_url.clone());
let health_service = code0_flow::flow_health::HealthService::new(config.nats_url.clone());
let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() {
Ok(address) => address,
Err(err) => {
Expand Down