From 333b6073b8d1e731ebf5c754b913aa4ced85585a Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Fri, 5 Apr 2024 20:25:52 +0200 Subject: [PATCH 01/36] chore: basic structure --- Cargo.toml | 3 ++ crates/integrations/Cargo.toml | 38 +++++++++++++++++++ crates/integrations/README.md | 22 +++++++++++ crates/integrations/src/datafusion/catalog.rs | 16 ++++++++ crates/integrations/src/datafusion/mod.rs | 20 ++++++++++ crates/integrations/src/datafusion/schema.rs | 16 ++++++++ crates/integrations/src/datafusion/table.rs | 16 ++++++++ crates/integrations/src/lib.rs | 18 +++++++++ 8 files changed, 149 insertions(+) create mode 100644 crates/integrations/Cargo.toml create mode 100644 crates/integrations/README.md create mode 100644 crates/integrations/src/datafusion/catalog.rs create mode 100644 crates/integrations/src/datafusion/mod.rs create mode 100644 crates/integrations/src/datafusion/schema.rs create mode 100644 crates/integrations/src/datafusion/table.rs create mode 100644 crates/integrations/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 054fcc67..d3bcb1b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "crates/catalog/*", "crates/examples", "crates/iceberg", + "crates/integrations", "crates/test_utils", ] @@ -48,6 +49,7 @@ bimap = "0.6" bitvec = "1.0.1" bytes = "1.5" chrono = "0.4.34" +datafusion = "37.0.0" derive_builder = "0.20.0" either = "1" env_logger = "0.11.0" @@ -55,6 +57,7 @@ fnv = "1" futures = "0.3" iceberg = { version = "0.2.0", path = "./crates/iceberg" } iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" } +iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" } itertools = "0.12" lazy_static = "1" log = "^0.4" diff --git a/crates/integrations/Cargo.toml b/crates/integrations/Cargo.toml new file mode 100644 index 00000000..1fc02f3e --- /dev/null +++ b/crates/integrations/Cargo.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-integrations" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Integrations" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "integrations"] + +[dependencies] +datafusion = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } + +[dev-dependencies] +iceberg_test_utils = { path = "../test_utils", features = ["tests"] } +port_scanner = { workspace = true } diff --git a/crates/integrations/README.md b/crates/integrations/README.md new file mode 100644 index 00000000..d9381959 --- /dev/null +++ b/crates/integrations/README.md @@ -0,0 +1,22 @@ + + +# Apache Iceberg Integrations + +This crate contains the official Native Rust implementation of Apache Iceberg Integrations. diff --git a/crates/integrations/src/datafusion/catalog.rs b/crates/integrations/src/datafusion/catalog.rs new file mode 100644 index 00000000..b248758b --- /dev/null +++ b/crates/integrations/src/datafusion/catalog.rs @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. diff --git a/crates/integrations/src/datafusion/mod.rs b/crates/integrations/src/datafusion/mod.rs new file mode 100644 index 00000000..6d4e49db --- /dev/null +++ b/crates/integrations/src/datafusion/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod catalog; +mod schema; +mod table; diff --git a/crates/integrations/src/datafusion/schema.rs b/crates/integrations/src/datafusion/schema.rs new file mode 100644 index 00000000..b248758b --- /dev/null +++ b/crates/integrations/src/datafusion/schema.rs @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. diff --git a/crates/integrations/src/datafusion/table.rs b/crates/integrations/src/datafusion/table.rs new file mode 100644 index 00000000..b248758b --- /dev/null +++ b/crates/integrations/src/datafusion/table.rs @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. diff --git a/crates/integrations/src/lib.rs b/crates/integrations/src/lib.rs new file mode 100644 index 00000000..9dbeae15 --- /dev/null +++ b/crates/integrations/src/lib.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod datafusion; From 4bfc87ab26f9e19b6b39139c0f2dbc973924fd60 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Fri, 5 Apr 2024 21:03:42 +0200 Subject: [PATCH 02/36] feat: add IcebergCatalogProvider --- Cargo.toml | 1 + crates/integrations/Cargo.toml | 3 + crates/integrations/src/datafusion/catalog.rs | 103 ++++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index d3bcb1b1..6d36ccbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ bimap = "0.6" bitvec = "1.0.1" bytes = "1.5" chrono = "0.4.34" +dashmap = "5.5.3" datafusion = "37.0.0" derive_builder = "0.20.0" either = "1" diff --git a/crates/integrations/Cargo.toml b/crates/integrations/Cargo.toml index 1fc02f3e..e2d5667b 100644 --- a/crates/integrations/Cargo.toml +++ b/crates/integrations/Cargo.toml @@ -29,10 +29,13 @@ license = { workspace = true } keywords = ["iceberg", "integrations"] [dependencies] +dashmap = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } +iceberg = { workspace = true } tokio = { workspace = true } [dev-dependencies] +iceberg-catalog-hms = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } port_scanner = { workspace = true } diff --git a/crates/integrations/src/datafusion/catalog.rs b/crates/integrations/src/datafusion/catalog.rs index b248758b..88622bfc 100644 --- a/crates/integrations/src/datafusion/catalog.rs +++ b/crates/integrations/src/datafusion/catalog.rs @@ -14,3 +14,106 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +use std::{any::Any, sync::Arc}; + +use dashmap::DashMap; +use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; +use iceberg::{Catalog, NamespaceIdent, Result}; + +use crate::datafusion::schema::IcebergSchemaProvider; + +struct IcebergCatalogProvider { + schemas: DashMap>, +} + +impl IcebergCatalogProvider { + async fn try_new(client: Arc) -> Result { + let schema_names: Vec = client + .list_namespaces(None) + .await? + .iter() + .map(|ns| ns.as_ref().clone()) + .flatten() + .collect(); + + let mut schemas = Vec::new(); + for name in schema_names { + let provider = + IcebergSchemaProvider::try_new(client.clone(), &NamespaceIdent::new(name.clone())) + .await?; + let provider = Arc::new(provider) as Arc; + + schemas.push((name, provider)) + } + + Ok(IcebergCatalogProvider { + schemas: schemas.into_iter().collect(), + }) + } +} + +impl CatalogProvider for IcebergCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.schemas.iter().map(|c| c.key().clone()).collect() + } + + fn schema(&self, name: &str) -> Option> { + self.schemas.get(name).map(|c| c.value().clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::{collections::HashMap, sync::Arc}; + + use datafusion::execution::context::SessionContext; + use iceberg::{ + io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}, + Result, + }; + use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; + + fn create_hive_client() -> Result { + let props = HashMap::from([ + ( + S3_ENDPOINT.to_string(), + format!("http://{}:{}", "0.0.0.0", "9000"), + ), + (S3_ACCESS_KEY_ID.to_string(), "minioadmin".to_string()), + (S3_SECRET_ACCESS_KEY.to_string(), "minioadmin".to_string()), + (S3_REGION.to_string(), "us-east-1".to_string()), + ]); + + let config = HmsCatalogConfig::builder() + .address("0.0.0.0:9083".to_string()) + .warehouse("s3a://transformed/dwh".to_string()) + .thrift_transport(HmsThriftTransport::Buffered) + .props(props) + .build(); + + HmsCatalog::new(config) + } + + #[tokio::test] + async fn test_schema_provider() -> Result<()> { + let client = Arc::new(create_hive_client()?); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("hive", catalog); + + let provider = ctx.catalog("hive").unwrap(); + let result = provider.schema_names(); + + println!("{:?}", result); + + Ok(()) + } +} From e4ba25d40f4aec96408f1e0ddf3d6ccd8eb8f317 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 6 Apr 2024 05:56:12 +0200 Subject: [PATCH 03/36] feat: add IcebergSchemaProvider --- crates/integrations/src/datafusion/schema.rs | 84 ++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/crates/integrations/src/datafusion/schema.rs b/crates/integrations/src/datafusion/schema.rs index b248758b..e0e2654f 100644 --- a/crates/integrations/src/datafusion/schema.rs +++ b/crates/integrations/src/datafusion/schema.rs @@ -14,3 +14,87 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +use std::{any::Any, sync::Arc}; + +use dashmap::DashMap; +use datafusion::{ + catalog::schema::SchemaProvider, datasource::TableProvider, error::DataFusionError, +}; +use futures::FutureExt; +use iceberg::{Catalog, NamespaceIdent, Result}; + +use crate::datafusion::table::IcebergTableProvider; + +pub(crate) struct IcebergSchemaProvider { + tables: DashMap>, +} + +impl IcebergSchemaProvider { + pub(crate) async fn try_new( + client: Arc, + namespace: &NamespaceIdent, + ) -> Result { + let table_names: Vec = client + .list_tables(namespace) + .await? + .iter() + .map(|t| t.name().to_owned()) + .collect(); + + let mut tables = Vec::new(); + for name in table_names { + let provider = IcebergTableProvider::try_new(client.clone(), namespace, &name).await?; + let provider = Arc::new(provider) as Arc; + + tables.push((name, provider)); + } + + Ok(IcebergSchemaProvider { + tables: tables.into_iter().collect(), + }) + } +} + +impl SchemaProvider for IcebergSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables.iter().map(|c| c.key().clone()).collect() + } + + fn table_exist(&self, name: &str) -> bool { + match self.tables.get(name) { + None => false, + Some(_) => true, + } + } + + fn table<'life0, 'life1, 'async_trait>( + &'life0 self, + name: &'life1 str, + ) -> core::pin::Pin< + Box< + dyn core::future::Future< + Output = datafusion::error::Result< + Option>, + DataFusionError, + >, + > + core::marker::Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + 'life1: 'async_trait, + Self: 'async_trait, + { + async move { + let table = self.tables.get(name).map(|c| c.value().clone()); + Ok(table) + } + .boxed() + } +} From 881cf3757c59ae40233f5b245a99f0063d6db77b Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 6 Apr 2024 05:56:23 +0200 Subject: [PATCH 04/36] feat: add IcebergTableProvider --- crates/integrations/src/datafusion/table.rs | 63 +++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/crates/integrations/src/datafusion/table.rs b/crates/integrations/src/datafusion/table.rs index b248758b..38956909 100644 --- a/crates/integrations/src/datafusion/table.rs +++ b/crates/integrations/src/datafusion/table.rs @@ -14,3 +14,66 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +use std::sync::Arc; + +use datafusion::datasource::TableProvider; +use iceberg::{table::Table, Catalog, NamespaceIdent, Result, TableIdent}; + +pub(crate) struct IcebergTableProvider { + _inner: Table, +} + +impl IcebergTableProvider { + pub(crate) async fn try_new( + client: Arc, + namespace: &NamespaceIdent, + name: impl Into, + ) -> Result { + let name = name.into(); + let ident = TableIdent::new(namespace.to_owned(), name); + let table = client.load_table(&ident).await?; + + Ok(IcebergTableProvider { _inner: table }) + } +} + +impl TableProvider for IcebergTableProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> datafusion::arrow::datatypes::SchemaRef { + todo!() + } + + fn table_type(&self) -> datafusion::datasource::TableType { + todo!() + } + + fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>( + &'life0 self, + _state: &'life1 datafusion::execution::context::SessionState, + _projection: Option<&'life2 Vec>, + _filters: &'life3 [datafusion::prelude::Expr], + _limit: Option, + ) -> core::pin::Pin< + Box< + dyn core::future::Future< + Output = datafusion::error::Result< + Arc, + >, + > + core::marker::Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + 'life1: 'async_trait, + 'life2: 'async_trait, + 'life3: 'async_trait, + Self: 'async_trait, + { + todo!() + } +} From 47a041f9712f7ab553094960ba282d0d35d9a5c7 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 6 Apr 2024 06:19:57 +0200 Subject: [PATCH 05/36] chore: add integration test infr --- crates/integrations/Cargo.toml | 1 + crates/integrations/src/datafusion/catalog.rs | 7 +- crates/integrations/src/datafusion/mod.rs | 2 + crates/integrations/src/datafusion/schema.rs | 5 +- crates/integrations/src/lib.rs | 1 + .../integrations/testdata/docker-compose.yaml | 50 ++++++ .../testdata/hms_catalog/Dockerfile | 34 ++++ .../testdata/hms_catalog/core-site.xml | 51 ++++++ .../tests/integration_datafusion_hms_test.rs | 157 ++++++++++++++++++ 9 files changed, 300 insertions(+), 8 deletions(-) create mode 100644 crates/integrations/testdata/docker-compose.yaml create mode 100644 crates/integrations/testdata/hms_catalog/Dockerfile create mode 100644 crates/integrations/testdata/hms_catalog/core-site.xml create mode 100644 crates/integrations/tests/integration_datafusion_hms_test.rs diff --git a/crates/integrations/Cargo.toml b/crates/integrations/Cargo.toml index e2d5667b..cbe6123f 100644 --- a/crates/integrations/Cargo.toml +++ b/crates/integrations/Cargo.toml @@ -33,6 +33,7 @@ dashmap = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } +log = { workspace = true } tokio = { workspace = true } [dev-dependencies] diff --git a/crates/integrations/src/datafusion/catalog.rs b/crates/integrations/src/datafusion/catalog.rs index 88622bfc..f9be6f0f 100644 --- a/crates/integrations/src/datafusion/catalog.rs +++ b/crates/integrations/src/datafusion/catalog.rs @@ -23,18 +23,17 @@ use iceberg::{Catalog, NamespaceIdent, Result}; use crate::datafusion::schema::IcebergSchemaProvider; -struct IcebergCatalogProvider { +pub struct IcebergCatalogProvider { schemas: DashMap>, } impl IcebergCatalogProvider { - async fn try_new(client: Arc) -> Result { + pub async fn try_new(client: Arc) -> Result { let schema_names: Vec = client .list_namespaces(None) .await? .iter() - .map(|ns| ns.as_ref().clone()) - .flatten() + .flat_map(|ns| ns.as_ref().clone()) .collect(); let mut schemas = Vec::new(); diff --git a/crates/integrations/src/datafusion/mod.rs b/crates/integrations/src/datafusion/mod.rs index 6d4e49db..24cfb6bf 100644 --- a/crates/integrations/src/datafusion/mod.rs +++ b/crates/integrations/src/datafusion/mod.rs @@ -16,5 +16,7 @@ // under the License. mod catalog; +pub use catalog::*; + mod schema; mod table; diff --git a/crates/integrations/src/datafusion/schema.rs b/crates/integrations/src/datafusion/schema.rs index e0e2654f..d9089f91 100644 --- a/crates/integrations/src/datafusion/schema.rs +++ b/crates/integrations/src/datafusion/schema.rs @@ -66,10 +66,7 @@ impl SchemaProvider for IcebergSchemaProvider { } fn table_exist(&self, name: &str) -> bool { - match self.tables.get(name) { - None => false, - Some(_) => true, - } + self.tables.get(name).is_some() } fn table<'life0, 'life1, 'async_trait>( diff --git a/crates/integrations/src/lib.rs b/crates/integrations/src/lib.rs index 9dbeae15..0451c7e7 100644 --- a/crates/integrations/src/lib.rs +++ b/crates/integrations/src/lib.rs @@ -16,3 +16,4 @@ // under the License. mod datafusion; +pub use datafusion::*; diff --git a/crates/integrations/testdata/docker-compose.yaml b/crates/integrations/testdata/docker-compose.yaml new file mode 100644 index 00000000..282dc66c --- /dev/null +++ b/crates/integrations/testdata/docker-compose.yaml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + minio: + image: minio/minio:RELEASE.2024-03-07T00-43-48Z + expose: + - 9000 + - 9001 + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + command: [ "server", "/data", "--console-address", ":9001" ] + + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " + + hive-metastore: + image: iceberg-hive-metastore + build: ./hms_catalog/ + expose: + - 9083 + environment: + SERVICE_NAME: "metastore" + SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/" diff --git a/crates/integrations/testdata/hms_catalog/Dockerfile b/crates/integrations/testdata/hms_catalog/Dockerfile new file mode 100644 index 00000000..ff8c9fae --- /dev/null +++ b/crates/integrations/testdata/hms_catalog/Dockerfile @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM openjdk:8-jre-slim AS build + +RUN apt-get update -qq && apt-get -qq -y install curl + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar +RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar + + +FROM apache/hive:3.1.3 + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar +COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar +COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml \ No newline at end of file diff --git a/crates/integrations/testdata/hms_catalog/core-site.xml b/crates/integrations/testdata/hms_catalog/core-site.xml new file mode 100644 index 00000000..f0583a0b --- /dev/null +++ b/crates/integrations/testdata/hms_catalog/core-site.xml @@ -0,0 +1,51 @@ + + + + + fs.defaultFS + s3a://warehouse/hive + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.fast.upload + true + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.access.key + admin + + + fs.s3a.secret.key + password + + + fs.s3a.connection.ssl.enabled + false + + + fs.s3a.path.style.access + true + + \ No newline at end of file diff --git a/crates/integrations/tests/integration_datafusion_hms_test.rs b/crates/integrations/tests/integration_datafusion_hms_test.rs new file mode 100644 index 00000000..faef3fb1 --- /dev/null +++ b/crates/integrations/tests/integration_datafusion_hms_test.rs @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for Datafusion with Hive Metastore. + +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; +use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; +use iceberg_integrations::IcebergCatalogProvider; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use port_scanner::scan_port_addr; +use tokio::time::sleep; + +const HMS_CATALOG_PORT: u16 = 9083; +const MINIO_PORT: u16 = 9000; + +struct TestFixture { + _docker_compose: DockerCompose, + hms_catalog: HmsCatalog, +} + +async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata", env!("CARGO_MANIFEST_DIR")), + ); + + docker_compose.run(); + + let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore"); + let minio_ip = docker_compose.get_container_ip("minio"); + + let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s hms catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let props = HashMap::from([ + ( + S3_ENDPOINT.to_string(), + format!("http://{}:{}", minio_ip, MINIO_PORT), + ), + (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()), + (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()), + (S3_REGION.to_string(), "us-east-1".to_string()), + ]); + + let config = HmsCatalogConfig::builder() + .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT)) + .thrift_transport(HmsThriftTransport::Buffered) + .warehouse("s3a://warehouse/hive".to_string()) + .props(props) + .build(); + + let hms_catalog = HmsCatalog::new(config).unwrap(); + + TestFixture { + _docker_compose: docker_compose, + hms_catalog, + } +} + +fn set_table_creation(location: impl ToString, name: impl ToString) -> Result { + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + let creation = TableCreation::builder() + .location(location.to_string()) + .name(name.to_string()) + .properties(HashMap::new()) + .schema(schema) + .build(); + + Ok(creation) +} + +#[tokio::test] +async fn test_provider_list_table_names() -> Result<()> { + let fixture = set_test_fixture("test_provider_list_table_names").await; + + let namespace = NamespaceIdent::new("default".to_string()); + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + + fixture + .hms_catalog + .create_table(&namespace, creation) + .await?; + + let client = Arc::new(fixture.hms_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("hive", catalog); + + let provider = ctx.catalog("hive").unwrap(); + let schema = provider.schema("default").unwrap(); + + let expected = vec!["my_table"]; + let result = schema.table_names(); + + assert_eq!(result, expected); + + Ok(()) +} + +#[tokio::test] +async fn test_provider_list_schema_names() -> Result<()> { + let fixture = set_test_fixture("test_provider_list_schema_names").await; + set_table_creation("default", "my_table")?; + + let client = Arc::new(fixture.hms_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("hive", catalog); + + let provider = ctx.catalog("hive").unwrap(); + + let expected = vec!["default"]; + let result = provider.schema_names(); + + assert_eq!(result, expected); + + Ok(()) +} From 66a16673d90ef7b0ae4ffbc9653c5cc6d3dc5830 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 6 Apr 2024 06:30:59 +0200 Subject: [PATCH 06/36] fix: remove old test --- crates/integrations/src/datafusion/catalog.rs | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/crates/integrations/src/datafusion/catalog.rs b/crates/integrations/src/datafusion/catalog.rs index f9be6f0f..8a389de3 100644 --- a/crates/integrations/src/datafusion/catalog.rs +++ b/crates/integrations/src/datafusion/catalog.rs @@ -65,54 +65,3 @@ impl CatalogProvider for IcebergCatalogProvider { self.schemas.get(name).map(|c| c.value().clone()) } } - -#[cfg(test)] -mod tests { - use super::*; - - use std::{collections::HashMap, sync::Arc}; - - use datafusion::execution::context::SessionContext; - use iceberg::{ - io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}, - Result, - }; - use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; - - fn create_hive_client() -> Result { - let props = HashMap::from([ - ( - S3_ENDPOINT.to_string(), - format!("http://{}:{}", "0.0.0.0", "9000"), - ), - (S3_ACCESS_KEY_ID.to_string(), "minioadmin".to_string()), - (S3_SECRET_ACCESS_KEY.to_string(), "minioadmin".to_string()), - (S3_REGION.to_string(), "us-east-1".to_string()), - ]); - - let config = HmsCatalogConfig::builder() - .address("0.0.0.0:9083".to_string()) - .warehouse("s3a://transformed/dwh".to_string()) - .thrift_transport(HmsThriftTransport::Buffered) - .props(props) - .build(); - - HmsCatalog::new(config) - } - - #[tokio::test] - async fn test_schema_provider() -> Result<()> { - let client = Arc::new(create_hive_client()?); - let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); - - let ctx = SessionContext::new(); - ctx.register_catalog("hive", catalog); - - let provider = ctx.catalog("hive").unwrap(); - let result = provider.schema_names(); - - println!("{:?}", result); - - Ok(()) - } -} From 709ab3e2778234fcb9488f55ad03bd76dd089c03 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Fri, 12 Apr 2024 13:37:31 +0200 Subject: [PATCH 07/36] chore: update crate structure --- Cargo.toml | 2 +- .../integrations/{ => datafusion}/Cargo.toml | 14 +++++++----- .../integrations/{ => datafusion}/README.md | 0 .../datafusion => datafusion/src}/catalog.rs | 2 +- .../integrations/{ => datafusion}/src/lib.rs | 9 ++++++-- .../datafusion => datafusion/src}/schema.rs | 2 +- .../datafusion => datafusion/src}/table.rs | 0 .../testdata/docker-compose.yaml | 0 .../testdata/hms_catalog/Dockerfile | 0 .../testdata/hms_catalog/core-site.xml | 0 .../tests/integration_datafusion_hms_test.rs | 4 +++- crates/integrations/src/datafusion/mod.rs | 22 ------------------- 12 files changed, 22 insertions(+), 33 deletions(-) rename crates/integrations/{ => datafusion}/Cargo.toml (79%) rename crates/integrations/{ => datafusion}/README.md (100%) rename crates/integrations/{src/datafusion => datafusion/src}/catalog.rs (97%) rename crates/integrations/{ => datafusion}/src/lib.rs (89%) rename crates/integrations/{src/datafusion => datafusion/src}/schema.rs (98%) rename crates/integrations/{src/datafusion => datafusion/src}/table.rs (100%) rename crates/integrations/{ => datafusion}/testdata/docker-compose.yaml (100%) rename crates/integrations/{ => datafusion}/testdata/hms_catalog/Dockerfile (100%) rename crates/integrations/{ => datafusion}/testdata/hms_catalog/core-site.xml (100%) rename crates/integrations/{ => datafusion}/tests/integration_datafusion_hms_test.rs (98%) delete mode 100644 crates/integrations/src/datafusion/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 6d36ccbf..2fb254de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ members = [ "crates/catalog/*", "crates/examples", "crates/iceberg", - "crates/integrations", + "crates/integrations/*", "crates/test_utils", ] diff --git a/crates/integrations/Cargo.toml b/crates/integrations/datafusion/Cargo.toml similarity index 79% rename from crates/integrations/Cargo.toml rename to crates/integrations/datafusion/Cargo.toml index cbe6123f..909e67dd 100644 --- a/crates/integrations/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -16,21 +16,25 @@ # under the License. [package] -name = "iceberg-integrations" +name = "iceberg-integrations-datafusion" version = { workspace = true } edition = { workspace = true } homepage = { workspace = true } rust-version = { workspace = true } categories = ["database"] -description = "Apache Iceberg Integrations" +description = "Apache Iceberg Integrations Datafusion" repository = { workspace = true } license = { workspace = true } -keywords = ["iceberg", "integrations"] +keywords = ["iceberg", "integrations", "datfusion"] + +[features] +default = [] +datafusion = ["dep:datafusion"] [dependencies] dashmap = { workspace = true } -datafusion = { workspace = true } +datafusion = { workspace = true, optional = true } futures = { workspace = true } iceberg = { workspace = true } log = { workspace = true } @@ -38,5 +42,5 @@ tokio = { workspace = true } [dev-dependencies] iceberg-catalog-hms = { workspace = true } -iceberg_test_utils = { path = "../test_utils", features = ["tests"] } +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } port_scanner = { workspace = true } diff --git a/crates/integrations/README.md b/crates/integrations/datafusion/README.md similarity index 100% rename from crates/integrations/README.md rename to crates/integrations/datafusion/README.md diff --git a/crates/integrations/src/datafusion/catalog.rs b/crates/integrations/datafusion/src/catalog.rs similarity index 97% rename from crates/integrations/src/datafusion/catalog.rs rename to crates/integrations/datafusion/src/catalog.rs index 8a389de3..9a3a6142 100644 --- a/crates/integrations/src/datafusion/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -21,7 +21,7 @@ use dashmap::DashMap; use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; use iceberg::{Catalog, NamespaceIdent, Result}; -use crate::datafusion::schema::IcebergSchemaProvider; +use crate::schema::IcebergSchemaProvider; pub struct IcebergCatalogProvider { schemas: DashMap>, diff --git a/crates/integrations/src/lib.rs b/crates/integrations/datafusion/src/lib.rs similarity index 89% rename from crates/integrations/src/lib.rs rename to crates/integrations/datafusion/src/lib.rs index 0451c7e7..88592c9a 100644 --- a/crates/integrations/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -15,5 +15,10 @@ // specific language governing permissions and limitations // under the License. -mod datafusion; -pub use datafusion::*; +#![cfg(feature = "datafusion")] + +mod catalog; +pub use catalog::*; + +mod schema; +mod table; diff --git a/crates/integrations/src/datafusion/schema.rs b/crates/integrations/datafusion/src/schema.rs similarity index 98% rename from crates/integrations/src/datafusion/schema.rs rename to crates/integrations/datafusion/src/schema.rs index d9089f91..41b1b2db 100644 --- a/crates/integrations/src/datafusion/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -24,7 +24,7 @@ use datafusion::{ use futures::FutureExt; use iceberg::{Catalog, NamespaceIdent, Result}; -use crate::datafusion::table::IcebergTableProvider; +use crate::table::IcebergTableProvider; pub(crate) struct IcebergSchemaProvider { tables: DashMap>, diff --git a/crates/integrations/src/datafusion/table.rs b/crates/integrations/datafusion/src/table.rs similarity index 100% rename from crates/integrations/src/datafusion/table.rs rename to crates/integrations/datafusion/src/table.rs diff --git a/crates/integrations/testdata/docker-compose.yaml b/crates/integrations/datafusion/testdata/docker-compose.yaml similarity index 100% rename from crates/integrations/testdata/docker-compose.yaml rename to crates/integrations/datafusion/testdata/docker-compose.yaml diff --git a/crates/integrations/testdata/hms_catalog/Dockerfile b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile similarity index 100% rename from crates/integrations/testdata/hms_catalog/Dockerfile rename to crates/integrations/datafusion/testdata/hms_catalog/Dockerfile diff --git a/crates/integrations/testdata/hms_catalog/core-site.xml b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml similarity index 100% rename from crates/integrations/testdata/hms_catalog/core-site.xml rename to crates/integrations/datafusion/testdata/hms_catalog/core-site.xml diff --git a/crates/integrations/tests/integration_datafusion_hms_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs similarity index 98% rename from crates/integrations/tests/integration_datafusion_hms_test.rs rename to crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs index faef3fb1..6c983088 100644 --- a/crates/integrations/tests/integration_datafusion_hms_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs @@ -17,6 +17,8 @@ //! Integration tests for Datafusion with Hive Metastore. +#![cfg(feature = "datafusion")] + use std::collections::HashMap; use std::sync::Arc; @@ -25,7 +27,7 @@ use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; -use iceberg_integrations::IcebergCatalogProvider; +use iceberg_integrations_datafusion::IcebergCatalogProvider; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; use port_scanner::scan_port_addr; diff --git a/crates/integrations/src/datafusion/mod.rs b/crates/integrations/src/datafusion/mod.rs deleted file mode 100644 index 24cfb6bf..00000000 --- a/crates/integrations/src/datafusion/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod catalog; -pub use catalog::*; - -mod schema; -mod table; From 9510b7c620e76278ce56f74bea7bdcfc9dbab7dd Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 22 Apr 2024 14:04:57 +0200 Subject: [PATCH 08/36] fix: remove workspace dep --- Cargo.toml | 1 - crates/integrations/datafusion/Cargo.toml | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2fb254de..47efbe68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,6 @@ bitvec = "1.0.1" bytes = "1.5" chrono = "0.4.34" dashmap = "5.5.3" -datafusion = "37.0.0" derive_builder = "0.20.0" either = "1" env_logger = "0.11.0" diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 909e67dd..23129cfb 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,7 +34,7 @@ datafusion = ["dep:datafusion"] [dependencies] dashmap = { workspace = true } -datafusion = { workspace = true, optional = true } +datafusion = { version = "37.0.0", optional = true } futures = { workspace = true } iceberg = { workspace = true } log = { workspace = true } From 2b92021ed08155f3114e14eb15c5461b18802f16 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 22 Apr 2024 14:57:36 +0200 Subject: [PATCH 09/36] refactor: use try_join_all --- crates/integrations/datafusion/src/catalog.rs | 16 +++++++++++----- crates/integrations/datafusion/src/schema.rs | 17 +++++++++++------ crates/integrations/datafusion/src/table.rs | 4 ++-- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 9a3a6142..96922ac1 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -19,6 +19,7 @@ use std::{any::Any, sync::Arc}; use dashmap::DashMap; use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; +use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; use crate::schema::IcebergSchemaProvider; @@ -36,13 +37,18 @@ impl IcebergCatalogProvider { .flat_map(|ns| ns.as_ref().clone()) .collect(); + let futures: Vec<_> = schema_names + .iter() + .map(|name| { + IcebergSchemaProvider::try_new(client.clone(), NamespaceIdent::new(name.clone())) + }) + .collect(); + + let providers = try_join_all(futures).await?; + let mut schemas = Vec::new(); - for name in schema_names { - let provider = - IcebergSchemaProvider::try_new(client.clone(), &NamespaceIdent::new(name.clone())) - .await?; + for (name, provider) in schema_names.into_iter().zip(providers.into_iter()) { let provider = Arc::new(provider) as Arc; - schemas.push((name, provider)) } diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 41b1b2db..eacebba9 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -21,7 +21,7 @@ use dashmap::DashMap; use datafusion::{ catalog::schema::SchemaProvider, datasource::TableProvider, error::DataFusionError, }; -use futures::FutureExt; +use futures::{future::try_join_all, FutureExt}; use iceberg::{Catalog, NamespaceIdent, Result}; use crate::table::IcebergTableProvider; @@ -33,20 +33,25 @@ pub(crate) struct IcebergSchemaProvider { impl IcebergSchemaProvider { pub(crate) async fn try_new( client: Arc, - namespace: &NamespaceIdent, + namespace: NamespaceIdent, ) -> Result { let table_names: Vec = client - .list_tables(namespace) + .list_tables(&namespace) .await? .iter() .map(|t| t.name().to_owned()) .collect(); + let futures: Vec<_> = table_names + .iter() + .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .collect(); + + let providers = try_join_all(futures).await?; + let mut tables = Vec::new(); - for name in table_names { - let provider = IcebergTableProvider::try_new(client.clone(), namespace, &name).await?; + for (name, provider) in table_names.into_iter().zip(providers.into_iter()) { let provider = Arc::new(provider) as Arc; - tables.push((name, provider)); } diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index 38956909..d91fbaef 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -27,11 +27,11 @@ pub(crate) struct IcebergTableProvider { impl IcebergTableProvider { pub(crate) async fn try_new( client: Arc, - namespace: &NamespaceIdent, + namespace: NamespaceIdent, name: impl Into, ) -> Result { let name = name.into(); - let ident = TableIdent::new(namespace.to_owned(), name); + let ident = TableIdent::new(namespace, name); let table = client.load_table(&ident).await?; Ok(IcebergTableProvider { _inner: table }) From 475a9a30931bc9062d1f516a17b31c3950ed5fd7 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 17:40:05 +0200 Subject: [PATCH 10/36] chore: remove feature flag --- crates/integrations/datafusion/Cargo.toml | 5 +---- crates/integrations/datafusion/src/lib.rs | 2 -- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 23129cfb..34963c25 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -28,13 +28,10 @@ repository = { workspace = true } license = { workspace = true } keywords = ["iceberg", "integrations", "datfusion"] -[features] -default = [] -datafusion = ["dep:datafusion"] [dependencies] dashmap = { workspace = true } -datafusion = { version = "37.0.0", optional = true } +datafusion = { version = "37.0.0" } futures = { workspace = true } iceberg = { workspace = true } log = { workspace = true } diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 88592c9a..24cfb6bf 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -#![cfg(feature = "datafusion")] - mod catalog; pub use catalog::*; From 0f706cac9ea5d0d086b72337b07dbaa84e3d8560 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 17:41:33 +0200 Subject: [PATCH 11/36] chore: rename package --- crates/integrations/datafusion/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 34963c25..b8356801 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -16,17 +16,17 @@ # under the License. [package] -name = "iceberg-integrations-datafusion" +name = "iceberg-datafusion" version = { workspace = true } edition = { workspace = true } homepage = { workspace = true } rust-version = { workspace = true } categories = ["database"] -description = "Apache Iceberg Integrations Datafusion" +description = "Apache Iceberg Datafusion Integration" repository = { workspace = true } license = { workspace = true } -keywords = ["iceberg", "integrations", "datfusion"] +keywords = ["iceberg", "integrations", "datafusion"] [dependencies] From e05fc4554a6d38160e504db91555f333745ed8fa Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 17:43:17 +0200 Subject: [PATCH 12/36] chore: update readme --- crates/integrations/datafusion/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/README.md b/crates/integrations/datafusion/README.md index d9381959..23245550 100644 --- a/crates/integrations/datafusion/README.md +++ b/crates/integrations/datafusion/README.md @@ -17,6 +17,6 @@ ~ under the License. --> -# Apache Iceberg Integrations +# Apache Iceberg Datafusion Integration -This crate contains the official Native Rust implementation of Apache Iceberg Integrations. +This crate contains the integration of Apache Datafusion and Apache Iceberg. From c86843979038b2bcaf1bd0bbb80d203ffb315649 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 17:44:24 +0200 Subject: [PATCH 13/36] feat: add TableType --- crates/integrations/datafusion/src/table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index d91fbaef..78c138d1 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use datafusion::datasource::TableProvider; +use datafusion::datasource::{TableProvider, TableType}; use iceberg::{table::Table, Catalog, NamespaceIdent, Result, TableIdent}; pub(crate) struct IcebergTableProvider { @@ -48,7 +48,7 @@ impl TableProvider for IcebergTableProvider { } fn table_type(&self) -> datafusion::datasource::TableType { - todo!() + TableType::Base } fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>( From 26b257eccd872be652ac07c8c0a8afc7e7c0b8c3 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 17:50:58 +0200 Subject: [PATCH 14/36] fix: import + async_trait --- crates/integrations/datafusion/Cargo.toml | 1 + crates/integrations/datafusion/src/table.rs | 46 +++++++++------------ 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index b8356801..61f1913a 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -30,6 +30,7 @@ keywords = ["iceberg", "integrations", "datafusion"] [dependencies] +async-trait = { workspace = true } dashmap = { workspace = true } datafusion = { version = "37.0.0" } futures = { workspace = true } diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index 78c138d1..c94e6a92 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -15,9 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::{any::Any, sync::Arc}; -use datafusion::datasource::{TableProvider, TableType}; +use async_trait::async_trait; +use datafusion::{ + arrow::datatypes::SchemaRef, + datasource::{TableProvider, TableType}, + execution::context, + logical_expr::Expr, + physical_plan::ExecutionPlan, +}; use iceberg::{table::Table, Catalog, NamespaceIdent, Result, TableIdent}; pub(crate) struct IcebergTableProvider { @@ -38,42 +45,27 @@ impl IcebergTableProvider { } } +#[async_trait] impl TableProvider for IcebergTableProvider { - fn as_any(&self) -> &dyn std::any::Any { + fn as_any(&self) -> &dyn Any { self } - fn schema(&self) -> datafusion::arrow::datatypes::SchemaRef { + fn schema(&self) -> SchemaRef { todo!() } - fn table_type(&self) -> datafusion::datasource::TableType { + fn table_type(&self) -> TableType { TableType::Base } - fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>( - &'life0 self, - _state: &'life1 datafusion::execution::context::SessionState, - _projection: Option<&'life2 Vec>, - _filters: &'life3 [datafusion::prelude::Expr], + async fn scan( + &self, + _state: &context::SessionState, + _projection: Option<&Vec>, + _filters: &[Expr], _limit: Option, - ) -> core::pin::Pin< - Box< - dyn core::future::Future< - Output = datafusion::error::Result< - Arc, - >, - > + core::marker::Send - + 'async_trait, - >, - > - where - 'life0: 'async_trait, - 'life1: 'async_trait, - 'life2: 'async_trait, - 'life3: 'async_trait, - Self: 'async_trait, - { + ) -> datafusion::error::Result> { todo!() } } From 60ff7f28e838f3bb683938b14529b4901945be31 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 17:58:06 +0200 Subject: [PATCH 15/36] fix: imports + async_trait --- crates/integrations/datafusion/src/schema.rs | 35 ++++---------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index eacebba9..8825b9ef 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -17,11 +17,10 @@ use std::{any::Any, sync::Arc}; +use async_trait::async_trait; use dashmap::DashMap; -use datafusion::{ - catalog::schema::SchemaProvider, datasource::TableProvider, error::DataFusionError, -}; -use futures::{future::try_join_all, FutureExt}; +use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; +use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; use crate::table::IcebergTableProvider; @@ -61,6 +60,7 @@ impl IcebergSchemaProvider { } } +#[async_trait] impl SchemaProvider for IcebergSchemaProvider { fn as_any(&self) -> &dyn Any { self @@ -74,29 +74,8 @@ impl SchemaProvider for IcebergSchemaProvider { self.tables.get(name).is_some() } - fn table<'life0, 'life1, 'async_trait>( - &'life0 self, - name: &'life1 str, - ) -> core::pin::Pin< - Box< - dyn core::future::Future< - Output = datafusion::error::Result< - Option>, - DataFusionError, - >, - > + core::marker::Send - + 'async_trait, - >, - > - where - 'life0: 'async_trait, - 'life1: 'async_trait, - Self: 'async_trait, - { - async move { - let table = self.tables.get(name).map(|c| c.value().clone()); - Ok(table) - } - .boxed() + async fn table(&self, name: &str) -> datafusion::error::Result>> { + let table = self.tables.get(name).map(|c| c.value().clone()); + Ok(table) } } From 7ef31fd4da693da91064137025be626b93cedfae Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 18:07:08 +0200 Subject: [PATCH 16/36] chore: remove feature flag --- .../datafusion/tests/integration_datafusion_hms_test.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs index 6c983088..26ae50cc 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs @@ -17,8 +17,6 @@ //! Integration tests for Datafusion with Hive Metastore. -#![cfg(feature = "datafusion")] - use std::collections::HashMap; use std::sync::Arc; @@ -27,7 +25,7 @@ use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; -use iceberg_integrations_datafusion::IcebergCatalogProvider; +use iceberg_datafusion::IcebergCatalogProvider; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; use port_scanner::scan_port_addr; From 2f152fefa8d299fc101325c7bbe5cf77e78c121f Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 18:08:36 +0200 Subject: [PATCH 17/36] fix: cargo sort --- crates/integrations/datafusion/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 61f1913a..122c5f3d 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -28,7 +28,6 @@ repository = { workspace = true } license = { workspace = true } keywords = ["iceberg", "integrations", "datafusion"] - [dependencies] async-trait = { workspace = true } dashmap = { workspace = true } From 8135bc73989e211c0d2cde3337086b66387e593e Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 18:28:19 +0200 Subject: [PATCH 18/36] refactor: CatalogProvider `fn try_new` --- crates/integrations/datafusion/src/catalog.rs | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 96922ac1..aaeac06b 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -30,28 +30,35 @@ pub struct IcebergCatalogProvider { impl IcebergCatalogProvider { pub async fn try_new(client: Arc) -> Result { - let schema_names: Vec = client + let schema_names: Vec<_> = client .list_namespaces(None) .await? .iter() .flat_map(|ns| ns.as_ref().clone()) .collect(); - let futures: Vec<_> = schema_names - .iter() - .map(|name| { - IcebergSchemaProvider::try_new(client.clone(), NamespaceIdent::new(name.clone())) + let providers = try_join_all( + schema_names + .iter() + .map(|name| { + IcebergSchemaProvider::try_new( + client.clone(), + NamespaceIdent::new(name.clone()), + ) + }) + .collect::>(), + ) + .await?; + + let schemas: Vec<_> = schema_names + .into_iter() + .zip(providers.into_iter()) + .map(|(name, provider)| { + let provider = Arc::new(provider) as Arc; + (name, provider) }) .collect(); - let providers = try_join_all(futures).await?; - - let mut schemas = Vec::new(); - for (name, provider) in schema_names.into_iter().zip(providers.into_iter()) { - let provider = Arc::new(provider) as Arc; - schemas.push((name, provider)) - } - Ok(IcebergCatalogProvider { schemas: schemas.into_iter().collect(), }) From 6cd85ccb46946c10fce25762e4739c1ee97c0402 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 18:38:02 +0200 Subject: [PATCH 19/36] refactor: SchemaProvider `fn try_new` --- crates/integrations/datafusion/src/schema.rs | 30 +++++++++++--------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 8825b9ef..a42d80f4 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -34,25 +34,29 @@ impl IcebergSchemaProvider { client: Arc, namespace: NamespaceIdent, ) -> Result { - let table_names: Vec = client + let table_names: Vec<_> = client .list_tables(&namespace) .await? .iter() - .map(|t| t.name().to_owned()) + .map(|tbl| tbl.name().to_string()) .collect(); - let futures: Vec<_> = table_names - .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) - .collect(); + let providers = try_join_all( + table_names + .iter() + .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .collect::>(), + ) + .await?; - let providers = try_join_all(futures).await?; - - let mut tables = Vec::new(); - for (name, provider) in table_names.into_iter().zip(providers.into_iter()) { - let provider = Arc::new(provider) as Arc; - tables.push((name, provider)); - } + let tables: Vec<_> = table_names + .into_iter() + .zip(providers.into_iter()) + .map(|(name, provider)| { + let provider = Arc::new(provider) as Arc; + (name, provider) + }) + .collect(); Ok(IcebergSchemaProvider { tables: tables.into_iter().collect(), From d64678522395dcdc396fee3e5f345e8599153981 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 18:48:43 +0200 Subject: [PATCH 20/36] chore: update docs --- crates/integrations/datafusion/src/catalog.rs | 15 +++++++++++++++ crates/integrations/datafusion/src/table.rs | 3 +-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index aaeac06b..f21aff13 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -24,11 +24,26 @@ use iceberg::{Catalog, NamespaceIdent, Result}; use crate::schema::IcebergSchemaProvider; +/// Provides an interface to manage and access multiple schemas +/// within an Iceberg [`Catalog`]. +/// +/// Acts as a centralized catalog provider that aggregates +/// multiple [`SchemaProvider`], each associated with distinct namespaces. pub struct IcebergCatalogProvider { + /// A concurrent `HashMap` where keys are namespace names + /// and values are dynamic references to objects implementing the + /// [`SchemaProvider`] trait schemas: DashMap>, } impl IcebergCatalogProvider { + /// Asynchronously constructs a new [`IcebergCatalogProvider`] + /// using the given client to fetch and initialize schema providers for + /// each namespace in the Iceberg catalog. + /// + /// This method retrieves the list of namespace names from the Iceberg + /// catalog, attempts to create a schema provider for each namespace, and + /// collects these providers into a concurrent `HashMap`. pub async fn try_new(client: Arc) -> Result { let schema_names: Vec<_> = client .list_namespaces(None) diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index c94e6a92..c76ac9b2 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -37,8 +37,7 @@ impl IcebergTableProvider { namespace: NamespaceIdent, name: impl Into, ) -> Result { - let name = name.into(); - let ident = TableIdent::new(namespace, name); + let ident = TableIdent::new(namespace, name.into()); let table = client.load_table(&ident).await?; Ok(IcebergTableProvider { _inner: table }) From b466936ac83a34df65715dfaff5d8a156a31877b Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 19:02:13 +0200 Subject: [PATCH 21/36] chore: update docs --- crates/integrations/datafusion/src/catalog.rs | 10 +++++----- crates/integrations/datafusion/src/schema.rs | 12 ++++++++++++ crates/integrations/datafusion/src/table.rs | 6 ++++++ 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index f21aff13..4d3cf161 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -32,17 +32,17 @@ use crate::schema::IcebergSchemaProvider; pub struct IcebergCatalogProvider { /// A concurrent `HashMap` where keys are namespace names /// and values are dynamic references to objects implementing the - /// [`SchemaProvider`] trait + /// [`SchemaProvider`] trait. schemas: DashMap>, } impl IcebergCatalogProvider { - /// Asynchronously constructs a new [`IcebergCatalogProvider`] + /// Asynchronously tries to construct a new [`IcebergCatalogProvider`] /// using the given client to fetch and initialize schema providers for - /// each namespace in the Iceberg catalog. + /// each namespace in the Iceberg [`Catalog`]. /// - /// This method retrieves the list of namespace names from the Iceberg - /// catalog, attempts to create a schema provider for each namespace, and + /// This method retrieves the list of namespace names + /// attempts to create a schema provider for each namespace, and /// collects these providers into a concurrent `HashMap`. pub async fn try_new(client: Arc) -> Result { let schema_names: Vec<_> = client diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index a42d80f4..846cec44 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -25,11 +25,23 @@ use iceberg::{Catalog, NamespaceIdent, Result}; use crate::table::IcebergTableProvider; +/// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing +/// access to table providers within a specific namespace. pub(crate) struct IcebergSchemaProvider { + /// A concurrent `HashMap` where keys are table names + /// and values are dynamic references to objects implementing the + /// [`TableProvider`] trait. tables: DashMap>, } impl IcebergSchemaProvider { + /// Asynchronously tries to construct a new [`IcebergSchemaProvider`] + /// using the given client to fetch and initialize table providers for + /// the provided namespace in the Iceberg [`Catalog`]. + /// + /// This method retrieves a list of table names + /// attempts to create a table provider for each table name, and + /// collects these providers into a concurrent `HashMap`. pub(crate) async fn try_new( client: Arc, namespace: NamespaceIdent, diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index c76ac9b2..eccf2620 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -27,11 +27,17 @@ use datafusion::{ }; use iceberg::{table::Table, Catalog, NamespaceIdent, Result, TableIdent}; +/// Represents a [`TableProvider`] for the Iceberg [`Catalog`], +/// managing access to a [`Table`]. pub(crate) struct IcebergTableProvider { + /// A table in the catalog. _inner: Table, } impl IcebergTableProvider { + /// Asynchronously tries to construct a new [`IcebergTableProvider`] + /// using the given client and table name to fetch an actual [`Table`] + /// in the provided namespace. pub(crate) async fn try_new( client: Arc, namespace: NamespaceIdent, From 3c9bafce6a91549aaf753cd43464846621cbdaf2 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 19:19:29 +0200 Subject: [PATCH 22/36] chore: update doc --- .../datafusion/tests/integration_datafusion_hms_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs index 26ae50cc..202b429a 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Integration tests for Datafusion with Hive Metastore. +//! Integration tests for Iceberg Datafusion with Hive Metastore. use std::collections::HashMap; use std::sync::Arc; From d24a0d3681ce0321a248106910c3b952c640c61a Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 19:38:25 +0200 Subject: [PATCH 23/36] feat: impl `fn schema` on TableProvider --- crates/integrations/datafusion/src/table.rs | 15 ++++++-- .../tests/integration_datafusion_hms_test.rs | 36 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index eccf2620..46f423e3 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -25,13 +25,17 @@ use datafusion::{ logical_expr::Expr, physical_plan::ExecutionPlan, }; -use iceberg::{table::Table, Catalog, NamespaceIdent, Result, TableIdent}; +use iceberg::{ + arrow::schema_to_arrow_schema, table::Table, Catalog, NamespaceIdent, Result, TableIdent, +}; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. pub(crate) struct IcebergTableProvider { /// A table in the catalog. _inner: Table, + /// A reference-counted arrow `Schema`. + schema: SchemaRef, } impl IcebergTableProvider { @@ -46,7 +50,12 @@ impl IcebergTableProvider { let ident = TableIdent::new(namespace, name.into()); let table = client.load_table(&ident).await?; - Ok(IcebergTableProvider { _inner: table }) + let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); + + Ok(IcebergTableProvider { + _inner: table, + schema, + }) } } @@ -57,7 +66,7 @@ impl TableProvider for IcebergTableProvider { } fn schema(&self) -> SchemaRef { - todo!() + self.schema.clone() } fn table_type(&self) -> TableType { diff --git a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs index 202b429a..20c5cc87 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::Arc; +use datafusion::arrow::datatypes::DataType; use datafusion::execution::context::SessionContext; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; @@ -106,6 +107,41 @@ fn set_table_creation(location: impl ToString, name: impl ToString) -> Result Result<()> { + let fixture = set_test_fixture("test_provider_get_table_schema").await; + + let namespace = NamespaceIdent::new("default".to_string()); + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + + fixture + .hms_catalog + .create_table(&namespace, creation) + .await?; + + let client = Arc::new(fixture.hms_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("hive", catalog); + + let provider = ctx.catalog("hive").unwrap(); + let schema = provider.schema("default").unwrap(); + + let table = schema.table("my_table").await.unwrap().unwrap(); + let table_schema = table.schema(); + + let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)]; + + for (field, exp) in table_schema.fields().iter().zip(expected.iter()) { + assert_eq!(field.name(), exp.0); + assert_eq!(field.data_type(), exp.1); + assert!(!field.is_nullable()) + } + + Ok(()) +} + #[tokio::test] async fn test_provider_list_table_names() -> Result<()> { let fixture = set_test_fixture("test_provider_list_table_names").await; From 948fc56478d3521d563ef1dd713570d493476209 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 20:35:13 +0200 Subject: [PATCH 24/36] chore: rename ArrowSchema --- crates/integrations/datafusion/src/table.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index 46f423e3..af5e0300 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -19,7 +19,7 @@ use std::{any::Any, sync::Arc}; use async_trait::async_trait; use datafusion::{ - arrow::datatypes::SchemaRef, + arrow::datatypes::SchemaRef as ArrowSchemaRef, datasource::{TableProvider, TableType}, execution::context, logical_expr::Expr, @@ -35,7 +35,7 @@ pub(crate) struct IcebergTableProvider { /// A table in the catalog. _inner: Table, /// A reference-counted arrow `Schema`. - schema: SchemaRef, + schema: ArrowSchemaRef, } impl IcebergTableProvider { @@ -65,7 +65,7 @@ impl TableProvider for IcebergTableProvider { self } - fn schema(&self) -> SchemaRef { + fn schema(&self) -> ArrowSchemaRef { self.schema.clone() } From 5b9d9c725f089b9f40ea6deddaf7e7653e1a0b46 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Thu, 25 Apr 2024 20:35:21 +0200 Subject: [PATCH 25/36] refactor: remove DashMap --- Cargo.toml | 1 - crates/integrations/datafusion/Cargo.toml | 1 - crates/integrations/datafusion/src/catalog.rs | 9 ++++----- crates/integrations/datafusion/src/schema.rs | 10 ++++------ 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 89115b8c..d1894c14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,6 @@ bimap = "0.6" bitvec = "1.0.1" bytes = "1.5" chrono = "0.4.34" -dashmap = "5.5.3" derive_builder = "0.20.0" either = "1" env_logger = "0.11.0" diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 122c5f3d..246c303e 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -30,7 +30,6 @@ keywords = ["iceberg", "integrations", "datafusion"] [dependencies] async-trait = { workspace = true } -dashmap = { workspace = true } datafusion = { version = "37.0.0" } futures = { workspace = true } iceberg = { workspace = true } diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 4d3cf161..a321d819 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, sync::Arc}; +use std::{any::Any, collections::HashMap, sync::Arc}; -use dashmap::DashMap; use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; @@ -33,7 +32,7 @@ pub struct IcebergCatalogProvider { /// A concurrent `HashMap` where keys are namespace names /// and values are dynamic references to objects implementing the /// [`SchemaProvider`] trait. - schemas: DashMap>, + schemas: HashMap>, } impl IcebergCatalogProvider { @@ -86,10 +85,10 @@ impl CatalogProvider for IcebergCatalogProvider { } fn schema_names(&self) -> Vec { - self.schemas.iter().map(|c| c.key().clone()).collect() + self.schemas.keys().cloned().collect() } fn schema(&self, name: &str) -> Option> { - self.schemas.get(name).map(|c| c.value().clone()) + self.schemas.get(name).cloned() } } diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 846cec44..b66fc490 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, sync::Arc}; +use std::{any::Any, collections::HashMap, sync::Arc}; use async_trait::async_trait; -use dashmap::DashMap; use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; @@ -31,7 +30,7 @@ pub(crate) struct IcebergSchemaProvider { /// A concurrent `HashMap` where keys are table names /// and values are dynamic references to objects implementing the /// [`TableProvider`] trait. - tables: DashMap>, + tables: HashMap>, } impl IcebergSchemaProvider { @@ -83,7 +82,7 @@ impl SchemaProvider for IcebergSchemaProvider { } fn table_names(&self) -> Vec { - self.tables.iter().map(|c| c.key().clone()).collect() + self.tables.keys().cloned().collect() } fn table_exist(&self, name: &str) -> bool { @@ -91,7 +90,6 @@ impl SchemaProvider for IcebergSchemaProvider { } async fn table(&self, name: &str) -> datafusion::error::Result>> { - let table = self.tables.get(name).map(|c| c.value().clone()); - Ok(table) + Ok(self.tables.get(name).cloned()) } } From 0d55fbcb25e58abfa9934ab4148bf6378b6e1096 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 28 Apr 2024 12:11:25 +0200 Subject: [PATCH 26/36] feat: add basic IcebergTableScan --- crates/iceberg/src/table.rs | 2 +- crates/integrations/datafusion/Cargo.toml | 1 + crates/integrations/datafusion/src/error.rs | 34 ++++++ crates/integrations/datafusion/src/lib.rs | 4 + .../datafusion/src/physical_plan/mod.rs | 18 +++ .../datafusion/src/physical_plan/scan.rs | 106 ++++++++++++++++++ crates/integrations/datafusion/src/table.rs | 14 ++- 7 files changed, 172 insertions(+), 7 deletions(-) create mode 100644 crates/integrations/datafusion/src/error.rs create mode 100644 crates/integrations/datafusion/src/physical_plan/mod.rs create mode 100644 crates/integrations/datafusion/src/physical_plan/scan.rs diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 839969d8..f38d7713 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -25,7 +25,7 @@ use futures::AsyncReadExt; use typed_builder::TypedBuilder; /// Table represents a table in the catalog. -#[derive(TypedBuilder, Debug)] +#[derive(TypedBuilder, Debug, Clone)] pub struct Table { file_io: FileIO, #[builder(default, setter(strip_option, into))] diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 246c303e..9f895ab3 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -29,6 +29,7 @@ license = { workspace = true } keywords = ["iceberg", "integrations", "datafusion"] [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } datafusion = { version = "37.0.0" } futures = { workspace = true } diff --git a/crates/integrations/datafusion/src/error.rs b/crates/integrations/datafusion/src/error.rs new file mode 100644 index 00000000..37402437 --- /dev/null +++ b/crates/integrations/datafusion/src/error.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::anyhow; +use iceberg::{Error, ErrorKind}; + +/// Convert a datafusion error into iceberg error. +pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error { + match error { + other => Error::new( + ErrorKind::Unexpected, + "Operation failed for hitting datafusion error".to_string(), + ) + .with_source(anyhow!("datafusion error: {:?}", other)), + } +} +/// Convert an iceberg error into datafusion error. +pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError { + datafusion::error::DataFusionError::External(error.into()) +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 24cfb6bf..c4029011 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -18,5 +18,9 @@ mod catalog; pub use catalog::*; +mod error; +pub use error::*; + +mod physical_plan; mod schema; mod table; diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs new file mode 100644 index 00000000..5ae586a0 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod scan; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs new file mode 100644 index 00000000..ae4235f1 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -0,0 +1,106 @@ +use std::{any::Any, pin::Pin, sync::Arc}; + +use datafusion::{ + arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef}, + execution::{SendableRecordBatchStream, TaskContext}, + physical_expr::EquivalenceProperties, + physical_plan::{ + stream::RecordBatchStreamAdapter, DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, + PlanProperties, + }, +}; +use futures::{Stream, TryStreamExt}; +use iceberg::table::Table; + +use crate::to_datafusion_error; + +#[derive(Debug)] +pub(crate) struct IcebergTableScan { + table: Table, + schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +impl IcebergTableScan { + pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { + let plan_properties = Self::compute_properties(schema.clone()); + + Self { + table, + schema, + plan_properties, + } + } + + fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { + // TODO: + // This is more or less a placeholder, to be replaced + // once we support output-partitioning + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ) + } +} + +impl ExecutionPlan for IcebergTableScan { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion::error::Result> { + Ok(self) + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion::error::Result { + let fut = get_batch_stream(self.table.clone()); + let stream = futures::stream::once(fut).try_flatten(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + stream, + ))) + } +} + +impl DisplayAs for IcebergTableScan { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "IcebergTableScan") + } +} + +async fn get_batch_stream( + table: Table, +) -> datafusion::error::Result< + Pin> + Send>>, +> { + let table_scan = table.scan().build().map_err(to_datafusion_error)?; + + let stream = table_scan + .to_arrow() + .await + .map_err(to_datafusion_error)? + .map_err(to_datafusion_error); + + Ok(Box::pin(stream)) +} diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index af5e0300..75e068d0 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -29,11 +29,13 @@ use iceberg::{ arrow::schema_to_arrow_schema, table::Table, Catalog, NamespaceIdent, Result, TableIdent, }; +use crate::physical_plan::scan::IcebergTableScan; + /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. pub(crate) struct IcebergTableProvider { /// A table in the catalog. - _inner: Table, + table: Table, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, } @@ -52,10 +54,7 @@ impl IcebergTableProvider { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - Ok(IcebergTableProvider { - _inner: table, - schema, - }) + Ok(IcebergTableProvider { table, schema }) } } @@ -80,6 +79,9 @@ impl TableProvider for IcebergTableProvider { _filters: &[Expr], _limit: Option, ) -> datafusion::error::Result> { - todo!() + Ok(Arc::new(IcebergTableScan::new( + self.table.clone(), + self.schema.clone(), + ))) } } From 294e5759cffbc08788c9e2afc36577bf6fe7d4ad Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 28 Apr 2024 12:30:33 +0200 Subject: [PATCH 27/36] chore: fix docs --- crates/integrations/datafusion/src/catalog.rs | 4 ++-- crates/integrations/datafusion/src/error.rs | 4 ++-- crates/integrations/datafusion/src/schema.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index a321d819..4888ab88 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -29,7 +29,7 @@ use crate::schema::IcebergSchemaProvider; /// Acts as a centralized catalog provider that aggregates /// multiple [`SchemaProvider`], each associated with distinct namespaces. pub struct IcebergCatalogProvider { - /// A concurrent `HashMap` where keys are namespace names + /// A `HashMap` where keys are namespace names /// and values are dynamic references to objects implementing the /// [`SchemaProvider`] trait. schemas: HashMap>, @@ -42,7 +42,7 @@ impl IcebergCatalogProvider { /// /// This method retrieves the list of namespace names /// attempts to create a schema provider for each namespace, and - /// collects these providers into a concurrent `HashMap`. + /// collects these providers into a `HashMap`. pub async fn try_new(client: Arc) -> Result { let schema_names: Vec<_> = client .list_namespaces(None) diff --git a/crates/integrations/datafusion/src/error.rs b/crates/integrations/datafusion/src/error.rs index 37402437..c706bff1 100644 --- a/crates/integrations/datafusion/src/error.rs +++ b/crates/integrations/datafusion/src/error.rs @@ -18,7 +18,7 @@ use anyhow::anyhow; use iceberg::{Error, ErrorKind}; -/// Convert a datafusion error into iceberg error. +/// Converts a datafusion error into an iceberg error. pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error { match error { other => Error::new( @@ -28,7 +28,7 @@ pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error .with_source(anyhow!("datafusion error: {:?}", other)), } } -/// Convert an iceberg error into datafusion error. +/// Converts an iceberg error into a datafusion error. pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError { datafusion::error::DataFusionError::External(error.into()) } diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index b66fc490..b6a587ef 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -27,7 +27,7 @@ use crate::table::IcebergTableProvider; /// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing /// access to table providers within a specific namespace. pub(crate) struct IcebergSchemaProvider { - /// A concurrent `HashMap` where keys are table names + /// A `HashMap` where keys are table names /// and values are dynamic references to objects implementing the /// [`TableProvider`] trait. tables: HashMap>, @@ -40,7 +40,7 @@ impl IcebergSchemaProvider { /// /// This method retrieves a list of table names /// attempts to create a table provider for each table name, and - /// collects these providers into a concurrent `HashMap`. + /// collects these providers into a `HashMap`. pub(crate) async fn try_new( client: Arc, namespace: NamespaceIdent, From 13cc2d8d17ab8d458de4824cdabc3d5fa053fa89 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 28 Apr 2024 12:38:18 +0200 Subject: [PATCH 28/36] chore: add comments --- crates/integrations/datafusion/src/catalog.rs | 3 +++ crates/integrations/datafusion/src/schema.rs | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 4888ab88..467479cd 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -44,6 +44,9 @@ impl IcebergCatalogProvider { /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. pub async fn try_new(client: Arc) -> Result { + // TODO: + // Schemas and providers should be cached and evicted based on time + // As of right now; schemas might become stale. let schema_names: Vec<_> = client .list_namespaces(None) .await? diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index b6a587ef..75df27cc 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -45,6 +45,10 @@ impl IcebergSchemaProvider { client: Arc, namespace: NamespaceIdent, ) -> Result { + // TODO: + // Tables and providers should be cached based on table_name + // if we have a chache miss; we update our internal cache & check again + // As of right now; tables might become stale. let table_names: Vec<_> = client .list_tables(&namespace) .await? From c95b1dd549e260231abcc0eb432d7b9c070ddca3 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 28 Apr 2024 12:40:34 +0200 Subject: [PATCH 29/36] fix: clippy --- crates/integrations/datafusion/src/error.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/integrations/datafusion/src/error.rs b/crates/integrations/datafusion/src/error.rs index c706bff1..273d92fa 100644 --- a/crates/integrations/datafusion/src/error.rs +++ b/crates/integrations/datafusion/src/error.rs @@ -20,13 +20,11 @@ use iceberg::{Error, ErrorKind}; /// Converts a datafusion error into an iceberg error. pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error { - match error { - other => Error::new( - ErrorKind::Unexpected, - "Operation failed for hitting datafusion error".to_string(), - ) - .with_source(anyhow!("datafusion error: {:?}", other)), - } + Error::new( + ErrorKind::Unexpected, + "Operation failed for hitting datafusion error".to_string(), + ) + .with_source(anyhow!("datafusion error: {:?}", error)) } /// Converts an iceberg error into a datafusion error. pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError { From 32f33cb0c01992214727ceb162e0a6c682b4eda1 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 28 Apr 2024 12:41:33 +0200 Subject: [PATCH 30/36] fix: typo --- crates/integrations/datafusion/src/schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 75df27cc..a7fe8860 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -47,7 +47,7 @@ impl IcebergSchemaProvider { ) -> Result { // TODO: // Tables and providers should be cached based on table_name - // if we have a chache miss; we update our internal cache & check again + // if we have a cache miss; we update our internal cache & check again // As of right now; tables might become stale. let table_names: Vec<_> = client .list_tables(&namespace) From 30830ec17e0d007fae824190ea97ccc37c85b9b8 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 28 Apr 2024 12:43:39 +0200 Subject: [PATCH 31/36] fix: license --- .../datafusion/src/physical_plan/scan.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index ae4235f1..2371627c 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{any::Any, pin::Pin, sync::Arc}; use datafusion::{ From 391f9831bc8ca5b858673b678ade9a0e32e916d7 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 15:58:49 +0200 Subject: [PATCH 32/36] chore: update docs --- .../datafusion/src/physical_plan/scan.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 2371627c..03a5758c 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -32,13 +32,20 @@ use iceberg::table::Table; use crate::to_datafusion_error; #[derive(Debug)] +/// Manages the scanning process of an Iceberg [`Table`], encapsulating the +/// necessary details and computed properties required for execution planning. pub(crate) struct IcebergTableScan { + /// A table in the catalog. table: Table, + /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// Stores certain, often expensive to compute, + /// plan properties used in query optimization. plan_properties: PlanProperties, } impl IcebergTableScan { + /// Creates a new [`IcebergTableScan`] object. pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { let plan_properties = Self::compute_properties(schema.clone()); @@ -49,6 +56,7 @@ impl IcebergTableScan { } } + /// Computes [`PlanProperties`] used in query optimization. fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { // TODO: // This is more or less a placeholder, to be replaced @@ -106,6 +114,11 @@ impl DisplayAs for IcebergTableScan { } } +/// Asynchronously retrieves a stream of [`RecordBatch`] instances +/// from a given table. +/// +/// This function initializes a [`TableScan`], builds it, +/// and then converts it into a stream of Arrow [`RecordBatch`]es. async fn get_batch_stream( table: Table, ) -> datafusion::error::Result< From d94d615e0c5f5ccb04aef666735c392743109857 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 15:03:19 +0200 Subject: [PATCH 33/36] chore: move derive stmt --- crates/integrations/datafusion/src/physical_plan/scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 03a5758c..1923ec32 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -31,9 +31,9 @@ use iceberg::table::Table; use crate::to_datafusion_error; -#[derive(Debug)] /// Manages the scanning process of an Iceberg [`Table`], encapsulating the /// necessary details and computed properties required for execution planning. +#[derive(Debug)] pub(crate) struct IcebergTableScan { /// A table in the catalog. table: Table, From 996f2495d01e74bad9b9d905dbf6c49d0efa418a Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 15:10:39 +0200 Subject: [PATCH 34/36] fix: collect into hashmap --- crates/integrations/datafusion/src/catalog.rs | 6 ++---- crates/integrations/datafusion/src/schema.rs | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 467479cd..deddde9f 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -67,7 +67,7 @@ impl IcebergCatalogProvider { ) .await?; - let schemas: Vec<_> = schema_names + let schemas: HashMap> = schema_names .into_iter() .zip(providers.into_iter()) .map(|(name, provider)| { @@ -76,9 +76,7 @@ impl IcebergCatalogProvider { }) .collect(); - Ok(IcebergCatalogProvider { - schemas: schemas.into_iter().collect(), - }) + Ok(IcebergCatalogProvider { schemas }) } } diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index a7fe8860..cd510a18 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -64,7 +64,7 @@ impl IcebergSchemaProvider { ) .await?; - let tables: Vec<_> = table_names + let tables: HashMap> = table_names .into_iter() .zip(providers.into_iter()) .map(|(name, provider)| { @@ -73,9 +73,7 @@ impl IcebergSchemaProvider { }) .collect(); - Ok(IcebergSchemaProvider { - tables: tables.into_iter().collect(), - }) + Ok(IcebergSchemaProvider { tables }) } } From 199382dbc1039e10c11d4f439e596dacfd1d54af Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 15:14:11 +0200 Subject: [PATCH 35/36] chore: use DFResult --- .../integrations/datafusion/src/physical_plan/scan.rs | 10 +++++----- crates/integrations/datafusion/src/schema.rs | 3 ++- crates/integrations/datafusion/src/table.rs | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 1923ec32..cc01148f 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -17,6 +17,8 @@ use std::{any::Any, pin::Pin, sync::Arc}; +use datafusion::error::Result as DFResult; + use datafusion::{ arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef}, execution::{SendableRecordBatchStream, TaskContext}, @@ -81,7 +83,7 @@ impl ExecutionPlan for IcebergTableScan { fn with_new_children( self: Arc, _children: Vec>, - ) -> datafusion::error::Result> { + ) -> DFResult> { Ok(self) } @@ -93,7 +95,7 @@ impl ExecutionPlan for IcebergTableScan { &self, _partition: usize, _context: Arc, - ) -> datafusion::error::Result { + ) -> DFResult { let fut = get_batch_stream(self.table.clone()); let stream = futures::stream::once(fut).try_flatten(); @@ -121,9 +123,7 @@ impl DisplayAs for IcebergTableScan { /// and then converts it into a stream of Arrow [`RecordBatch`]es. async fn get_batch_stream( table: Table, -) -> datafusion::error::Result< - Pin> + Send>>, -> { +) -> DFResult> + Send>>> { let table_scan = table.scan().build().map_err(to_datafusion_error)?; let stream = table_scan diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index cd510a18..2ba69621 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -18,6 +18,7 @@ use std::{any::Any, collections::HashMap, sync::Arc}; use async_trait::async_trait; +use datafusion::error::Result as DFResult; use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; @@ -91,7 +92,7 @@ impl SchemaProvider for IcebergSchemaProvider { self.tables.get(name).is_some() } - async fn table(&self, name: &str) -> datafusion::error::Result>> { + async fn table(&self, name: &str) -> DFResult>> { Ok(self.tables.get(name).cloned()) } } diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index 75e068d0..46a15f67 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -18,6 +18,7 @@ use std::{any::Any, sync::Arc}; use async_trait::async_trait; +use datafusion::error::Result as DFResult; use datafusion::{ arrow::datatypes::SchemaRef as ArrowSchemaRef, datasource::{TableProvider, TableType}, @@ -78,7 +79,7 @@ impl TableProvider for IcebergTableProvider { _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, - ) -> datafusion::error::Result> { + ) -> DFResult> { Ok(Arc::new(IcebergTableScan::new( self.table.clone(), self.schema.clone(), From 177e5c8aa114f0b1815c21ee4365f4af14a30523 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 2 May 2024 11:05:39 +0800 Subject: [PATCH 36/36] Update crates/integrations/datafusion/README.md Co-authored-by: Liang-Chi Hsieh --- crates/integrations/datafusion/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/README.md b/crates/integrations/datafusion/README.md index 23245550..134a8eff 100644 --- a/crates/integrations/datafusion/README.md +++ b/crates/integrations/datafusion/README.md @@ -17,6 +17,6 @@ ~ under the License. --> -# Apache Iceberg Datafusion Integration +# Apache Iceberg DataFusion Integration -This crate contains the integration of Apache Datafusion and Apache Iceberg. +This crate contains the integration of Apache DataFusion and Apache Iceberg.