Skip to content

Commit

Permalink
use single file for singer definition
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Apr 26, 2024
1 parent 355fad9 commit a8179ec
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 226 deletions.
149 changes: 58 additions & 91 deletions dashtool/src/build/build_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use iceberg_rust_spec::spec::{
use serde_json::Value as JsonValue;

use crate::{
dag::{identifier::FullIdentifier, Dag, Node, Singer, Tabular},
dag::{identifier::FullIdentifier, Dag, Node, Singer, SingerConfig, Tabular},
error::Error,
plugins::Plugin,
};
Expand Down Expand Up @@ -64,7 +64,7 @@ pub(super) async fn build_dag<'repo>(
.to_string();
let is_tabular = if path.ends_with(".sql") {
Some(true)
} else if path.ends_with("target.json") {
} else if path.ends_with(".singer.json") {
Some(false)
} else {
None
Expand Down Expand Up @@ -209,12 +209,13 @@ pub(super) async fn build_dag<'repo>(
Some(false) => {
let identifier = FullIdentifier::parse_path(Path::new(&path))?.to_string();

let tap_path =
path.trim_end_matches("target.json").to_string() + "tap.json";
let tap_json: JsonValue =
serde_json::from_str(&fs::read_to_string(&tap_path)?)?;
let mut target_json: JsonValue =
serde_json::from_str(&fs::read_to_string(path)?)?;
let singer_json: SingerConfig =
serde_json::from_str(&fs::read_to_string(&path)?)?;
let tap_json = singer_json.tap;
let mut target_json = singer_json.target;

let image = singer_json.image;

target_json["branch"] = branch.to_string().into();

let streams = if let JsonValue::Object(object) = &target_json["streams"] {
Expand Down Expand Up @@ -279,6 +280,7 @@ pub(super) async fn build_dag<'repo>(
singer_sender
.send(Node::Singer(Singer::new(
&identifier,
&image,
tap_json.clone(),
target_json,
branch,
Expand Down Expand Up @@ -360,59 +362,42 @@ mod tests {
let bronze_inventory_path = bronze_path.join("inventory");
fs::create_dir(&bronze_inventory_path).expect("Failed to create directory");

let tap_path = bronze_inventory_path.join(Path::new("tap.json"));
File::create(&tap_path)
.expect("Failed to create file")
.write_all(
r#"
{
"host": "172.17.0.2",
"port": 5432,
"user": "postgres",
"password": "$POSTGRES_PASSWORD",
"dbname": "postgres",
"filter_schemas": "inventory",
"default_replication_method": "LOG_BASED"
}
"#
.as_bytes(),
)
.expect("Failed to write to file");

let target_path = bronze_inventory_path.join(Path::new("target.json"));
File::create(&target_path)
let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json"));
File::create(&config_path)
.expect("Failed to create file")
.write_all(
r#"
{
"image": "dashbook/pipelinewise-tap-postgres:iceberg",
"streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }},
"catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj",
"bucket": "s3://example-postgres/",
"access_token": "$ACCESS_TOKEN",
"id_token": "$ID_TOKEN"
"image":"dashbook/pipelinewise-tap-postgres:sql",
"tap":{
"host": "172.17.0.2",
"port": 5432,
"user": "postgres",
"password": "$POSTGRES_PASSWORD",
"dbname": "postgres",
"filter_schemas": "inventory",
"default_replication_method": "LOG_BASED"
},
"target":{
"streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }},
"catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj",
"bucket": "s3://example-postgres/",
"access_token": "$ACCESS_TOKEN",
"id_token": "$ID_TOKEN"
}
}
"#
.as_bytes(),
)
.expect("Failed to write to file");

let changes = vec![
Change::Addition {
entry_mode: EntryKind::Tree
.try_into()
.expect("Failed to create git entry"),
oid: ObjectId::null(gix::hash::Kind::Sha1),
path: tap_path.to_str().unwrap().into(),
},
Change::Addition {
entry_mode: EntryKind::Tree
.try_into()
.expect("Failed to create git entry"),
oid: ObjectId::null(gix::hash::Kind::Sha1),
path: target_path.to_str().unwrap().into(),
},
];
let changes = vec![Change::Addition {
entry_mode: EntryKind::Tree
.try_into()
.expect("Failed to create git entry"),
oid: ObjectId::null(gix::hash::Kind::Sha1),
path: config_path.to_str().unwrap().into(),
}];

let mut dag = update_dag(&vec![], None, "main").expect("Failed to create dag");

Expand Down Expand Up @@ -447,18 +432,15 @@ mod tests {
.get("bronze.inventory.orders")
.expect("Failed to get graph index");

assert_eq!(orders, "bronze.inventory.target");
assert_eq!(orders, "bronze.inventory.postgres");

let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
};

assert_eq!(
singer.target["image"],
"dashbook/pipelinewise-tap-postgres:iceberg"
);
assert_eq!(singer.image, "dashbook/pipelinewise-tap-postgres:sql");
assert_eq!(singer.target["branch"], "main");
}

Expand All @@ -475,37 +457,29 @@ mod tests {
let bronze_inventory_path = bronze_path.join("inventory");
fs::create_dir(&bronze_inventory_path).expect("Failed to create directory");

let tap_path = bronze_inventory_path.join(Path::new("tap.json"));
File::create(&tap_path)
let config_path = bronze_inventory_path.join(Path::new("postgres.singer.json"));
File::create(&config_path)
.expect("Failed to create file")
.write_all(
r#"
{
"host": "172.17.0.2",
"port": 5432,
"user": "postgres",
"password": "$POSTGRES_PASSWORD",
"dbname": "postgres",
"filter_schemas": "inventory",
"default_replication_method": "LOG_BASED"
}
"#
.as_bytes(),
)
.expect("Failed to write to file");

let target_path = bronze_inventory_path.join(Path::new("target.json"));
File::create(&target_path)
.expect("Failed to create file")
.write_all(
r#"
{
"image": "dashbook/pipelinewise-tap-postgres:iceberg",
"streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }},
"catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj",
"bucket": "s3://example-postgres/",
"access_token": "$ACCESS_TOKEN",
"id_token": "$ID_TOKEN"
"image":"dashbook/pipelinewise-tap-postgres:sql",
"tap":{
"host": "172.17.0.2",
"port": 5432,
"user": "postgres",
"password": "$POSTGRES_PASSWORD",
"dbname": "postgres",
"filter_schemas": "inventory",
"default_replication_method": "LOG_BASED"
},
"target":{
"streams": {"inventory-orders": { "identifier": "bronze.inventory.orders" }},
"catalog": "https://api.dashbook.dev/nessie/cat-1w0qookj",
"bucket": "s3://example-postgres/",
"access_token": "$ACCESS_TOKEN",
"id_token": "$ID_TOKEN"
}
}
"#
.as_bytes(),
Expand Down Expand Up @@ -535,14 +509,7 @@ mod tests {
.try_into()
.expect("Failed to create git entry"),
oid: ObjectId::null(gix::hash::Kind::Sha1),
path: tap_path.to_str().unwrap().into(),
},
Change::Addition {
entry_mode: EntryKind::Tree
.try_into()
.expect("Failed to create git entry"),
oid: ObjectId::null(gix::hash::Kind::Sha1),
path: target_path.to_str().unwrap().into(),
path: config_path.to_str().unwrap().into(),
},
Change::Addition {
entry_mode: EntryKind::Tree
Expand Down
Loading

0 comments on commit a8179ec

Please sign in to comment.