diff --git a/dashtool/src/build/build_dag.rs b/dashtool/src/build/build_dag.rs index 029c47e..3ea5770 100644 --- a/dashtool/src/build/build_dag.rs +++ b/dashtool/src/build/build_dag.rs @@ -12,7 +12,7 @@ use iceberg_rust_spec::spec::{ snapshot::{SnapshotReference, SnapshotRetention}, view_metadata::{ViewProperties, REF_PREFIX}, }; -use serde_json::{Map, Value as JsonValue}; +use serde_json::Value as JsonValue; use crate::{ dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular}, @@ -207,6 +207,8 @@ pub(super) async fn build_dag<'repo>( .await?; } Some(false) => { + let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string(); + let tap_path = path.trim_end_matches("target.json").to_string() + "tap.json"; let tap_json: JsonValue = @@ -223,7 +225,7 @@ pub(super) async fn build_dag<'repo>( )) }?; - for (stream, stream_config) in streams.iter() { + for (_, stream_config) in streams.iter() { let name = if let JsonValue::String(name) = &stream_config["identifier"] { Ok(name) @@ -273,19 +275,15 @@ pub(super) async fn build_dag<'repo>( .commit() .await?; } - let mut target_json = target_json.clone(); - target_json["streams"] = - Map::from_iter(vec![(stream.clone(), stream_config.clone())]) - .into(); - singer_sender - .send(Node::Singer(Singer::new( - name, - tap_json.clone(), - target_json, - branch, - ))) - .await?; } + singer_sender + .send(Node::Singer(Singer::new( + &identifier, + tap_json.clone(), + target_json, + branch, + ))) + .await?; } _ => (), }; @@ -303,11 +301,11 @@ pub(super) async fn build_dag<'repo>( HashMap::from_iter(tabular_reciever.collect::>().await); for singer in singers { - dag.add_node(singer) + dag.add_node(singer)?; } for (node, (sql, _)) in &tabs { - dag.add_node(Node::Tabular(Tabular::new(node, branch, sql))) + dag.add_node(Node::Tabular(Tabular::new(node, branch, sql)))?; } for (node, (_, children)) in tabs { @@ -441,12 +439,17 @@ mod tests { .await .expect("Failed to build dag"); + assert_eq!(dag.singers.len(), 1); assert_eq!(dag.map.len(), 1); - let singer = &dag.dag[*dag - .map + let orders = dag + .singers .get("bronze.inventory.orders") - .expect("Failed to get graph index")]; + .expect("Failed to get graph index"); + + assert_eq!(orders, "bronze.inventory.target"); + + let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")]; let Node::Singer(singer) = singer else { panic!("Node is not a singer") @@ -650,6 +653,7 @@ mod tests { .await .expect("Failed to build dag"); + assert_eq!(dag.singers.len(), 1); assert_eq!(dag.map.len(), 2); let tabular = &dag.dag[*dag diff --git a/dashtool/src/build/update_dag.rs b/dashtool/src/build/update_dag.rs index 4cca009..b93a311 100644 --- a/dashtool/src/build/update_dag.rs +++ b/dashtool/src/build/update_dag.rs @@ -2,7 +2,7 @@ use std::{fs, path::Path}; use gix::diff::tree::recorder::Change; use iceberg_rust::sql::find_relations; -use serde_json::{Map, Value as JsonValue}; +use serde_json::Value as JsonValue; use crate::{ dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular}, @@ -34,34 +34,19 @@ pub(super) fn update_dag(diff: &[Change], dag: Option, branch: &str) -> Res } for path in singers { + let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string(); + let tap_path = path.trim_end_matches("target.json").to_string() + "tap.json"; let tap_json: JsonValue = serde_json::from_str(&fs::read_to_string(&tap_path)?)?; let mut target_json: JsonValue = serde_json::from_str(&fs::read_to_string(path)?)?; target_json["branch"] = branch.to_string().into(); - let streams = if let JsonValue::Object(object) = &target_json["streams"] { - Ok(object) - } else { - Err(Error::Text( - "Streams in config must be an object.".to_string(), - )) - }?; - - for (stream, stream_config) in streams.iter() { - let name = if let JsonValue::String(name) = &stream_config["identifier"] { - Ok(name) - } else { - Err(Error::Text("Stream identifer is not a string.".to_string())) - }?; - let mut target_json = target_json.clone(); - target_json["streams"] = - Map::from_iter(vec![(stream.clone(), stream_config.clone())]).into(); - dag.add_node(Node::Singer(Singer::new( - name, - tap_json.clone(), - target_json, - branch, - ))); - } + + dag.add_node(Node::Singer(Singer::new( + &identifier, + tap_json.clone(), + target_json, + branch, + )))?; } for path in tabulars { @@ -71,7 +56,7 @@ pub(super) fn update_dag(diff: &[Change], dag: Option, branch: &str) -> Res let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string(); - dag.add_node(Node::Tabular(Tabular::new(&identifier, branch, &sql))); + dag.add_node(Node::Tabular(Tabular::new(&identifier, branch, &sql)))?; for child in children { dag.add_edge(&identifier, &child)? @@ -159,12 +144,16 @@ mod tests { let dag = update_dag(&changes, None, "main").expect("Failed to create dag"); - assert_eq!(dag.map.len(), 1); + assert_eq!(dag.singers.len(), 1); - let singer = &dag.dag[*dag - .map + let orders = dag + .singers .get("bronze.inventory.orders") - .expect("Failed to get graph index")]; + .expect("Failed to get graph index"); + + assert_eq!(orders, "bronze.inventory.target"); + + let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")]; let Node::Singer(singer) = singer else { panic!("Node is not a singer") @@ -352,12 +341,17 @@ mod tests { let dag = update_dag(&changes, None, "expenditures").expect("Failed to create dag"); + assert_eq!(dag.singers.len(), 1); assert_eq!(dag.map.len(), 1); - let singer = &dag.dag[*dag - .map + let orders = dag + .singers .get("bronze.inventory.orders") - .expect("Failed to get graph index")]; + .expect("Failed to get graph index"); + + assert_eq!(orders, "bronze.inventory.target"); + + let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")]; let Node::Singer(singer) = singer else { panic!("Node is not a singer") diff --git a/dashtool/src/dag/mod.rs b/dashtool/src/dag/mod.rs index e5f9efc..9a4be19 100644 --- a/dashtool/src/dag/mod.rs +++ b/dashtool/src/dag/mod.rs @@ -3,6 +3,7 @@ use std::{ fs, }; +use anyhow::anyhow; use petgraph::stable_graph::{NodeIndex, StableDiGraph}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; @@ -64,6 +65,7 @@ impl Singer { #[derive(Serialize, Deserialize, Debug)] pub struct Dag { + pub(crate) singers: HashMap, pub(crate) map: HashMap, pub(crate) dag: StableDiGraph, } @@ -71,6 +73,7 @@ pub struct Dag { impl Dag { pub fn new() -> Self { Self { + singers: HashMap::new(), map: HashMap::new(), dag: StableDiGraph::new(), } @@ -78,8 +81,26 @@ impl Dag { } impl Dag { - pub(crate) fn add_node(&mut self, node: Node) { - match self.map.entry(node.identifier().to_string()) { + pub(crate) fn add_node(&mut self, node: Node) -> Result<(), Error> { + let identifier = match &node { + Node::Singer(singer) => { + let identifier = singer.identifier.clone(); + let streams: HashMap = + serde_json::from_value(singer.target["streams"].clone()) + .expect("target.json must contain streams field."); + for (_, config) in &streams { + let stream = if let JsonValue::String(stream) = &config["identifier"] { + Ok(stream) + } else { + Err(Error::Anyhow(anyhow!("Stream must have an identifier"))) + }?; + self.singers.insert(stream.clone(), identifier.clone()); + } + identifier + } + Node::Tabular(tab) => tab.identifier.clone(), + }; + match self.map.entry(identifier) { Entry::Vacant(entry) => { let idx = self.dag.add_node(node); entry.insert(idx); @@ -89,6 +110,7 @@ impl Dag { self.dag[*idx] = node; } }; + Ok(()) } pub(crate) fn add_edge(&mut self, a: &str, b: &str) -> Result<(), Error> { @@ -98,11 +120,18 @@ impl Dag { .cloned() .ok_or(Error::Text("Node not in graph.".to_string()))?; - let b = self - .map - .get(b) - .cloned() - .ok_or(Error::Text("Node not in graph.".to_string()))?; + let b = match self.singers.get(b) { + None => self + .map + .get(b) + .cloned() + .ok_or(Error::Text("Node not in graph.".to_string()))?, + Some(ident) => self + .map + .get(ident) + .cloned() + .ok_or(Error::Text("Node not in graph.".to_string()))?, + }; self.dag.add_edge(b, a, ()); Ok(()) @@ -117,6 +146,7 @@ pub fn get_dag(branch: &str) -> Result { dag } else { Dag { + singers: HashMap::new(), map: HashMap::new(), dag: StableDiGraph::new(), }