Skip to content

Commit

Permalink
use single node for singer
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Apr 19, 2024
1 parent 45f35b1 commit 4b1771c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 59 deletions.
42 changes: 23 additions & 19 deletions dashtool/src/build/build_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use iceberg_rust_spec::spec::{
snapshot::{SnapshotReference, SnapshotRetention},
view_metadata::{ViewProperties, REF_PREFIX},
};
use serde_json::{Map, Value as JsonValue};
use serde_json::Value as JsonValue;

use crate::{
dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular},
Expand Down Expand Up @@ -207,6 +207,8 @@ pub(super) async fn build_dag<'repo>(
.await?;
}
Some(false) => {
let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string();

let tap_path =
path.trim_end_matches("target.json").to_string() + "tap.json";
let tap_json: JsonValue =
Expand All @@ -223,7 +225,7 @@ pub(super) async fn build_dag<'repo>(
))
}?;

for (stream, stream_config) in streams.iter() {
for (_, stream_config) in streams.iter() {
let name = if let JsonValue::String(name) = &stream_config["identifier"]
{
Ok(name)
Expand Down Expand Up @@ -273,19 +275,15 @@ pub(super) async fn build_dag<'repo>(
.commit()
.await?;
}
let mut target_json = target_json.clone();
target_json["streams"] =
Map::from_iter(vec![(stream.clone(), stream_config.clone())])
.into();
singer_sender
.send(Node::Singer(Singer::new(
name,
tap_json.clone(),
target_json,
branch,
)))
.await?;
}
singer_sender
.send(Node::Singer(Singer::new(
&identifier,
tap_json.clone(),
target_json,
branch,
)))
.await?;
}
_ => (),
};
Expand All @@ -303,11 +301,11 @@ pub(super) async fn build_dag<'repo>(
HashMap::from_iter(tabular_reciever.collect::<Vec<_>>().await);

for singer in singers {
dag.add_node(singer)
dag.add_node(singer)?;
}

for (node, (sql, _)) in &tabs {
dag.add_node(Node::Tabular(Tabular::new(node, branch, sql)))
dag.add_node(Node::Tabular(Tabular::new(node, branch, sql)))?;
}

