diff --git a/Cargo.lock b/Cargo.lock index 828eab0..2f2c136 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,6 +300,7 @@ dependencies = [ "clap", "colored", "futures", + "log", "notify", "prost", "reqwest", @@ -307,6 +308,7 @@ dependencies = [ "serde_json", "tabled", "tokio", + "tonic", "tucana", "zip", ] diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 2673c3e..1af2ce3 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -21,4 +21,6 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } futures = { workspace = true } zip = { workspace = true } bytes = { workspace = true } -prost = { workspace = true } \ No newline at end of file +prost = { workspace = true } +tonic = "0.14.2" +log = "0.4.28" \ No newline at end of file diff --git a/crates/cli/src/analyser/flow_type.rs b/crates/cli/src/analyser/flow_type.rs index 1ecb51a..beb8feb 100644 --- a/crates/cli/src/analyser/flow_type.rs +++ b/crates/cli/src/analyser/flow_type.rs @@ -36,27 +36,27 @@ impl Analyser { )); } - if let Some(identifier) = &flow.input_type_identifier { - if !self.data_type_identifier_exists(identifier, None) { - self.reporter.add(Diagnose::new( - name.clone(), - original.clone(), - DiagnosticKind::UndefinedDataTypeIdentifier { - identifier: identifier.clone(), - }, - )); - } + if let Some(identifier) = &flow.input_type_identifier + && !self.data_type_identifier_exists(identifier, None) + { + self.reporter.add(Diagnose::new( + name.clone(), + original.clone(), + DiagnosticKind::UndefinedDataTypeIdentifier { + identifier: identifier.clone(), + }, + )); } - if let Some(identifier) = &flow.return_type_identifier { - if !self.data_type_identifier_exists(identifier, None) { - self.reporter.add(Diagnose::new( - name.clone(), - original.clone(), - DiagnosticKind::UndefinedDataTypeIdentifier { - identifier: identifier.clone(), - }, - )); - } + if let Some(identifier) = &flow.return_type_identifier + && !self.data_type_identifier_exists(identifier, None) + { + self.reporter.add(Diagnose::new( + name.clone(), + original.clone(), + DiagnosticKind::UndefinedDataTypeIdentifier { + identifier: identifier.clone(), + }, + )); } for setting in &flow.settings { diff --git a/crates/cli/src/command/mod.rs b/crates/cli/src/command/mod.rs index 351e876..76ca057 100644 --- a/crates/cli/src/command/mod.rs +++ b/crates/cli/src/command/mod.rs @@ -1,5 +1,6 @@ -pub mod definition; pub mod download; pub mod feature; +pub mod push; pub mod report; +pub mod search; pub mod watch; diff --git a/crates/cli/src/command/push/auth.rs b/crates/cli/src/command/push/auth.rs new file mode 100644 index 0000000..434ed58 --- /dev/null +++ b/crates/cli/src/command/push/auth.rs @@ -0,0 +1,15 @@ +use std::str::FromStr; +use tonic::metadata::{MetadataMap, MetadataValue}; + +pub fn get_authorization_metadata(token: &str) -> MetadataMap { + let metadata_value = MetadataValue::from_str(token).unwrap_or_else(|error| { + panic!( + "An error occurred trying to convert runtime_token into metadata: {}", + error + ); + }); + + let mut map = MetadataMap::new(); + map.insert("authorization", metadata_value); + map +} diff --git a/crates/cli/src/command/push/data_type_client_impl.rs b/crates/cli/src/command/push/data_type_client_impl.rs new file mode 100644 index 0000000..1dc8a4a --- /dev/null +++ b/crates/cli/src/command/push/data_type_client_impl.rs @@ -0,0 +1,49 @@ +use crate::command::push::auth::get_authorization_metadata; +use tonic::{Extensions, Request, transport::Channel}; +use tucana::sagittarius::{ + DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest, + data_type_service_client::DataTypeServiceClient, +}; +use tucana::shared::DefinitionDataType; + +pub struct SagittariusDataTypeServiceClient { + client: DataTypeServiceClient, + token: String, +} + +impl SagittariusDataTypeServiceClient { + pub async fn new(sagittarius_url: String, token: String) -> Self { + let client = match DataTypeServiceClient::connect(sagittarius_url).await { + Ok(client) => { + log::info!("Successfully connected to Sagittarius DataType Endpoint!"); + client + } + Err(err) => panic!( + "Failed to connect to Sagittarius (DataType Endpoint): {:?}", + err + ), + }; + + Self { client, token } + } + + pub async fn update_data_types(&mut self, data_types: Vec) { + let request = Request::from_parts( + get_authorization_metadata(&self.token), + Extensions::new(), + SagittariusDataTypeUpdateRequest { data_types }, + ); + + match self.client.update(request).await { + Ok(response) => { + log::info!( + "Successfully transferred data types. Did Sagittarius updated them? {:?}", + &response + ); + } + Err(err) => { + log::error!("Failed to update DataTypes: {:?}", err); + } + }; + } +} diff --git a/crates/cli/src/command/push/flow_type_client_impl.rs b/crates/cli/src/command/push/flow_type_client_impl.rs new file mode 100644 index 0000000..84d2ae2 --- /dev/null +++ b/crates/cli/src/command/push/flow_type_client_impl.rs @@ -0,0 +1,49 @@ +use crate::command::push::auth::get_authorization_metadata; +use tonic::Extensions; +use tonic::Request; +use tonic::transport::Channel; +use tucana::sagittarius::FlowTypeUpdateRequest as SagittariusFlowTypeUpdateRequest; +use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient; +use tucana::shared::FlowType; + +pub struct SagittariusFlowTypeServiceClient { + client: FlowTypeServiceClient, + token: String, +} + +impl SagittariusFlowTypeServiceClient { + pub async fn new(sagittarius_url: String, token: String) -> Self { + let client = match FlowTypeServiceClient::connect(sagittarius_url).await { + Ok(client) => { + log::info!("Successfully connected to Sagittarius FlowType Endpoint!"); + client + } + Err(err) => panic!( + "Failed to connect to Sagittarius (FlowType Endpoint): {:?}", + err + ), + }; + + Self { client, token } + } + + pub async fn update_flow_types(&mut self, flow_types: Vec) { + let request = Request::from_parts( + get_authorization_metadata(&self.token), + Extensions::new(), + SagittariusFlowTypeUpdateRequest { flow_types }, + ); + + match self.client.update(request).await { + Ok(response) => { + log::info!( + "Successfully transferred FlowTypes. Did Sagittarius updated them? {:?}", + &response + ); + } + Err(err) => { + log::error!("Failed to update FlowTypes: {:?}", err); + } + }; + } +} diff --git a/crates/cli/src/command/push/function_client_impl.rs b/crates/cli/src/command/push/function_client_impl.rs new file mode 100644 index 0000000..bb29d20 --- /dev/null +++ b/crates/cli/src/command/push/function_client_impl.rs @@ -0,0 +1,52 @@ +use crate::command::push::auth::get_authorization_metadata; +use tonic::Extensions; +use tonic::Request; +use tonic::transport::Channel; +use tucana::sagittarius::RuntimeFunctionDefinitionUpdateRequest as SagittariusRuntimeFunctionUpdateRequest; +use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient; +use tucana::shared::RuntimeFunctionDefinition; + +pub struct SagittariusRuntimeFunctionServiceClient { + client: RuntimeFunctionDefinitionServiceClient, + token: String, +} + +impl SagittariusRuntimeFunctionServiceClient { + pub async fn new(sagittarius_url: String, token: String) -> Self { + let client = match RuntimeFunctionDefinitionServiceClient::connect(sagittarius_url).await { + Ok(client) => { + log::info!("Successfully connected to Sagittarius RuntimeFunction Endpoint!"); + client + } + Err(err) => panic!( + "Failed to connect to Sagittarius (RuntimeFunction Endpoint): {:?}", + err + ), + }; + + Self { client, token } + } + + pub async fn update_runtime_function_definitions( + &mut self, + runtime_functions: Vec, + ) { + let request = Request::from_parts( + get_authorization_metadata(&self.token), + Extensions::new(), + SagittariusRuntimeFunctionUpdateRequest { runtime_functions }, + ); + + match self.client.update(request).await { + Ok(response) => { + log::info!( + "Successfully transferred RuntimeFunctions. Did Sagittarius updated them? {:?}", + &response + ); + } + Err(err) => { + log::error!("Failed to update RuntimeFunctions: {:?}", err); + } + }; + } +} diff --git a/crates/cli/src/command/push/mod.rs b/crates/cli/src/command/push/mod.rs new file mode 100644 index 0000000..778e7ed --- /dev/null +++ b/crates/cli/src/command/push/mod.rs @@ -0,0 +1,137 @@ +use crate::analyser::core::Analyser; +use crate::command::push::data_type_client_impl::SagittariusDataTypeServiceClient; +use crate::command::push::flow_type_client_impl::SagittariusFlowTypeServiceClient; +use crate::command::push::function_client_impl::SagittariusRuntimeFunctionServiceClient; +use crate::formatter::{default, info}; +use notify::event::ModifyKind; +use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; +use std::sync::mpsc::channel; +use std::time::{Duration, Instant}; + +mod auth; +mod data_type_client_impl; +mod flow_type_client_impl; +mod function_client_impl; + +pub async fn push(token: String, url: String, path: Option) { + let dir_path = path.unwrap_or_else(|| "./definitions".to_string()); + + info(format!("Watching directory: {dir_path}")); + info(String::from("Press Ctrl+C to stop watching...")); + + { + Analyser::new(dir_path.as_str()).report(false); + } + + // Set up file watcher + let (tx, rx) = channel(); + let mut watcher = recommended_watcher(tx).unwrap(); + watcher + .watch(std::path::Path::new(&dir_path), RecursiveMode::Recursive) + .unwrap(); + + let mut last_run = Instant::now(); + + let mut data_type_client = + SagittariusDataTypeServiceClient::new(url.clone(), token.clone()).await; + let mut flow_type_client = + SagittariusFlowTypeServiceClient::new(url.clone(), token.clone()).await; + let mut function_client = SagittariusRuntimeFunctionServiceClient::new(url, token).await; + + loop { + if let Ok(Ok(event)) = rx.recv() { + match event.kind { + EventKind::Modify(modify) => { + if let ModifyKind::Data(_) = modify + && last_run.elapsed() > Duration::from_millis(500) + { + default(String::from( + "\n\n\n--------------------------------------------------------------------------\n\n", + )); + info(String::from("Change detected! Regenerating report...")); + let mut analyzer = Analyser::new(dir_path.as_str()); + + // No errors when reporter is empty! + if analyzer.reporter.is_empty() { + data_type_client + .update_data_types( + analyzer + .data_types + .iter() + .map(|d| d.definition_data_type.clone()) + .collect(), + ) + .await; + flow_type_client + .update_flow_types( + analyzer + .flow_types + .iter() + .map(|d| d.flow_type.clone()) + .collect(), + ) + .await; + function_client + .update_runtime_function_definitions( + analyzer + .functions + .iter() + .map(|d| d.function.clone()) + .collect(), + ) + .await; + } + + analyzer.report(false); + + last_run = Instant::now(); + } + } + EventKind::Remove(_) => { + if last_run.elapsed() > Duration::from_millis(500) { + default(String::from( + "\n\n\n--------------------------------------------------------------------------\n\n", + )); + info(String::from("Change detected! Regenerating report...")); + let mut analyzer = Analyser::new(dir_path.as_str()); + + // No errors when reporter is empty! + if analyzer.reporter.is_empty() { + data_type_client + .update_data_types( + analyzer + .data_types + .iter() + .map(|d| d.definition_data_type.clone()) + .collect(), + ) + .await; + flow_type_client + .update_flow_types( + analyzer + .flow_types + .iter() + .map(|d| d.flow_type.clone()) + .collect(), + ) + .await; + function_client + .update_runtime_function_definitions( + analyzer + .functions + .iter() + .map(|d| d.function.clone()) + .collect(), + ) + .await; + } + + analyzer.report(false); + last_run = Instant::now(); + } + } + _ => {} + } + } + } +} diff --git a/crates/cli/src/command/definition.rs b/crates/cli/src/command/search.rs similarity index 88% rename from crates/cli/src/command/definition.rs rename to crates/cli/src/command/search.rs index 1b7d77b..223a1ab 100644 --- a/crates/cli/src/command/definition.rs +++ b/crates/cli/src/command/search.rs @@ -35,7 +35,7 @@ fn search_and_display_definitions(search_name: &str, parser: &Parser) { let mut index = 0; for line in json.lines() { index += 1; - println!("{} {}", format!("{index}:"), line.bright_cyan()); + println!("{}: {}", index, line.bright_cyan()); } } Err(_) => println!("{}", "Error serializing FlowType".red()), @@ -57,7 +57,7 @@ fn search_and_display_definitions(search_name: &str, parser: &Parser) { let mut index = 0; for line in json.lines() { index += 1; - println!("{} {}", format!("{index}:"), line.bright_cyan()); + println!("{}: {}", index, line.bright_cyan()); } } Err(_) => println!("{}", "Error serializing DataType".red()), @@ -79,7 +79,7 @@ fn search_and_display_definitions(search_name: &str, parser: &Parser) { let mut index = 0; for line in json.lines() { index += 1; - println!("{} {}", format!("{index}:"), line.bright_cyan()); + println!("{}: {}", index, line.bright_cyan()); } } Err(_) => println!("{}", "Error serializing RuntimeFunction".red()), @@ -89,10 +89,7 @@ fn search_and_display_definitions(search_name: &str, parser: &Parser) { } if !found_any { - println!( - "{}", - format!("\n{}: {}", "error".red(), "Found no matching definition(s)") - ); + println!("\n{}: Found no matching definition(s)", "error".red(),); } else { success(format!("Found {total_matches} matching definition(s)")) } diff --git a/crates/cli/src/formatter.rs b/crates/cli/src/formatter.rs index 29d22e2..03aab8c 100644 --- a/crates/cli/src/formatter.rs +++ b/crates/cli/src/formatter.rs @@ -46,6 +46,6 @@ where Table::new(iter).with(Style::rounded()).to_string() } -fn print_path(path: &String) -> String { +fn print_path(path: &str) -> String { format!("\n --> {}", &path.underline()).blue().to_string() } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index dafe930..94e44c2 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -36,7 +36,7 @@ enum Commands { path: Option, }, /// Look up a specific definition. - Definition { + Search { /// Required name of the definition. #[arg(short, long)] name: String, @@ -50,6 +50,17 @@ enum Commands { #[arg(short, long)] path: Option, }, + Push { + /// Runtime Token for Sagittarius. + #[arg(short, long)] + token: String, + /// URL to Sagittarius. + #[arg(short, long)] + url: String, + /// Optional path to root directory of all definitions. + #[arg(short, long)] + path: Option, + }, Download { #[arg(short, long)] tag: Option, @@ -65,10 +76,11 @@ async fn main() { match cli.command { Commands::Report { path } => command::report::report_errors(path), Commands::Feature { name, path } => command::feature::search_feature(name, path), - Commands::Definition { name, path } => command::definition::search_definition(name, path), + Commands::Search { name, path } => command::search::search_definition(name, path), Commands::Download { tag, features } => { command::download::handle_download(tag, features).await } Commands::Watch { path } => command::watch::watch_for_changes(path).await, + Commands::Push { token, url, path } => command::push::push(token, url, path).await, } } diff --git a/crates/cli/src/table.rs b/crates/cli/src/table.rs index 2a1763d..09a28d9 100644 --- a/crates/cli/src/table.rs +++ b/crates/cli/src/table.rs @@ -76,7 +76,7 @@ pub fn feature_table( (flow_type_rows, data_type_rows, runtime_function_rows) } -pub fn summary_table(features: &Vec) -> Vec { +pub fn summary_table(features: &[Feature]) -> Vec { features .iter() .map(|feature| FeatureSummaryRow {