Skip to content

Commit

Permalink
update iceberg-rust
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Mar 20, 2024
1 parent b2ee08a commit dfac553
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 32 deletions.
10 changes: 5 additions & 5 deletions dashtool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dashtool"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

description = "Lakehouse build tool"
Expand All @@ -13,12 +13,12 @@ repository = "https://github.com/dashbook/dashtool"
argo-workflow = "0.1"
clap = { version = "=4.4", features = ["derive"] }
dashtool-common = "0.1"
datafusion-iceberg-sql = "0.2"
datafusion-iceberg-sql = "0.3"
datafusion-sql = "36.0.0"
datafusion-common = "36.0.0"
iceberg-rust = "0.2"
iceberg-rust-spec = "0.2"
iceberg-sql-catalog = "0.2"
iceberg-rust = "0.3"
iceberg-rust-spec = "0.3"
iceberg-sql-catalog = "0.3"
serde = { workspace = true }
serde_json = { workspace = true }
sqlparser = { version = "0.43", features = ["visitor"] }
Expand Down
43 changes: 16 additions & 27 deletions dashtool/src/build/build_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ use iceberg_rust::{
catalog::tabular::Tabular as IcebergTabular, error::Error as IcebergError,
materialized_view::materialized_view_builder::MaterializedViewBuilder, sql::find_relations,
};
use iceberg_rust_spec::{
spec::{
schema::SchemaBuilder,
snapshot::{SnapshotReference, SnapshotRetention},
table_metadata::new_metadata_location,
view_metadata::{ViewProperties, REF_PREFIX},
},
util::strip_prefix,
use iceberg_rust_spec::spec::{
schema::SchemaBuilder,
snapshot::{SnapshotReference, SnapshotRetention},
view_metadata::{ViewProperties, REF_PREFIX},
};
use serde_json::{Map, Value as JsonValue};

Expand Down Expand Up @@ -117,37 +113,30 @@ pub(super) async fn build_dag<'repo>(
let version_id = matview.metadata().current_version_id;
let mut storage_table = matview.storage_table().await?;
let snapshot_id = *storage_table
.table_metadata
.metadata()
.current_snapshot(Some(merged_branch))?
.ok_or(Error::Iceberg(IcebergError::NotFound(
"Snapshot id".to_string(),
"branch".to_string() + merged_branch,
)))?
.snapshot_id();
storage_table.table_metadata.refs.insert(
branch.to_string(),
SnapshotReference {
snapshot_id,
retention: SnapshotRetention::default(),
},
);
let metaddata_location =
new_metadata_location(matview.metadata().into());
matview
.object_store()
.put(
&strip_prefix(&metaddata_location).as_str().into(),
serde_json::to_string(&storage_table.table_metadata)?
.into(),
)
storage_table
.new_transaction(Some(merged_branch))
.set_snapshot_ref((
branch.to_string(),
SnapshotReference {
snapshot_id,
retention: SnapshotRetention::default(),
},
))
.commit()
.await?;
matview
.new_transaction(Some(merged_branch))
.update_properties(vec![(
REF_PREFIX.to_string() + branch,
version_id.to_string(),
)])
.update_materialization(&metaddata_location)
.commit()
.await?;
}
Expand Down Expand Up @@ -203,7 +192,7 @@ pub(super) async fn build_dag<'repo>(
)?;
builder.location(&base_path);
builder.properties(ViewProperties {
metadata_location: Default::default(),
storage_table: identifier.to_string() + "__storage",
other: HashMap::from_iter(vec![(
REF_PREFIX.to_string() + branch,
1.to_string(),
Expand Down

0 comments on commit dfac553

Please sign in to comment.