Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b95e934
feat: added action configuration implementation
raphael-goetz Feb 24, 2026
eb51245
feat: wip action configuration service server
raphael-goetz Feb 24, 2026
a5ebafa
dependencies: updated to tucana 0.0.53
raphael-goetz Feb 24, 2026
2ed3d94
feat: finished action configuration server implementation
raphael-goetz Feb 25, 2026
d34cff9
dependencies: added futures-core
raphael-goetz Feb 25, 2026
1f0e45d
feat: started to work on action transfer server
raphael-goetz Feb 25, 2026
0d861a7
drop: removed body validation part
raphael-goetz Feb 26, 2026
a384652
fix: made has_action function not consume itself
raphael-goetz Feb 26, 2026
8f9cab3
fix: added missing project_id to flow identifier
raphael-goetz Feb 26, 2026
0ba5d3c
feat: added wip action configuration
raphael-goetz Feb 26, 2026
52eed39
feat: added project id to execution flow
raphael-goetz Mar 31, 2026
0f519b4
feat: connected flow with action stream to transfer action configs
raphael-goetz Apr 1, 2026
f183855
feat: completed action endpoint
raphael-goetz Apr 2, 2026
2414769
drop: removed unused stream logic
raphael-goetz Apr 2, 2026
00679e1
feat: correct params for aquila server
raphael-goetz Apr 2, 2026
16773b8
ref: cargo clippy & fmt
raphael-goetz Apr 2, 2026
8e37624
feat: added example action config
raphael-goetz Apr 5, 2026
79c989a
fix: correct reqeust/reply logic
raphael-goetz Apr 5, 2026
eebf840
Update src/server/action_transfer_service_server_impl.rs
raphael-goetz Apr 5, 2026
cef07dc
Update src/server/action_transfer_service_server_impl.rs
raphael-goetz Apr 5, 2026
cc60592
Update src/flow/mod.rs
raphael-goetz Apr 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,568 changes: 2,568 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ tonic-health = "0.14.1"
tokio-stream = "0.1.17"
uuid = { version = "1.18.0", features = ["v4"] }
serde = "1.0.228"
futures-core = "0.3.32"
8 changes: 8 additions & 0 deletions action.configuration.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"actions": [
{
"token": "token",
Comment thread
raphael-goetz marked this conversation as resolved.
"service_name": "example"
}
]
}
15 changes: 9 additions & 6 deletions src/configuration/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@ use serde::{Deserialize, Serialize};
use serde_json::from_str;
use std::{fs::File, io::Read};

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct ActionServiceConfiguration {
token: String,
service_name: String,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct ActionConfiguration {
actions: Vec<ActionServiceConfiguration>,
}

impl ActionConfiguration {

pub fn has_action(self, token: String) -> bool {
match self.actions.into_iter().find(|x| x.token == token) {
pub fn has_action(&self, token: &String, action_identifier: &String) -> bool {
match self
.actions
.iter()
.find(|x| &x.token == token && &x.service_name == action_identifier)
{
Some(_) => true,
None => false,
}
Comment thread
raphael-goetz marked this conversation as resolved.
}

pub fn from_path(path: String) -> Self {
pub fn from_path(path: &String) -> Self {
let mut data = String::new();

let mut file = match File::open(path) {
Expand Down
5 changes: 4 additions & 1 deletion src/configuration/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ impl Config {
"SAGITTARIUS_URL",
String::from("http://localhost:50051"),
),
action_config_path: env_with_default("ACTION_CONFIG_PATH", String::from("./action.configuration.json"))
action_config_path: env_with_default(
"ACTION_CONFIG_PATH",
String::from("./action.configuration.json"),
),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/configuration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod action;
pub mod state;
pub mod config;
pub mod state;
7 changes: 5 additions & 2 deletions src/flow/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use tucana::shared::ValidationFlow;

/// Every flow identifier has this key
/// <type>.<project_slug>.<flow_id>
/// <type>.<project_slug>.<project_id>.<flow_id>
pub fn get_flow_identifier(flow: &ValidationFlow) -> String {
format!("{}.{}.{}", flow.r#type, flow.project_slug, flow.flow_id)
format!(
"{}.{}.{}.{}",
flow.r#type, flow.project_slug, flow.project_id, flow.flow_id
)
}
21 changes: 18 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{
configuration::{config::Config as AquilaConfig, state::AppReadiness},
configuration::{
action::ActionConfiguration, config::Config as AquilaConfig, state::AppReadiness,
},
flow::get_flow_identifier,
sagittarius::retry::create_channel_with_retry,
};
Expand All @@ -17,7 +19,6 @@ pub mod configuration;
pub mod flow;
pub mod sagittarius;
pub mod server;
pub mod stream;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -78,7 +79,20 @@ async fn main() {
app_readiness.sagittarius_ready.clone(),
)
.await;
let server = AquilaGRPCServer::new(&config, app_readiness.clone(), sagittarius_channel.clone());

let (action_config_tx, _) =
tokio::sync::broadcast::channel::<tucana::shared::ActionConfigurations>(64);

let action_config = ActionConfiguration::from_path(&config.action_config_path);
let server = AquilaGRPCServer::new(
&config,
app_readiness.clone(),
sagittarius_channel.clone(),
action_config,
client.clone(),
kv_store.clone(),
action_config_tx.clone(),
);
let kv_for_flow = kv_store.clone();

let mut server_task = tokio::spawn(async move {
Expand Down Expand Up @@ -115,6 +129,7 @@ async fn main() {
config.runtime_token.clone(),
ch,
app_readiness.sagittarius_ready.clone(),
action_config_tx.clone(),
);

match flow_client.init_flow_stream().await {
Expand Down
53 changes: 53 additions & 0 deletions src/sagittarius/action_configuration_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::authorization::authorization::get_authorization_metadata;
use tonic::transport::Channel;
use tonic::{Extensions, Request};
use tucana::shared::ActionConfigurationDefinition;

pub struct SagittariusActionConfigurationServiceClient {
client:
tucana::sagittarius::action_configuration_service_client::ActionConfigurationServiceClient<
Channel,
>,
token: String,
}

impl SagittariusActionConfigurationServiceClient {
pub fn new(channel: Channel, token: String) -> Self {
let client = tucana::sagittarius::action_configuration_service_client::ActionConfigurationServiceClient::new(channel);

Self { client, token }
}

pub async fn update_action_configuration(
&mut self,
action_identifier: String,
configs: Vec<ActionConfigurationDefinition>,
) -> bool {
let request = Request::from_parts(
get_authorization_metadata(&self.token),
Extensions::new(),
tucana::sagittarius::ActionConfigurationUpdateRequest {
action_identifier: action_identifier,
action_configurations: configs,
},
);

let response = match self.client.update(request).await {
Ok(response) => {
log::info!("Successfully transferred action configuration update.",);
response.into_inner()
}
Err(err) => {
log::error!("Failed to update action configurations: {:?}", err);
return true;
Comment thread
raphael-goetz marked this conversation as resolved.
}
};

match response.success {
true => log::info!("Sagittarius successfully updated ActionConfiguration."),
false => log::error!("Sagittarius didn't update any ActionConfiguration."),
};

response.success
}
}
12 changes: 9 additions & 3 deletions src/sagittarius/flow_service_client_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::{StreamExt, TryStreamExt};
use prost::Message;
use std::{path::Path, sync::Arc};
use tokio::fs;
use tokio::sync::broadcast;
use tonic::{Extensions, Request, transport::Channel};
use tucana::{
sagittarius::{
Expand All @@ -20,6 +21,7 @@ pub struct SagittariusFlowClient {
env: String,
token: String,
sagittarius_ready: Arc<AtomicBool>,
action_config_tx: broadcast::Sender<tucana::shared::ActionConfigurations>,
}

impl SagittariusFlowClient {
Expand All @@ -29,6 +31,7 @@ impl SagittariusFlowClient {
token: String,
channel: Channel,
sagittarius_ready: Arc<AtomicBool>,
action_config_tx: broadcast::Sender<tucana::shared::ActionConfigurations>,
) -> SagittariusFlowClient {
let client = FlowServiceClient::new(channel);

Expand All @@ -38,6 +41,7 @@ impl SagittariusFlowClient {
env,
token,
sagittarius_ready,
action_config_tx,
}
}

Expand Down Expand Up @@ -147,9 +151,11 @@ impl SagittariusFlowClient {
};
}
}
Data::ActionConfigurations(_action_configurations) => unimplemented!(
"This will be implemented in the alpha edition. Do not use while service is in MVP!"
),
Data::ActionConfigurations(action_configurations) => {
if let Err(err) = self.action_config_tx.send(action_configurations) {
log::warn!("No action configuration receivers available: {:?}", err);
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sagittarius/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod action_configuration_service_client_impl;
pub mod data_type_service_client_impl;
pub mod flow_service_client_impl;
pub mod flow_type_service_client_impl;
Expand Down
10 changes: 4 additions & 6 deletions src/sagittarius/test_execution_client_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@
use futures::StreamExt;
use prost::Message;
use std::sync::Arc;
use std::time::SystemTime;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
use tonic::transport::Channel;
use tucana::sagittarius::execution_logon_request::Data;
use tucana::sagittarius::execution_service_client::ExecutionServiceClient;
use tucana::sagittarius::{
ApplicationLog, ExecutionLogonRequest, Log, Logon, TestExecutionResponse,
};
use tucana::sagittarius::{ExecutionLogonRequest, Logon, TestExecutionResponse};
use tucana::shared::{ExecutionFlow, ValidationFlow, Value};

pub struct SagittariusTestExecutionServiceClient {
Expand Down Expand Up @@ -99,8 +96,9 @@ impl SagittariusTestExecutionServiceClient {

let uuid = uuid::Uuid::new_v4().to_string();

// TODO: When triangulum is ready, validate the body with this service
// Task: Add back body validation using triangulum
// TODO: When the new validator is ready, the body needs to be validated at this
// point.

let execution_flow = ExecutionFlow {
flow_id: request.flow_id,
input_value: request.body,
Expand Down
Loading