Skip to content

Commit

Permalink
use different singers for each stream
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Feb 19, 2024
1 parent ba53db5 commit ee347a2
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 92 deletions.
69 changes: 33 additions & 36 deletions dashtool/src/build/build_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use iceberg_rust_spec::{
},
util::strip_prefix,
};
use serde_json::Value as JsonValue;
use serde_json::{Map, Value as JsonValue};

use crate::{
dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular},
Expand Down Expand Up @@ -185,33 +185,30 @@ pub(super) async fn build_dag<'repo>(
anyhow!("target.json must be inside a subfolder."),
))?;
let tap_path = parent_path.join("tap.json");
let tap_json = serde_json::from_str(&fs::read_to_string(&tap_path)?)?;
let tap_json: JsonValue =
serde_json::from_str(&fs::read_to_string(&tap_path)?)?;
let mut target_json: JsonValue =
serde_json::from_str(&fs::read_to_string(path)?)?;
target_json["branch"] = branch.to_string().into();
let identifier = parent_path
.to_str()
.ok_or(Error::Anyhow(anyhow!("Failed to convert path to string.")))?;

if let Some(merged_branch) = merged_branch {
let streams = if let JsonValue::Object(object) = &target_json["streams"]
{
let streams = if let JsonValue::Object(object) = &target_json["streams"] {
Ok(object)
} else {
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

for (stream, table_name) in streams.iter() {
let name = if let JsonValue::String(object) = table_name {
Ok(object)
} else {
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

for table in streams.values() {
let name = if let JsonValue::String(object) = table {
Ok(object)
} else {
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

if let Some(merged_branch) = merged_branch {
let identifier = FullIdentifier::parse(name)?;
let catalog_name = identifier.catalog_name();

Expand Down Expand Up @@ -252,16 +249,18 @@ pub(super) async fn build_dag<'repo>(
.commit()
.await?;
}
};

singer_sender
.send(Node::Singer(Singer::new(
identifier,
tap_json,
target_json,
branch,
)))
.await?;
let mut target_json = target_json.clone();
target_json["streams"] =
Map::from_iter(vec![(stream.clone(), table_name.clone())]).into();
singer_sender
.send(Node::Singer(Singer::new(
name,
tap_json.clone(),
target_json,
branch,
)))
.await?;
}
}
_ => (),
};
Expand Down Expand Up @@ -430,21 +429,20 @@ mod tests {
.expect("Failed to create plugin"),
);

dbg!(&dag);

build_dag(&mut dag, main_diff, plugin, "main", None)
.await
.expect("Failed to build dag");

assert_eq!(dag.singers.len(), 1);
dbg!(&dag);

assert_eq!(dag.map.len(), 1);

let orders = dag
.singers
let singer = &dag.dag[*dag
.map
.get("bronze.inventory.orders")
.expect("Failed to get singer");

assert_eq!(orders, "bronze/inventory");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];
.expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -658,7 +656,6 @@ mod tests {
.await
.expect("Failed to build dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 2);

let tabular = &dag.dag[*dag
Expand Down
60 changes: 34 additions & 26 deletions dashtool/src/build/update_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{ffi::OsStr, fs, path::Path};
use anyhow::anyhow;
use git2::{Delta, Diff};
use iceberg_rust::sql::find_relations;
use serde_json::Value as JsonValue;
use serde_json::{Map, Value as JsonValue};

use crate::{
dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular},
Expand Down Expand Up @@ -42,18 +42,35 @@ pub(super) fn update_dag(diff: Diff<'_>, dag: Option<Dag>, branch: &str) -> Resu
"target.json must be inside a subfolder."
)))?;
let tap_path = parent_path.join("tap.json");
let tap_json = serde_json::from_str(&fs::read_to_string(&tap_path)?)?;
let tap_json: JsonValue = serde_json::from_str(&fs::read_to_string(&tap_path)?)?;
let mut target_json: JsonValue = serde_json::from_str(&fs::read_to_string(path)?)?;
target_json["branch"] = branch.to_string().into();
let identifier = parent_path
.to_str()
.ok_or(Error::Anyhow(anyhow!("Failed to convert path to string.")))?;
dag.add_node(Node::Singer(Singer::new(
identifier,
tap_json,
target_json,
branch,
)));
let streams = if let JsonValue::Object(object) = &target_json["streams"] {
Ok(object)
} else {
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

for (stream, table_name) in streams.iter() {
let name = if let JsonValue::String(object) = table_name {
Ok(object)
} else {
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;
let mut target_json = target_json.clone();
target_json["streams"] =
Map::from_iter(vec![(stream.clone(), table_name.clone())]).into();
dag.add_node(Node::Singer(Singer::new(
name,
tap_json.clone(),
target_json,
branch,
)));
}
}

for path in tabulars {
Expand Down Expand Up @@ -158,16 +175,12 @@ mod tests {

let dag = update_dag(diff, None, "main").expect("Failed to create dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 1);

let orders = dag
.singers
let singer = &dag.dag[*dag
.map
.get("bronze.inventory.orders")
.expect("Failed to get singer");
assert_eq!(orders, "bronze/inventory");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];
.expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -282,7 +295,6 @@ mod tests {

let dag = update_dag(diff, None, "main").expect("Failed to create dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 2);

let tabular = &dag.dag[*dag
Expand Down Expand Up @@ -374,16 +386,12 @@ mod tests {

let dag = update_dag(diff, None, "expenditures").expect("Failed to create dag");

assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 1);

let orders = dag
.singers
let singer = &dag.dag[*dag
.map
.get("bronze.inventory.orders")
.expect("Failed to get singer");
assert_eq!(orders, "bronze/inventory");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];
.expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down
37 changes: 7 additions & 30 deletions dashtool/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,13 @@ impl Singer {

/// Directed graph of `Node`s plus string-keyed lookup tables for finding nodes.
#[derive(Serialize, Deserialize, Debug)]
pub struct Dag {
/// Maps each value of a singer's `target["streams"]` object to that singer's identifier.
/// NOTE(review): the surrounding diff drops every read and write of this field —
/// confirm whether it still exists after this commit.
pub(crate) singers: HashMap<String, String>,
/// Maps a node's identifier string to the index of that node in `dag`.
pub(crate) map: HashMap<String, NodeIndex>,
/// The graph itself; edges carry no payload (`()`).
pub(crate) dag: StableDiGraph<Node, ()>,
}

impl Dag {
pub fn new() -> Self {
Self {
singers: HashMap::new(),
map: HashMap::new(),
dag: StableDiGraph::new(),
}
Expand All @@ -81,21 +79,7 @@ impl Dag {

impl Dag {
pub(crate) fn add_node(&mut self, node: Node) {
let identifier = match &node {
Node::Singer(singer) => {
let identifier = singer.identifier.clone();
let streams: HashMap<String, String> =
serde_json::from_value(singer.target["streams"].clone())
.expect("target.json must contain streams field.");
for (_, stream) in &streams {
self.singers.insert(stream.clone(), identifier.clone());
}
identifier
}
Node::Tabular(tab) => tab.identifier.clone(),
};

match self.map.entry(identifier) {
match self.map.entry(node.identifier().to_string()) {
Entry::Vacant(entry) => {
let idx = self.dag.add_node(node);
entry.insert(idx);
Expand All @@ -113,18 +97,12 @@ impl Dag {
.get(a)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?;
let b = match self.singers.get(b) {
None => self
.map
.get(b)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?,
Some(ident) => self
.map
.get(ident)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?,
};

let b = self
.map
.get(b)
.cloned()
.ok_or(Error::Text("Node not in graph.".to_string()))?;

self.dag.add_edge(b, a, ());
Ok(())
Expand All @@ -139,7 +117,6 @@ pub fn get_dag(branch: &str) -> Result<Dag, Error> {
dag
} else {
Dag {
singers: HashMap::new(),
map: HashMap::new(),
dag: StableDiGraph::new(),
}
Expand Down

0 comments on commit ee347a2

Please sign in to comment.