for (node, (_, children)) in tabs {
Expand Down Expand Up @@ -441,12 +439,17 @@ mod tests {
.await
.expect("Failed to build dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 1);

let singer = &dag.dag[*dag
.map
let orders = dag
.singers
.get("bronze.inventory.orders")
.expect("Failed to get graph index")];
.expect("Failed to get graph index");

assert_eq!(orders, "bronze.inventory.target");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -650,6 +653,7 @@ mod tests {
.await
.expect("Failed to build dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 2);

let tabular = &dag.dag[*dag
Expand Down
60 changes: 27 additions & 33 deletions dashtool/src/build/update_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fs, path::Path};

use gix::diff::tree::recorder::Change;
use iceberg_rust::sql::find_relations;
use serde_json::{Map, Value as JsonValue};
use serde_json::Value as JsonValue;

use crate::{
dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular},
Expand Down Expand Up @@ -34,34 +34,19 @@ pub(super) fn update_dag(diff: &[Change], dag: Option<Dag>, branch: &str) -> Res
}

for path in singers {
let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string();

let tap_path = path.trim_end_matches("target.json").to_string() + "tap.json";
let tap_json: JsonValue = serde_json::from_str(&fs::read_to_string(&tap_path)?)?;
let mut target_json: JsonValue = serde_json::from_str(&fs::read_to_string(path)?)?;
target_json["branch"] = branch.to_string().into();
let streams = if let JsonValue::Object(object) = &target_json["streams"] {
Ok(object)
} else {
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

for (stream, stream_config) in streams.iter() {
let name = if let JsonValue::String(name) = &stream_config["identifier"] {
Ok(name)
} else {
Err(Error::Text("Stream identifer is not a string.".to_string()))
}?;
let mut target_json = target_json.clone();
target_json["streams"] =
Map::from_iter(vec![(stream.clone(), stream_config.clone())]).into();
dag.add_node(Node::Singer(Singer::new(
name,
tap_json.clone(),
target_json,
branch,
)));
}

dag.add_node(Node::Singer(Singer::new(
&identifier,
tap_json.clone(),
target_json,
branch,
)))?;
}

for path in tabulars {
Expand All @@ -71,7 +56,7 @@ pub(super) fn update_dag(diff: &[Change], dag: Option<Dag>, branch: &str) -> Res

let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string();

dag.add_node(Node::Tabular(Tabular::new(&identifier, branch, &sql)));
dag.add_node(Node::Tabular(Tabular::new(&identifier, branch, &sql)))?;

for child in children {
dag.add_edge(&identifier, &child)?
Expand Down Expand Up @@ -159,12 +144,16 @@ mod tests {

let dag = update_dag(&changes, None, "main").expect("Failed to create dag");

assert_eq!(dag.map.len(), 1);
assert_eq!(dag.singers.len(), 1);

let singer = &dag.dag[*dag
.map
let orders = dag
.singers
.get("bronze.inventory.orders")
.expect("Failed to get graph index")];
.expect("Failed to get graph index");

assert_eq!(orders, "bronze.inventory.target");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -352,12 +341,17 @@ mod tests {

let dag = update_dag(&changes, None, "expenditures").expect("Failed to create dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 1);

let singer = &dag.dag[*dag
.map
let orders = dag
.singers
.get("bronze.inventory.orders")
.expect("Failed to get graph index")];
.expect("Failed to get graph index");

assert_eq!(orders, "bronze.inventory.target");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down
44 changes: 37 additions & 7 deletions dashtool/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
fs,
};

use anyhow::anyhow;
use petgraph::stable_graph::{NodeIndex, StableDiGraph};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -64,22 +65,42 @@ impl Singer {

#[derive(Serialize, Deserialize, Debug)]
pub struct Dag {
pub(crate) singers: HashMap<String, String>,
pub(crate) map: HashMap<String, NodeIndex>,
pub(crate) dag: StableDiGraph<Node, ()>,
}

impl Dag {
pub fn new() -> Self {
Self {
singers: HashMap::new(),
map: HashMap::new(),
dag: StableDiGraph::new(),
}
}
}

impl Dag {
pub(crate) fn add_node(&mut self, node: Node) {
match self.map.entry(node.identifier().to_string()) {
pub(crate) fn add_node(&mut self, node: Node) -> Result<(), Error> {
let identifier = match &node {
Node::Singer(singer) => {
let identifier = singer.identifier.clone();
let streams: HashMap<String, JsonValue> =
serde_json::from_value(singer.target["streams"].clone())
.expect("target.json must contain streams field.");
for (_, config) in &streams {
let stream = if let JsonValue::String(stream) = &config["identifier"] {
Ok(stream)
} else {
Err(Error::Anyhow(anyhow!("Stream must have an identifier")))
}?;
self.singers.insert(stream.clone(), identifier.clone());
}
identifier
}
Node::Tabular(tab) => tab.identifier.clone(),
};
match self.map.entry(identifier) {
Entry::Vacant(entry) => {
let idx = self.dag.add_node(node);
entry.insert(idx);
Expand All @@ -89,6 +110,7 @@ impl Dag {
self.dag[*idx] = node;
}
};
Ok(())
}

pub(crate) fn add_edge(&mut self, a: &str, b: &str) -> Result<(), Error> {
Expand All @@ -98,11 +120,18 @@ impl Dag {
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?;

let b = self
.map
.get(b)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?;
let b = match self.singers.get(b) {
None => self
.map
.get(b)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?,
Some(ident) => self
.map
.get(ident)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?,
};

self.dag.add_edge(b, a, ());
Ok(())
Expand All @@ -117,6 +146,7 @@ pub fn get_dag(branch: &str) -> Result<Dag, Error> {
dag
} else {
Dag {
singers: HashMap::new(),
map: HashMap::new(),
dag: StableDiGraph::new(),
}
Expand Down

0 comments on commit 4b1771c

Please sign in to comment.