From 0f8686f30b7b78f70d6586fb6109f09afbd7fd1c Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 08:01:12 +0200 Subject: [PATCH] use single file for singers --- dashtool/src/build/build_dag.rs | 144 +++++++++------------ dashtool/src/build/update_dag.rs | 208 ++++++++++++------------------- dashtool/src/dag/identifier.rs | 2 +- dashtool/src/dag/mod.rs | 17 ++- 4 files changed, 157 insertions(+), 214 deletions(-) diff --git a/dashtool/src/build/build_dag.rs b/dashtool/src/build/build_dag.rs index 3ea5770..e9ea8f3 100644 --- a/dashtool/src/build/build_dag.rs +++ b/dashtool/src/build/build_dag.rs @@ -15,7 +15,7 @@ use iceberg_rust_spec::spec::{ use serde_json::Value as JsonValue; use crate::{ - dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular}, + dag::{identifier::FullIdentifier, Dag, Node, Singer, SingerConfig, Tabular}, error::Error, plugins::Plugin, }; @@ -64,7 +64,7 @@ pub(super) async fn build_dag<'repo>( .to_string(); let is_tabular = if path.ends_with(".sql") { Some(true) - } else if path.ends_with("target.json") { + } else if path.ends_with(".singer.json") { Some(false) } else { None @@ -209,12 +209,13 @@ pub(super) async fn build_dag<'repo>( Some(false) => { let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string(); - let tap_path = - path.trim_end_matches("target.json").to_string() + "tap.json"; - let tap_json: JsonValue = - serde_json::from_str(&fs::read_to_string(&tap_path)?)?; - let mut target_json: JsonValue = - serde_json::from_str(&fs::read_to_string(path)?)?; + let singer_json: SingerConfig = + serde_json::from_str(&fs::read_to_string(&path)?)?; + let tap_json = singer_json.tap; + let mut target_json = singer_json.target; + + let image = singer_json.image; + target_json["branch"] = branch.to_string().into(); let streams = if let JsonValue::Object(object) = &target_json["streams"] { @@ -279,6 +280,7 @@ pub(super) async fn build_dag<'repo>( singer_sender .send(Node::Singer(Singer::new( &identifier, + &image, tap_json.clone(), target_json, branch, @@ -360,59 +362,42 @@ mod tests { let bronze_inventory_path = bronze_path.join("inventory"); fs::create_dir(&bronze_inventory_path).expect("Failed to create directory"); - let tap_path = bronze_inventory_path.join(Path::new("tap.json")); - File::create(&tap_path) - .expect("Failed to create file") - .write_all( - r#" - { - "host": "172.17.0.2", - "port": 5432, - "user": "postgres", - "password": "$POSTGRES_PASSWORD", - "dbname": "postgres", - "filter_schemas": "inventory", - "default_replication_method": "LOG_BASED" - } - "# - .as_bytes(), - ) - .expect("Failed to write to file"); - - let target_path = bronze_inventory_path.join(Path::new("target.json")); - File::create(&target_path) + let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json")); + File::create(&config_path) .expect("Failed to create file") .write_all( r#" { - "image": "dashbook/pipelinewise-tap-postgres:iceberg", - "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, - "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", - "bucket": "s3://example-postgres/", - "access_token": "$ACCESS_TOKEN", - "id_token": "$ID_TOKEN" + "image":"dashbook/pipelinewise-tap-postgres:sql", + "tap":{ + "host": "172.17.0.2", + "port": 5432, + "user": "postgres", + "password": "$POSTGRES_PASSWORD", + "dbname": "postgres", + "filter_schemas": "inventory", + "default_replication_method": "LOG_BASED" + }, + "target":{ + "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, + "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", + "bucket": "s3://example-postgres/", + "access_token": "$ACCESS_TOKEN", + "id_token": "$ID_TOKEN" + } } "# .as_bytes(), ) .expect("Failed to write to file"); - let changes = vec![ - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: tap_path.to_str().unwrap().into(), - }, - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: target_path.to_str().unwrap().into(), - }, - ]; + let changes = vec![Change::Addition { + entry_mode: EntryKind::Tree + .try_into() + .expect("Failed to create git entry"), + oid: ObjectId::null(gix::hash::Kind::Sha1), + path: config_path.to_str().unwrap().into(), + }]; let mut dag = update_dag(&vec![], None, "main").expect("Failed to create dag"); @@ -447,7 +432,7 @@ mod tests { .get("bronze.inventory.orders") .expect("Failed to get graph index"); - assert_eq!(orders, "bronze.inventory.target"); + assert_eq!(orders, "bronze.inventory.postgres"); let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")]; @@ -475,37 +460,29 @@ mod tests { let bronze_inventory_path = bronze_path.join("inventory"); fs::create_dir(&bronze_inventory_path).expect("Failed to create directory"); - let tap_path = bronze_inventory_path.join(Path::new("tap.json")); - File::create(&tap_path) - .expect("Failed to create file") - .write_all( - r#" - { - "host": "172.17.0.2", - "port": 5432, - "user": "postgres", - "password": "$POSTGRES_PASSWORD", - "dbname": "postgres", - "filter_schemas": "inventory", - "default_replication_method": "LOG_BASED" - } - "# - .as_bytes(), - ) - .expect("Failed to write to file"); - - let target_path = bronze_inventory_path.join(Path::new("target.json")); - File::create(&target_path) + let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json")); + File::create(&config_path) .expect("Failed to create file") .write_all( r#" { - "image": "dashbook/pipelinewise-tap-postgres:iceberg", - "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, - "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", - "bucket": "s3://example-postgres/", - "access_token": "$ACCESS_TOKEN", - "id_token": "$ID_TOKEN" + "image":"dashbook/pipelinewise-tap-postgres:sql", + "tap":{ + "host": "172.17.0.2", + "port": 5432, + "user": "postgres", + "password": "$POSTGRES_PASSWORD", + "dbname": "postgres", + "filter_schemas": "inventory", + "default_replication_method": "LOG_BASED" + }, + "target":{ + "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, + "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", + "bucket": "s3://example-postgres/", + "access_token": "$ACCESS_TOKEN", + "id_token": "$ID_TOKEN" + } } "# .as_bytes(), @@ -535,14 +512,7 @@ mod tests { .try_into() .expect("Failed to create git entry"), oid: ObjectId::null(gix::hash::Kind::Sha1), - path: tap_path.to_str().unwrap().into(), - }, - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: target_path.to_str().unwrap().into(), + path: config_path.to_str().unwrap().into(), }, Change::Addition { entry_mode: EntryKind::Tree diff --git a/dashtool/src/build/update_dag.rs b/dashtool/src/build/update_dag.rs index b93a311..284bbe6 100644 --- a/dashtool/src/build/update_dag.rs +++ b/dashtool/src/build/update_dag.rs @@ -2,10 +2,9 @@ use std::{fs, path::Path}; use gix::diff::tree::recorder::Change; use iceberg_rust::sql::find_relations; -use serde_json::Value as JsonValue; use crate::{ - dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular}, + dag::{identifier::FullIdentifier, Dag, Node, Singer, SingerConfig, Tabular}, error::Error, }; @@ -25,7 +24,7 @@ pub(super) fn update_dag(diff: &[Change], dag: Option, branch: &str) -> Res } => { if path.ends_with(b".sql") { tabulars.push(String::from_utf8(path.as_slice().to_owned())?) - } else if path.ends_with(b"target.json") { + } else if path.ends_with(b".singer.json") { singers.push(String::from_utf8(path.as_slice().to_owned())?) }; } @@ -36,13 +35,17 @@ pub(super) fn update_dag(diff: &[Change], dag: Option, branch: &str) -> Res for path in singers { let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string(); - let tap_path = path.trim_end_matches("target.json").to_string() + "tap.json"; - let tap_json: JsonValue = serde_json::from_str(&fs::read_to_string(&tap_path)?)?; - let mut target_json: JsonValue = serde_json::from_str(&fs::read_to_string(path)?)?; + let singer_json: SingerConfig = serde_json::from_str(&fs::read_to_string(&path)?)?; + let tap_json = singer_json.tap; + let mut target_json = singer_json.target; + + let image = singer_json.image; + target_json["branch"] = branch.to_string().into(); dag.add_node(Node::Singer(Singer::new( &identifier, + &image, tap_json.clone(), target_json, branch, @@ -92,55 +95,42 @@ mod tests { let bronze_inventory_path = bronze_path.join("inventory"); fs::create_dir(&bronze_inventory_path).expect("Failed to create directory"); - let tap_path = bronze_inventory_path.join(Path::new("tap.json")); - File::create(&tap_path) + let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json")); + File::create(&config_path) .expect("Failed to create file") .write_all( r#" { - "host": "172.17.0.2", - "port": 5432, - "user": "postgres", - "password": "$POSTGRES_PASSWORD", - "dbname": "postgres", - "filter_schemas": "inventory", - "default_replication_method": "LOG_BASED" + "image":"dashbook/pipelinewise-tap-postgres:sql", + "tap":{ + "host": "172.17.0.2", + "port": 5432, + "user": "postgres", + "password": "$POSTGRES_PASSWORD", + "dbname": "postgres", + "filter_schemas": "inventory", + "default_replication_method": "LOG_BASED" + }, + "target":{ + "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, + "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", + "bucket": "s3://example-postgres/", + "access_token": "$ACCESS_TOKEN", + "id_token": "$ID_TOKEN" + } } "# .as_bytes(), ) .expect("Failed to write to file"); - let target_path = bronze_inventory_path.join(Path::new("target.json")); - File::create(&target_path) - .expect("Failed to create file") - .write_all( - r#" - { - "image": "dashbook/pipelinewise-tap-postgres:iceberg", - "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }} - } - "# - .as_bytes(), - ) - .expect("Failed to write to file"); - - let changes = vec![ - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: tap_path.to_str().unwrap().into(), - }, - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: target_path.to_str().unwrap().into(), - }, - ]; + let changes = vec![Change::Addition { + entry_mode: EntryKind::Tree + .try_into() + .expect("Failed to create git entry"), + oid: ObjectId::null(gix::hash::Kind::Sha1), + path: config_path.to_str().unwrap().into(), + }]; let dag = update_dag(&changes, None, "main").expect("Failed to create dag"); @@ -151,7 +141,7 @@ mod tests { .get("bronze.inventory.orders") .expect("Failed to get graph index"); - assert_eq!(orders, "bronze.inventory.target"); + assert_eq!(orders, "bronze.inventory.postgres"); let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")]; @@ -178,37 +168,29 @@ mod tests { let bronze_inventory_path = bronze_path.join("inventory"); fs::create_dir(&bronze_inventory_path).expect("Failed to create directory"); - let tap_path = bronze_inventory_path.join(Path::new("tap.json")); - File::create(&tap_path) - .expect("Failed to create file") - .write_all( - r#" - { - "host": "172.17.0.2", - "port": 5432, - "user": "postgres", - "password": "$POSTGRES_PASSWORD", - "dbname": "postgres", - "filter_schemas": "inventory", - "default_replication_method": "LOG_BASED" - } - "# - .as_bytes(), - ) - .expect("Failed to write to file"); - - let target_path = bronze_inventory_path.join(Path::new("target.json")); - File::create(&target_path) + let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json")); + File::create(&config_path) .expect("Failed to create file") .write_all( r#" { - "image": "dashbook/pipelinewise-tap-postgres:iceberg", - "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, - "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", - "bucket": "s3://example-postgres/", - "access_token": "$ACCESS_TOKEN", - "id_token": "$ID_TOKEN" + "image":"dashbook/pipelinewise-tap-postgres:sql", + "tap":{ + "host": "172.17.0.2", + "port": 5432, + "user": "postgres", + "password": "$POSTGRES_PASSWORD", + "dbname": "postgres", + "filter_schemas": "inventory", + "default_replication_method": "LOG_BASED" + }, + "target":{ + "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, + "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", + "bucket": "s3://example-postgres/", + "access_token": "$ACCESS_TOKEN", + "id_token": "$ID_TOKEN" + } } "# .as_bytes(), @@ -238,14 +220,7 @@ mod tests { .try_into() .expect("Failed to create git entry"), oid: ObjectId::null(gix::hash::Kind::Sha1), - path: tap_path.to_str().unwrap().into(), - }, - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: target_path.to_str().unwrap().into(), + path: config_path.to_str().unwrap().into(), }, Change::Addition { entry_mode: EntryKind::Tree @@ -285,59 +260,42 @@ mod tests { let bronze_inventory_path = bronze_path.join("inventory"); fs::create_dir(&bronze_inventory_path).expect("Failed to create directory"); - let tap_path = bronze_inventory_path.join(Path::new("tap.json")); - File::create(&tap_path) + let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json")); + File::create(&config_path) .expect("Failed to create file") .write_all( r#" { - "host": "172.17.0.2", - "port": 5432, - "user": "postgres", - "password": "$POSTGRES_PASSWORD", - "dbname": "postgres", - "filter_schemas": "inventory", - "default_replication_method": "LOG_BASED" + "image":"dashbook/pipelinewise-tap-postgres:sql", + "tap":{ + "host": "172.17.0.2", + "port": 5432, + "user": "postgres", + "password": "$POSTGRES_PASSWORD", + "dbname": "postgres", + "filter_schemas": "inventory", + "default_replication_method": "LOG_BASED" + }, + "target":{ + "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, + "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", + "bucket": "s3://example-postgres/", + "access_token": "$ACCESS_TOKEN", + "id_token": "$ID_TOKEN" + } } "# .as_bytes(), ) .expect("Failed to write to file"); - let target_path = bronze_inventory_path.join(Path::new("target.json")); - File::create(&target_path) - .expect("Failed to create file") - .write_all( - r#" - { - "image": "dashbook/pipelinewise-tap-postgres:iceberg", - "streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }}, - "catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj", - "bucket": "s3://example-postgres/", - "access_token": "$ACCESS_TOKEN", - "id_token": "$ID_TOKEN" - } - "# - .as_bytes(), - ) - .expect("Failed to write to file"); - - let changes = vec![ - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: tap_path.to_str().unwrap().into(), - }, - Change::Addition { - entry_mode: EntryKind::Tree - .try_into() - .expect("Failed to create git entry"), - oid: ObjectId::null(gix::hash::Kind::Sha1), - path: target_path.to_str().unwrap().into(), - }, - ]; + let changes = vec![Change::Addition { + entry_mode: EntryKind::Tree + .try_into() + .expect("Failed to create git entry"), + oid: ObjectId::null(gix::hash::Kind::Sha1), + path: config_path.to_str().unwrap().into(), + }]; let dag = update_dag(&changes, None, "expenditures").expect("Failed to create dag"); @@ -349,7 +307,7 @@ mod tests { .get("bronze.inventory.orders") .expect("Failed to get graph index"); - assert_eq!(orders, "bronze.inventory.target"); + assert_eq!(orders, "bronze.inventory.postgres"); let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")]; diff --git a/dashtool/src/dag/identifier.rs b/dashtool/src/dag/identifier.rs index 072c373..59112e8 100644 --- a/dashtool/src/dag/identifier.rs +++ b/dashtool/src/dag/identifier.rs @@ -51,7 +51,7 @@ impl FullIdentifier { .to_str() .ok_or(Error::Text("Failed to convert OsStr".to_string()))? .trim_end_matches(".sql") - .trim_end_matches(".json") + .trim_end_matches(".singer.json") .to_owned(); let namespace_name = parts .next() diff --git a/dashtool/src/dag/mod.rs b/dashtool/src/dag/mod.rs index 9a4be19..e6ea01a 100644 --- a/dashtool/src/dag/mod.rs +++ b/dashtool/src/dag/mod.rs @@ -48,21 +48,36 @@ impl Tabular { pub struct Singer { pub(crate) identifier: String, pub(crate) branch: String, + pub(crate) image: String, pub(crate) tap: JsonValue, pub(crate) target: JsonValue, } impl Singer { - pub(crate) fn new(identifier: &str, tap: JsonValue, target: JsonValue, branch: &str) -> Self { + pub(crate) fn new( + identifier: &str, + image: &str, + tap: JsonValue, + target: JsonValue, + branch: &str, + ) -> Self { Self { identifier: identifier.to_owned(), branch: branch.to_owned(), + image: image.to_owned(), tap, target, } } } +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct SingerConfig { + pub(crate) image: String, + pub(crate) tap: JsonValue, + pub(crate) target: JsonValue, +} + #[derive(Serialize, Deserialize, Debug)] pub struct Dag { pub(crate) singers: HashMap,