Basic Integration with Datafusion #324

Merged · 37 commits · May 2, 2024
Commits (37)
333b607 chore: basic structure (marvinlanhenke, Apr 5, 2024)
4bfc87a feat: add IcebergCatalogProvider (marvinlanhenke, Apr 5, 2024)
e4ba25d feat: add IcebergSchemaProvider (marvinlanhenke, Apr 6, 2024)
881cf37 feat: add IcebergTableProvider (marvinlanhenke, Apr 6, 2024)
47a041f chore: add integration test infr (marvinlanhenke, Apr 6, 2024)
66a1667 fix: remove old test (marvinlanhenke, Apr 6, 2024)
709ab3e chore: update crate structure (marvinlanhenke, Apr 12, 2024)
9510b7c fix: remove workspace dep (marvinlanhenke, Apr 22, 2024)
2b92021 refactor: use try_join_all (marvinlanhenke, Apr 22, 2024)
5141d11 Merge branch 'main' into draft_datafusion (marvinlanhenke, Apr 25, 2024)
475a9a3 chore: remove feature flag (marvinlanhenke, Apr 25, 2024)
0f706ca chore: rename package (marvinlanhenke, Apr 25, 2024)
e05fc45 chore: update readme (marvinlanhenke, Apr 25, 2024)
c868439 feat: add TableType (marvinlanhenke, Apr 25, 2024)
26b257e fix: import + async_trait (marvinlanhenke, Apr 25, 2024)
60ff7f2 fix: imports + async_trait (marvinlanhenke, Apr 25, 2024)
7ef31fd chore: remove feature flag (marvinlanhenke, Apr 25, 2024)
2f152fe fix: cargo sort (marvinlanhenke, Apr 25, 2024)
8135bc7 refactor: CatalogProvider `fn try_new` (marvinlanhenke, Apr 25, 2024)
6cd85cc refactor: SchemaProvider `fn try_new` (marvinlanhenke, Apr 25, 2024)
d646785 chore: update docs (marvinlanhenke, Apr 25, 2024)
b466936 chore: update docs (marvinlanhenke, Apr 25, 2024)
3c9bafc chore: update doc (marvinlanhenke, Apr 25, 2024)
d24a0d3 feat: impl `fn schema` on TableProvider (marvinlanhenke, Apr 25, 2024)
948fc56 chore: rename ArrowSchema (marvinlanhenke, Apr 25, 2024)
5b9d9c7 refactor: remove DashMap (marvinlanhenke, Apr 25, 2024)
0d55fbc feat: add basic IcebergTableScan (marvinlanhenke, Apr 28, 2024)
294e575 chore: fix docs (marvinlanhenke, Apr 28, 2024)
13cc2d8 chore: add comments (marvinlanhenke, Apr 28, 2024)
c95b1dd fix: clippy (marvinlanhenke, Apr 28, 2024)
32f33cb fix: typo (marvinlanhenke, Apr 28, 2024)
30830ec fix: license (marvinlanhenke, Apr 28, 2024)
391f983 chore: update docs (marvinlanhenke, Apr 29, 2024)
d94d615 chore: move derive stmt (marvinlanhenke, Apr 30, 2024)
996f249 fix: collect into hashmap (marvinlanhenke, Apr 30, 2024)
199382d chore: use DFResult (marvinlanhenke, Apr 30, 2024)
177e5c8 Update crates/integrations/datafusion/README.md (liurenjie1024, May 2, 2024)
2 changes: 2 additions & 0 deletions Cargo.toml

@@ -21,6 +21,7 @@ members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
]

@@ -56,6 +57,7 @@ fnv = "1"
futures = "0.3"
iceberg = { version = "0.2.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" }
+iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" }
itertools = "0.12"
lazy_static = "1"
log = "^0.4"
2 changes: 1 addition & 1 deletion crates/iceberg/src/table.rs

@@ -25,7 +25,7 @@ use futures::AsyncReadExt;
use typed_builder::TypedBuilder;

/// Table represents a table in the catalog.
-#[derive(TypedBuilder, Debug)]
+#[derive(TypedBuilder, Debug, Clone)]
pub struct Table {
file_io: FileIO,
#[builder(default, setter(strip_option, into))]
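The added `Clone` derive lets downstream code hold its own `Table` handle and hand out copies, presumably what the new `IcebergTableProvider` needs. A minimal hypothetical sketch (the names here are invented, not taken from this PR's diff):

```rust
use iceberg::table::Table;

// Hypothetical: a holder that owns a `Table` and clones it on demand.
// This pattern only compiles because `Table` now derives `Clone`.
struct MyTableHolder {
    table: Table,
}

impl MyTableHolder {
    /// Returns an owned copy of the underlying table handle.
    fn table(&self) -> Table {
        self.table.clone()
    }
}
```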
43 changes: 43 additions & 0 deletions crates/integrations/datafusion/Cargo.toml
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iceberg-datafusion"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
rust-version = { workspace = true }

categories = ["database"]
description = "Apache Iceberg Datafusion Integration"
repository = { workspace = true }
license = { workspace = true }
keywords = ["iceberg", "integrations", "datafusion"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
datafusion = { version = "37.0.0" }
futures = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
iceberg-catalog-hms = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
22 changes: 22 additions & 0 deletions crates/integrations/datafusion/README.md
@@ -0,0 +1,22 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

# Apache Iceberg Datafusion Integration

This crate contains the integration of Apache Datafusion and Apache Iceberg.
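To orient readers, here is a hedged sketch of how the pieces introduced in this PR fit together. It assumes a `Catalog` instance is already available (e.g. from `iceberg-catalog-hms`, the dev-dependency above); the catalog name and table path are invented for illustration:

```rust
use std::sync::Arc;

use datafusion::error::Result as DFResult;
use datafusion::prelude::SessionContext;
use iceberg::Catalog;
use iceberg_datafusion::{to_datafusion_error, IcebergCatalogProvider};

/// Registers an Iceberg catalog with DataFusion and queries a table.
async fn query_iceberg(catalog: Arc<dyn Catalog>) -> DFResult<()> {
    // Snapshot the catalog's namespaces and tables into DataFusion's model.
    let provider = Arc::new(
        IcebergCatalogProvider::try_new(catalog)
            .await
            .map_err(to_datafusion_error)?,
    );

    let ctx = SessionContext::new();
    ctx.register_catalog("iceberg", provider);

    // Tables become addressable as iceberg.<namespace>.<table>.
    let df = ctx.sql("SELECT count(*) FROM iceberg.ns.my_table").await?;
    df.show().await?;
    Ok(())
}
```

Note that, as the review discussion below points out, `try_new` takes a one-time snapshot, so namespaces created afterwards are not visible until the provider is rebuilt.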
97 changes: 97 additions & 0 deletions crates/integrations/datafusion/src/catalog.rs
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, collections::HashMap, sync::Arc};

use datafusion::catalog::{schema::SchemaProvider, CatalogProvider};
use futures::future::try_join_all;
use iceberg::{Catalog, NamespaceIdent, Result};

use crate::schema::IcebergSchemaProvider;

/// Provides an interface to manage and access multiple schemas
/// within an Iceberg [`Catalog`].
///
/// Acts as a centralized catalog provider that aggregates
/// multiple [`SchemaProvider`], each associated with distinct namespaces.
pub struct IcebergCatalogProvider {
/// A `HashMap` where keys are namespace names
/// and values are dynamic references to objects implementing the
/// [`SchemaProvider`] trait.
schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}

impl IcebergCatalogProvider {
/// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
/// using the given client to fetch and initialize schema providers for
/// each namespace in the Iceberg [`Catalog`].
///
/// This method retrieves the list of namespace names,
/// attempts to create a schema provider for each namespace, and
/// collects these providers into a `HashMap`.
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
// TODO:
// Schemas and providers should be cached and evicted based on time
// As of right now, schemas might become stale.
Comment on lines +48 to +49

Contributor: There are 2 hard problems in computer science: cache invalidation, naming things, and off-by-1 errors. - Leon Bambrick

Is there no way to leave this up to the user?

Contributor (Author): 😄 the classic. I think leaving it up to the user leads us to the issue of blocking an async call in a sync trait function. If we have an idea how to handle this, we can better reason about if, when, and where to cache.

From the docs:

> To implement CatalogProvider and SchemaProvider for remote catalogs, you need to provide an in memory snapshot of the required metadata. Most systems typically either already have this information cached locally or can batch access to the remote catalog to retrieve multiple schemas and tables in a single network call.
let schema_names: Vec<_> = client
.list_namespaces(None)
.await?
.iter()
.flat_map(|ns| ns.as_ref().clone())
.collect();

let providers = try_join_all(
schema_names
.iter()
.map(|name| {
IcebergSchemaProvider::try_new(
client.clone(),
NamespaceIdent::new(name.clone()),
)
})
.collect::<Vec<_>>(),
)
.await?;

let schemas: Vec<_> = schema_names
.into_iter()
.zip(providers.into_iter())
.map(|(name, provider)| {
let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
(name, provider)
})
.collect();

Ok(IcebergCatalogProvider {
schemas: schemas.into_iter().collect(),
})
}
}

impl CatalogProvider for IcebergCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.schemas.keys().cloned().collect()
Review thread on `fn schema_names`:

Collaborator: I'm thinking this may be incorrect, since other processes may create new namespaces after `try_new` has run. We should use the result of list_namespaces each time. For the performance issue, we may create something like CachingCatalog in Java to implement the Catalog trait, what do you think?

Contributor (Author): Yes, you're right, this is a simple but naive impl, based on the docs and the ref from delta-rs. I think having a caching wrapper would be a better solution. However, since we have to provide a vec of all schema_names, we cannot check whether a single schema is in the cache and, if not, fetch it again. So I'm guessing we can only implement it with a time-based eviction policy, i.e. cache all schemas for 1 min in order to avoid multiple network calls in a short amount of time (same concept as debouncing)?

Collaborator: Yes, when adding a cache, we need a design for the trade-off between performance and consistency. But I think we could leave that to the actual cache implementation. For this PR, we should not cache them in the providers (e.g. CatalogProvider, SchemaProvider, etc.) but call the Catalog API directly.

Contributor (Author): Sure, we can do it like that for this PR and implement the more sophisticated solution later.

Contributor (Author): @liurenjie1024 ...I took another look, and I'm not sure we can call the Catalog API directly each time a function on e.g. the CatalogProvider trait is invoked. The trait is required to be synchronous; however, our catalog API calls are async. So perhaps we leave it as is for this PR and work on the cache instead? But I guess we're facing the same problem there as well. We could use rt.block_on(...) to solve this, but maybe I'm just missing something and we can do this without needing to block?

Member:

> It's possible to block on an async call, but it requires the runtime api.

> It is possible, but we should avoid using rt.block_on() in our lib. My suggestion is to leave it as it is now, and fix it once the runtime api has landed.

I second this suggestion. We can move on and figure out how to improve this part in the future.

Collaborator:

> Is there any issue or discussion already related to that? Or would you mind quickly outlining the plan here?

There is a tracking issue to add the runtime api: #124. But I agree with @Xuanwo that we should avoid rt.block_on as much as possible.

Contributor: We should be hesitant with caching, especially avoiding upfront optimizations. For Iceberg in general, consistency is king.

> For this PR, we should not cache them in the providers (e.g. CatalogProvider, SchemaProvider, etc.) but call the Catalog API directly.

This makes sense, but I see the issue with blocking on async calls. At first, I would take the price of waiting for the blocking calls. Even if it is still a remote call, the ones to the REST catalog should be lightning-fast (that's where the caching happens).

Collaborator:

> This makes sense, but I see the issue with blocking on async calls. At first, I would take the price of waiting for the blocking calls. Even if it is still a remote call, the ones to the REST catalog should be lightning-fast (that's where the caching happens).

I think blocking + timeout would be a reasonable solution before we implement sophisticated caching. For this PR, we can leave it as is, since it's not even caching; it's a snapshot. WDYT?

Contributor (Author):

> For this PR, we can leave it as is, since it's not even caching; it's a snapshot. WDYT?

+1 for "moving on"; let's create and track those issues in #357 before this PR gets too big.
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.schemas.get(name).cloned()
}
}
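The thread above defers caching to a later PR; purely as an illustration of the time-based eviction idea discussed there, here is a minimal sketch (the `SnapshotCache` type is hypothetical and not part of this PR):

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical helper: caches one computed snapshot and recomputes it
/// only after `ttl` has elapsed (the "debouncing" idea from the thread).
struct SnapshotCache<T> {
    ttl: Duration,
    entry: Mutex<Option<(Instant, T)>>,
}

impl<T: Clone> SnapshotCache<T> {
    fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            entry: Mutex::new(None),
        }
    }

    /// Returns the cached value while it is younger than `ttl`,
    /// otherwise recomputes and stores a fresh one.
    fn get_or_refresh(&self, refresh: impl FnOnce() -> T) -> T {
        let mut guard = self.entry.lock().unwrap();
        if let Some((at, value)) = guard.as_ref() {
            if at.elapsed() < self.ttl {
                return value.clone();
            }
        }
        let value = refresh();
        *guard = Some((Instant::now(), value.clone()));
        value
    }
}
```

A caching wrapper around `Catalog` could then hold, e.g., a `SnapshotCache<Vec<String>>` of namespace names, trading bounded staleness (one TTL window) for fewer network calls.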
32 changes: 32 additions & 0 deletions crates/integrations/datafusion/src/error.rs
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use anyhow::anyhow;
use iceberg::{Error, ErrorKind};

/// Converts a datafusion error into an iceberg error.
pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error {
Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting datafusion error".to_string(),
)
.with_source(anyhow!("datafusion error: {:?}", error))
}

/// Converts an iceberg error into a datafusion error.
pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError {
datafusion::error::DataFusionError::External(error.into())
}
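A short usage sketch of the two helpers above, with a fabricated error value for illustration:

```rust
use datafusion::error::DataFusionError;
use iceberg_datafusion::{from_datafusion_error, to_datafusion_error};

fn error_round_trip() {
    // DataFusion -> Iceberg: wrap a planner error as an iceberg::Error.
    let df_err = DataFusionError::Plan("example planning failure".to_string());
    let iceberg_err: iceberg::Error = from_datafusion_error(df_err);

    // Iceberg -> DataFusion: surface it back as DataFusionError::External.
    let _back: DataFusionError = to_datafusion_error(iceberg_err);
}
```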
26 changes: 26 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod catalog;
pub use catalog::*;

mod error;
pub use error::*;

mod physical_plan;
mod schema;
mod table;
18 changes: 18 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod scan;