Skip to content

Commit

Permalink
update dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Feb 28, 2024
1 parent 3bc14be commit 4c366c6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 35 deletions.
13 changes: 6 additions & 7 deletions Documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ kubectl apply -f argo/workflow.yaml
## Configuration

Dashtool uses the `dashtool.json` file to store connection and authentication parameters for the current project.
The configuration file has two sections, one for the Iceberg catalog and one for the cloud provider.

### Catalog

The catalog section of the configuration file contains parameters related to the Iceberg catalog.
The field "catalog" defines which catalog to use.
It uses a plugin system to support different Icebergs catalogs and cloud providers. The "plugin" field specifies which plugin to use.

| Field | Type | Description |
| --- | --- | --- |
| **catalog** | String | Name of the catalog. Can be: "sql" |
| **plugin** | String | Name of the plugin. Can be: "sql" |

### Sql plugin

The configuration file for the Sql plugin has two sections, one for the Iceberg catalog and one for the cloud provider.

#### Sql catalog

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ kubectl apply -f argo/workflow.yaml

## Installation

### Homebrew

### Cargo
69 changes: 42 additions & 27 deletions dashtool/src/build/build_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use iceberg_rust::{
catalog::tabular::Tabular as IcebergTabular, error::Error as IcebergError,
materialized_view::materialized_view_builder::MaterializedViewBuilder, sql::find_relations,
};
use iceberg_rust_spec::spec::{
schema::SchemaBuilder,
snapshot::{SnapshotReference, SnapshotRetention},
view_metadata::{ViewProperties, REF_PREFIX},
use iceberg_rust_spec::{
spec::{
schema::SchemaBuilder,
snapshot::{SnapshotReference, SnapshotRetention},
table_metadata::new_metadata_location,
view_metadata::{ViewProperties, REF_PREFIX},
},
util::strip_prefix,
};
use serde_json::{Map, Value as JsonValue};

Expand Down Expand Up @@ -69,7 +73,8 @@ pub(super) async fn build_dag<'repo>(

match (delta.status(), merged_branch) {
(Delta::Added | Delta::Modified, Some(merged_branch)) => {
let tabular = catalog.load_table(&identifier.identifier()?).await?;
let tabular =
catalog.load_tabular(&identifier.identifier()?).await?;
let mut matview =
if let IcebergTabular::MaterializedView(matview) = tabular {
Ok(matview)
Expand All @@ -82,30 +87,37 @@ pub(super) async fn build_dag<'repo>(
let version_id = matview.metadata().current_version_id;
let mut storage_table = matview.storage_table().await?;
let snapshot_id = *storage_table
.metadata()
.table_metadata
.current_snapshot(Some(merged_branch))?
.ok_or(Error::Iceberg(IcebergError::NotFound(
"Snapshot id".to_string(),
"branch".to_string() + merged_branch,
)))?
.snapshot_id();
storage_table
.new_transaction(Some(merged_branch))
.set_snapshot_ref((
branch.to_string(),
SnapshotReference {
snapshot_id,
retention: SnapshotRetention::default(),
},
))
.commit()
storage_table.table_metadata.refs.insert(
branch.to_string(),
SnapshotReference {
snapshot_id,
retention: SnapshotRetention::default(),
},
);
let metaddata_location =
new_metadata_location(matview.metadata().into());
matview
.object_store()
.put(
&strip_prefix(&metaddata_location).as_str().into(),
serde_json::to_string(&storage_table.table_metadata)?
.into(),
)
.await?;
matview
.new_transaction(Some(merged_branch))
.update_properties(vec![(
REF_PREFIX.to_string() + branch,
version_id.to_string(),
)])
.update_materialization(&metaddata_location)
.commit()
.await?;
}
Expand Down Expand Up @@ -155,7 +167,7 @@ pub(super) async fn build_dag<'repo>(
)?;
builder.location(&base_path);
builder.properties(ViewProperties {
storage_table: Default::default(),
metadata_location: Default::default(),
other: HashMap::from_iter(vec![(
REF_PREFIX.to_string() + branch,
1.to_string(),
Expand Down Expand Up @@ -208,7 +220,8 @@ pub(super) async fn build_dag<'repo>(
),
)?;

let tabular = catalog.load_table(&identifier.identifier()?).await?;
let tabular =
catalog.load_tabular(&identifier.identifier()?).await?;

let mut table = if let IcebergTabular::Table(table) = tabular {
Ok(table)
Expand Down Expand Up @@ -310,10 +323,7 @@ mod tests {
use crate::{
build::{build_dag::build_dag, update_dag::update_dag},
dag::Node,
plugins::{
sql::{SqlConfig, SqlPlugin},
Plugin,
},
plugins::{sql::SqlPlugin, Config, Plugin},
test::repo_init,
};

Expand Down Expand Up @@ -407,8 +417,9 @@ mod tests {
}
"#;

let config: SqlConfig =
serde_json::from_str(&config_json).expect("Failed to parse sql config");
let config = match serde_json::from_str(&config_json).expect("Failed to parse sql config") {
Config::Sql(config) => config,
};

let plugin = Arc::new(
SqlPlugin::new(config)
Expand Down Expand Up @@ -620,16 +631,20 @@ mod tests {

builder.build().await.expect("Failed to create table.");

let config: SqlConfig = serde_json::from_str(
let config = match serde_json::from_str(
r#"
{
"plugin": "sql
"catalogUrl": "sqlite://",
"bucket": "test",
"secrets": {}
}
"#,
)
.expect("Failed to parse config");
.expect("Failed to parse sql config")
{
Config::Sql(config) => config,
};

let plugin = Arc::new(
SqlPlugin::new_with_catalog(config, catalog_list).expect("Failed to create plugin"),
Expand Down Expand Up @@ -663,7 +678,7 @@ mod tests {
.expect("Failed to get catalog");

let matview = if let Tabular::MaterializedView(matview) = catalog
.load_table(
.load_tabular(
&Identifier::parse("inventory.factOrder").expect("Failed to parse identifier"),
)
.await
Expand Down
2 changes: 1 addition & 1 deletion dashtool/src/plugins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub trait Plugin: Debug {
}

#[derive(Serialize, Deserialize)]
#[serde(tag = "catalog", rename_all = "lowercase")]
#[serde(tag = "plugin", rename_all = "lowercase")]
pub enum Config {
Sql(SqlConfig),
}

0 comments on commit 4c366c6

Please sign in to comment.