Skip to content

Commit

Permalink
update iceberg-rust
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jan 24, 2024
1 parent 54304af commit 84cc0ce
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 59 deletions.
1 change: 1 addition & 0 deletions dashtool-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct S3Config {
pub aws_region: String,
pub aws_secret_access_key: Option<String>,
pub aws_endpoint: Option<String>,
pub aws_allow_http: Option<String>,
}

impl From<Option<ObjectStoreConfigSerde>> for ObjectStoreConfig {
Expand Down
59 changes: 28 additions & 31 deletions dashtool/src/build/build_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use iceberg_rust::{
use iceberg_rust_spec::{
spec::{
schema::Schema,
snapshot::{Reference, Retention},
snapshot::{SnapshotReference, SnapshotRetention},
view_metadata::REF_PREFIX,
},
util::strip_prefix,
Expand Down Expand Up @@ -50,7 +50,7 @@ pub(super) async fn build_dag<'repo>(
.new_file()
.path()
.ok_or(Error::Text("No new file in delta".to_string()))?;
let is_tabular = if path.extension() == Some(&OsStr::new("sql")) {
let is_tabular = if path.extension() == Some(OsStr::new("sql")) {
Some(true)
} else if path.ends_with("target.json") {
Some(false)
Expand All @@ -59,12 +59,12 @@ pub(super) async fn build_dag<'repo>(
};
match is_tabular {
Some(true) => {
let identifier = FullIdentifier::parse_path(&path)?;
let identifier = FullIdentifier::parse_path(path)?;

let catalog_name = identifier.catalog_name();

let catalog = catalog_list.catalog(catalog_name).await.ok_or(
IcebergError::NotFound(format!("Catalog"), catalog_name.to_string()),
IcebergError::NotFound("Catalog".to_string(), catalog_name.to_string()),
)?;

let sql = fs::read_to_string(path)?;
Expand Down Expand Up @@ -105,9 +105,9 @@ pub(super) async fn build_dag<'repo>(
.new_transaction(Some(merged_branch))
.set_ref((
branch.to_string(),
Reference {
SnapshotReference {
snapshot_id,
retention: Retention::default(),
retention: SnapshotRetention::default(),
},
))
.commit()
Expand All @@ -117,12 +117,12 @@ pub(super) async fn build_dag<'repo>(
let relations = relations
.iter()
.map(|x| {
FullIdentifier::parse(x).and_then(|y| {
Ok((
FullIdentifier::parse(x).map(|y| {
(
y.catalog_name().clone(),
y.namespace_name().clone(),
y.table_name().clone(),
))
)
})
})
.collect::<Result<Vec<_>, _>>()?;
Expand All @@ -131,7 +131,7 @@ pub(super) async fn build_dag<'repo>(
&sql,
&relations,
catalog_list.clone(),
Some(&branch),
Some(branch),
)
.await?;

Expand Down Expand Up @@ -188,24 +188,26 @@ pub(super) async fn build_dag<'repo>(
{
Ok(object)
} else {
Err(Error::Text(format!("Streams in config must be an object.")))
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

for table in streams.values() {
let name = if let JsonValue::String(object) = table {
Ok(object)
} else {
Err(Error::Text(format!(
"Streams in config must be an object."
)))
Err(Error::Text(
"Streams in config must be an object.".to_string(),
))
}?;

let identifier = FullIdentifier::parse(name)?;
let catalog_name = identifier.catalog_name();

let catalog = catalog_list.catalog(catalog_name).await.ok_or(
IcebergError::NotFound(
format!("Catalog"),
"Catalog".to_string(),
catalog_name.to_string(),
),
)?;
Expand All @@ -232,9 +234,9 @@ pub(super) async fn build_dag<'repo>(
.new_transaction(Some(merged_branch))
.set_ref((
branch.to_string(),
Reference {
SnapshotReference {
snapshot_id,
retention: Retention::default(),
retention: SnapshotRetention::default(),
},
))
.commit()
Expand Down Expand Up @@ -372,15 +374,15 @@ mod tests {
let mut index = repo.index().expect("Failed to create index");
index
.add_path(
&tap_path
tap_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
)
.expect("Failed to add path to index");
index
.add_path(
&target_path
target_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand Down Expand Up @@ -432,11 +434,7 @@ mod tests {

assert_eq!(orders, "bronze/inventory");

let singer = &dag.dag[dag
.map
.get(orders)
.expect("Failed to get graph index")
.clone()];
let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -505,15 +503,15 @@ mod tests {
let mut index = repo.index().expect("Failed to create index");
index
.add_path(
&tap_path
tap_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
)
.expect("Failed to add path to index");
index
.add_path(
&target_path
target_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand All @@ -539,7 +537,7 @@ mod tests {

index
.add_path(
&tabular_path
tabular_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand Down Expand Up @@ -612,7 +610,7 @@ mod tests {
.unwrap(),
};
let partition_spec = PartitionSpecBuilder::default()
.spec_id(1)
.with_spec_id(1)
.with_partition_field(PartitionField {
source_id: 4,
field_id: 1000,
Expand Down Expand Up @@ -655,11 +653,10 @@ mod tests {
assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 2);

let tabular = &dag.dag[dag
let tabular = &dag.dag[*dag
.map
.get("silver.inventory.factOrder")
.expect("Failed to get graph index")
.clone()];
.expect("Failed to get graph index")];

let Node::Tabular(tabular) = tabular else {
panic!("Node is not a singer")
Expand Down
41 changes: 14 additions & 27 deletions dashtool/src/build/update_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ use crate::{
};

// Builds a new DAG, or extends the given one, from the diff without performing any operations on the tables
pub(super) fn update_dag<'repo>(
diff: Diff<'repo>,
dag: Option<Dag>,
branch: &str,
) -> Result<Dag, Error> {
pub(super) fn update_dag(diff: Diff<'_>, dag: Option<Dag>, branch: &str) -> Result<Dag, Error> {
let mut dag = dag.unwrap_or(Dag::new());

let mut singers = Vec::new();
Expand All @@ -26,7 +22,7 @@ pub(super) fn update_dag<'repo>(
.new_file()
.path()
.ok_or(Error::Text("No new file in delta".to_string()))?;
let is_tabular = if path.extension() == Some(&OsStr::new("sql")) {
let is_tabular = if path.extension() == Some(OsStr::new("sql")) {
Some(true)
} else if path.ends_with("target.json") {
Some(false)
Expand Down Expand Up @@ -65,7 +61,7 @@ pub(super) fn update_dag<'repo>(

let children = find_relations(&sql)?;

let identifier = FullIdentifier::parse_path(&path)?.to_string();
let identifier = FullIdentifier::parse_path(path)?.to_string();

dag.add_node(Node::Tabular(Tabular::new(&identifier, branch, &sql)));

Expand Down Expand Up @@ -141,15 +137,15 @@ mod tests {
let mut index = repo.index().expect("Failed to create index");
index
.add_path(
&tap_path
tap_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
)
.expect("Failed to add path to index");
index
.add_path(
&target_path
target_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand All @@ -171,11 +167,7 @@ mod tests {
.expect("Failed to get singer");
assert_eq!(orders, "bronze/inventory");

let singer = &dag.dag[dag
.map
.get(orders)
.expect("Failed to get graph index")
.clone()];
let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -243,15 +235,15 @@ mod tests {
let mut index = repo.index().expect("Failed to create index");
index
.add_path(
&tap_path
tap_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
)
.expect("Failed to add path to index");
index
.add_path(
&target_path
target_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand All @@ -277,7 +269,7 @@ mod tests {

index
.add_path(
&tabular_path
tabular_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand All @@ -293,11 +285,10 @@ mod tests {
assert_eq!(dag.singers.len(), 1);
assert_eq!(dag.map.len(), 2);

let tabular = &dag.dag[dag
let tabular = &dag.dag[*dag
.map
.get("silver.inventory.factOrder")
.expect("Failed to get graph index")
.clone()];
.expect("Failed to get graph index")];

let Node::Tabular(tabular) = tabular else {
panic!("Node is not a singer")
Expand Down Expand Up @@ -362,15 +353,15 @@ mod tests {
let mut index = repo.index().expect("Failed to create index");
index
.add_path(
&tap_path
tap_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
)
.expect("Failed to add path to index");
index
.add_path(
&target_path
target_path
.as_path()
.strip_prefix(temp_dir.path())
.expect("Failed to get relative path of file"),
Expand All @@ -392,11 +383,7 @@ mod tests {
.expect("Failed to get singer");
assert_eq!(orders, "bronze/inventory");

let singer = &dag.dag[dag
.map
.get(orders)
.expect("Failed to get graph index")
.clone()];
let singer = &dag.dag[*dag.map.get(orders).expect("Failed to get graph index")];

let Node::Singer(singer) = singer else {
panic!("Node is not a singer")
Expand Down
2 changes: 2 additions & 0 deletions dashtool/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub enum Error {
#[error(transparent)]
Parse(#[from] url::ParseError),
#[error(transparent)]
StrParse(#[from] std::str::ParseBoolError),
#[error(transparent)]
SQLParser(#[from] sqlparser::parser::ParserError),
#[error(transparent)]
DashbookCatalog(#[from] dashbook_catalog::error::Error),
Expand Down
2 changes: 1 addition & 1 deletion dashtool/src/plugins/dashbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl DashbookPlugin {

let (access_token, id_token) = authorization(issuer, client_id, &refresh_token).await?;

let catalog_list = Arc::new(DashbookS3CatalogList::new(&access_token, &id_token));
let catalog_list = Arc::new(DashbookS3CatalogList::new(&access_token, &id_token).await?);

Ok(DashbookPlugin {
config,
Expand Down
4 changes: 4 additions & 0 deletions dashtool/src/plugins/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ impl SqlPlugin {
builder = builder.with_endpoint(endpoint);
}

if let Some(allow_http) = &s3_config.aws_allow_http {
builder = builder.with_allow_http(allow_http.parse()?);
}

Arc::new(builder.build()?)
}
};
Expand Down

0 comments on commit 84cc0ce

Please sign in to comment.