From bbd042da5835b3e2e8ce7c05d45377c367464efd Mon Sep 17 00:00:00 2001
From: Marvin Lanhenke <62298609+marvinlanhenke@users.noreply.github.com>
Date: Thu, 2 May 2024 05:33:02 +0200
Subject: [PATCH] Basic Integration with Datafusion (#324)

* chore: basic structure
* feat: add IcebergCatalogProvider
* feat: add IcebergSchemaProvider
* feat: add IcebergTableProvider
* chore: add integration test infr
* fix: remove old test
* chore: update crate structure
* fix: remove workspace dep
* refactor: use try_join_all
* chore: remove feature flag
* chore: rename package
* chore: update readme
* feat: add TableType
* fix: import + async_trait
* fix: imports + async_trait
* chore: remove feature flag
* fix: cargo sort
* refactor: CatalogProvider `fn try_new`
* refactor: SchemaProvider `fn try_new`
* chore: update docs
* chore: update docs
* chore: update doc
* feat: impl `fn schema` on TableProvider
* chore: rename ArrowSchema
* refactor: remove DashMap
* feat: add basic IcebergTableScan
* chore: fix docs
* chore: add comments
* fix: clippy
* fix: typo
* fix: license
* chore: update docs
* chore: move derive stmt
* fix: collect into hashmap
* chore: use DFResult
* Update crates/integrations/datafusion/README.md

Co-authored-by: Liang-Chi Hsieh

---------

Co-authored-by: Renjie Liu
Co-authored-by: Liang-Chi Hsieh
---
 Cargo.toml                                    |   2 +
 crates/iceberg/src/table.rs                   |   2 +-
 crates/integrations/datafusion/Cargo.toml     |  43 ++++
 crates/integrations/datafusion/README.md      |  22 ++
 crates/integrations/datafusion/src/catalog.rs |  95 +++++++++
 crates/integrations/datafusion/src/error.rs   |  32 +++
 crates/integrations/datafusion/src/lib.rs     |  26 +++
 .../datafusion/src/physical_plan/mod.rs       |  18 ++
 .../datafusion/src/physical_plan/scan.rs      | 136 ++++++++++++
 crates/integrations/datafusion/src/schema.rs  |  98 +++++++++
 crates/integrations/datafusion/src/table.rs   |  88 ++++++++
 .../datafusion/testdata/docker-compose.yaml   |  50 +++++
 .../testdata/hms_catalog/Dockerfile           |  34 +++
 .../testdata/hms_catalog/core-site.xml        |  51 +++++
 .../tests/integration_datafusion_hms_test.rs  | 193 ++++++++++++++++++
 15 files changed, 889 insertions(+), 1 deletion(-)
 create mode 100644 crates/integrations/datafusion/Cargo.toml
 create mode 100644 crates/integrations/datafusion/README.md
 create mode 100644 crates/integrations/datafusion/src/catalog.rs
 create mode 100644 crates/integrations/datafusion/src/error.rs
 create mode 100644 crates/integrations/datafusion/src/lib.rs
 create mode 100644 crates/integrations/datafusion/src/physical_plan/mod.rs
 create mode 100644 crates/integrations/datafusion/src/physical_plan/scan.rs
 create mode 100644 crates/integrations/datafusion/src/schema.rs
 create mode 100644 crates/integrations/datafusion/src/table.rs
 create mode 100644 crates/integrations/datafusion/testdata/docker-compose.yaml
 create mode 100644 crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
 create mode 100644 crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
 create mode 100644 crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs

diff --git a/Cargo.toml b/Cargo.toml
index 3c2923d4..d1894c14 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
   "crates/catalog/*",
   "crates/examples",
   "crates/iceberg",
+  "crates/integrations/*",
   "crates/test_utils",
 ]
 
@@ -56,6 +57,7 @@ fnv = "1"
 futures = "0.3"
 iceberg = { version = "0.2.0", path = "./crates/iceberg" }
 iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" }
+iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" }
 itertools = "0.12"
 lazy_static = "1"
 log = "^0.4"
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index 839969d8..f38d7713 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -25,7 +25,7 @@ use futures::AsyncReadExt;
 use typed_builder::TypedBuilder;
 
 /// Table represents a table in the catalog.
-#[derive(TypedBuilder, Debug)]
+#[derive(TypedBuilder, Debug, Clone)]
 pub struct Table {
     file_io: FileIO,
     #[builder(default, setter(strip_option, into))]
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
new file mode 100644
index 00000000..9f895ab3
--- /dev/null
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.

+[package]
+name = "iceberg-datafusion"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Datafusion Integration"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "integrations", "datafusion"]
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { version = "37.0.0" }
+futures = { workspace = true }
+iceberg = { workspace = true }
+log = { workspace = true }
+tokio = { workspace = true }
+
+[dev-dependencies]
+iceberg-catalog-hms = { workspace = true }
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+port_scanner = { workspace = true }
diff --git a/crates/integrations/datafusion/README.md b/crates/integrations/datafusion/README.md
new file mode 100644
index 00000000..134a8eff
--- /dev/null
+++ b/crates/integrations/datafusion/README.md
@@ -0,0 +1,22 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Apache Iceberg DataFusion Integration
+
+This crate contains the integration of Apache DataFusion and Apache Iceberg.
diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs
new file mode 100644
index 00000000..deddde9f
--- /dev/null
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, sync::Arc};
+
+use datafusion::catalog::{schema::SchemaProvider, CatalogProvider};
+use futures::future::try_join_all;
+use iceberg::{Catalog, NamespaceIdent, Result};
+
+use crate::schema::IcebergSchemaProvider;
+
+/// Provides an interface to manage and access multiple schemas
+/// within an Iceberg [`Catalog`].
+///
+/// Acts as a centralized catalog provider that aggregates
+/// multiple [`SchemaProvider`]s, each associated with a distinct namespace.
+pub struct IcebergCatalogProvider {
+    /// A `HashMap` where keys are namespace names
+    /// and values are dynamic references to objects implementing the
+    /// [`SchemaProvider`] trait.
+    schemas: HashMap<String, Arc<dyn SchemaProvider>>,
+}
+
+impl IcebergCatalogProvider {
+    /// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
+    /// using the given client to fetch and initialize schema providers for
+    /// each namespace in the Iceberg [`Catalog`].
+    ///
+    /// This method retrieves the list of namespace names,
+    /// attempts to create a schema provider for each namespace, and
+    /// collects these providers into a `HashMap`.
+    pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
+        // TODO:
+        // Schemas and providers should be cached and evicted based on time.
+        // As of right now, schemas might become stale.
+        let schema_names: Vec<_> = client
+            .list_namespaces(None)
+            .await?
+            .iter()
+            .flat_map(|ns| ns.as_ref().clone())
+            .collect();
+
+        let providers = try_join_all(
+            schema_names
+                .iter()
+                .map(|name| {
+                    IcebergSchemaProvider::try_new(
+                        client.clone(),
+                        NamespaceIdent::new(name.clone()),
+                    )
+                })
+                .collect::<Vec<_>>(),
+        )
+        .await?;
+
+        let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
+            .into_iter()
+            .zip(providers.into_iter())
+            .map(|(name, provider)| {
+                let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
+                (name, provider)
+            })
+            .collect();
+
+        Ok(IcebergCatalogProvider { schemas })
+    }
+}
+
+impl CatalogProvider for IcebergCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.schemas.keys().cloned().collect()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.schemas.get(name).cloned()
+    }
+}
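A note on the construction strategy above: `try_join_all` drives all of the `IcebergSchemaProvider::try_new` futures concurrently, resolves to `Ok` with results in input order only when every future succeeds, and short-circuits on the first failure, which fails the whole constructor. A toy sketch of that behavior (the `check` worker is hypothetical; `futures::executor::block_on` stands in for a real runtime):

```rust
use futures::{executor::block_on, future::try_join_all};

// Hypothetical async worker: fails on odd inputs.
async fn check(n: u32) -> Result<u32, String> {
    if n % 2 == 0 {
        Ok(n * 10)
    } else {
        Err(format!("odd input: {n}"))
    }
}

fn main() {
    // All futures succeed: results come back in input order.
    let ok = block_on(try_join_all([0, 2, 4].map(check)));
    assert_eq!(ok, Ok(vec![0, 20, 40]));

    // A single failure short-circuits the whole batch.
    let err = block_on(try_join_all([2, 3, 4].map(check)));
    assert_eq!(err, Err("odd input: 3".to_string()));
}
```

For the catalog provider this means one unreachable namespace fails `try_new` entirely rather than yielding a partially populated catalog.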
diff --git a/crates/integrations/datafusion/src/error.rs b/crates/integrations/datafusion/src/error.rs
new file mode 100644
index 00000000..273d92fa
--- /dev/null
+++ b/crates/integrations/datafusion/src/error.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::anyhow;
+use iceberg::{Error, ErrorKind};
+
+/// Converts a DataFusion error into an Iceberg error.
+pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "Operation failed due to a DataFusion error".to_string(),
+    )
+    .with_source(anyhow!("datafusion error: {:?}", error))
+}
+/// Converts an Iceberg error into a DataFusion error.
+pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError {
+    datafusion::error::DataFusionError::External(error.into())
+}
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
new file mode 100644
index 00000000..c4029011
--- /dev/null
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod catalog;
+pub use catalog::*;
+
+mod error;
+pub use error::*;
+
+mod physical_plan;
+mod schema;
+mod table;
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
new file mode 100644
index 00000000..5ae586a0
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(crate) mod scan;
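The two helpers in `error.rs` above cross the error boundary in both directions: an Iceberg error becomes a `DataFusionError::External` on the way into DataFusion, and a DataFusion error is wrapped as the source of an `ErrorKind::Unexpected` on the way out. A minimal sketch of both directions, using only the functions defined in this patch plus DataFusion's public error type:

```rust
use iceberg_datafusion::{from_datafusion_error, to_datafusion_error};

fn main() {
    // DataFusion -> Iceberg: the original error is kept as the source.
    let df_err = datafusion::error::DataFusionError::Plan("bad plan".to_string());
    let iceberg_err = from_datafusion_error(df_err);
    println!("{iceberg_err}");

    // Iceberg -> DataFusion: wrapped as an external error.
    let iceberg_err = iceberg::Error::new(iceberg::ErrorKind::Unexpected, "boom".to_string());
    let df_err = to_datafusion_error(iceberg_err);
    println!("{df_err}");
}
```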
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
new file mode 100644
index 00000000..cc01148f
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, pin::Pin, sync::Arc};
+
+use datafusion::error::Result as DFResult;
+use datafusion::{
+    arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef},
+    execution::{SendableRecordBatchStream, TaskContext},
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        stream::RecordBatchStreamAdapter, DisplayAs, ExecutionMode, ExecutionPlan, Partitioning,
+        PlanProperties,
+    },
+};
+use futures::{Stream, TryStreamExt};
+use iceberg::table::Table;
+
+use crate::to_datafusion_error;
+
+/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
+/// necessary details and computed properties required for execution planning.
+#[derive(Debug)]
+pub(crate) struct IcebergTableScan {
+    /// A table in the catalog.
+    table: Table,
+    /// A reference-counted arrow `Schema`.
+    schema: ArrowSchemaRef,
+    /// Stores certain, often expensive to compute,
+    /// plan properties used in query optimization.
+    plan_properties: PlanProperties,
+}
+
+impl IcebergTableScan {
+    /// Creates a new [`IcebergTableScan`] object.
+    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+        let plan_properties = Self::compute_properties(schema.clone());
+
+        Self {
+            table,
+            schema,
+            plan_properties,
+        }
+    }
+
+    /// Computes [`PlanProperties`] used in query optimization.
+    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
+        // TODO:
+        // This is more or less a placeholder, to be replaced
+        // once we support output-partitioning.
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
+    }
+}
+
+impl ExecutionPlan for IcebergTableScan {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let fut = get_batch_stream(self.table.clone());
+        let stream = futures::stream::once(fut).try_flatten();
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            stream,
+        )))
+    }
+}
+
+impl DisplayAs for IcebergTableScan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "IcebergTableScan")
+    }
+}
+
+/// Asynchronously retrieves a stream of [`RecordBatch`] instances
+/// from a given table.
+///
+/// This function initializes a [`TableScan`], builds it,
+/// and then converts it into a stream of Arrow [`RecordBatch`]es.
+async fn get_batch_stream(
+    table: Table,
+) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+    let table_scan = table.scan().build().map_err(to_datafusion_error)?;
+
+    let stream = table_scan
+        .to_arrow()
+        .await
+        .map_err(to_datafusion_error)?
+        .map_err(to_datafusion_error);
+
+    Ok(Box::pin(stream))
+}
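A note on `execute` above: `ExecutionPlan::execute` is synchronous, while building the Arrow stream is async, so the plan bridges the two with `futures::stream::once(fut).try_flatten()`. That turns the future into a one-item stream and merges the construction error with the per-item errors into a single fallible stream, without awaiting anything in `execute` itself. A minimal, self-contained sketch of the pattern (toy types; `block_on` in place of a DataFusion runtime):

```rust
use futures::{executor::block_on, stream, Stream, TryStreamExt};

// Stand-in for `get_batch_stream`: async construction that can fail,
// yielding a stream whose items can also fail.
async fn make_stream() -> Result<impl Stream<Item = Result<i32, String>>, String> {
    Ok(stream::iter(vec![Ok(1), Ok(2), Ok(3)]))
}

fn main() -> Result<(), String> {
    // `once` wraps the future as a single-item stream; `try_flatten`
    // surfaces both the construction error and the per-item errors
    // through one stream of `Result<i32, String>`.
    let flat = stream::once(make_stream()).try_flatten();
    let items: Vec<i32> = block_on(flat.try_collect())?;
    assert_eq!(items, vec![1, 2, 3]);
    Ok(())
}
```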
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
new file mode 100644
index 00000000..2ba69621
--- /dev/null
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::error::Result as DFResult;
+use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
+use futures::future::try_join_all;
+use iceberg::{Catalog, NamespaceIdent, Result};
+
+use crate::table::IcebergTableProvider;
+
+/// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing
+/// access to table providers within a specific namespace.
+pub(crate) struct IcebergSchemaProvider {
+    /// A `HashMap` where keys are table names
+    /// and values are dynamic references to objects implementing the
+    /// [`TableProvider`] trait.
+    tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+
+impl IcebergSchemaProvider {
+    /// Asynchronously tries to construct a new [`IcebergSchemaProvider`]
+    /// using the given client to fetch and initialize table providers for
+    /// the provided namespace in the Iceberg [`Catalog`].
+    ///
+    /// This method retrieves a list of table names,
+    /// attempts to create a table provider for each table name, and
+    /// collects these providers into a `HashMap`.
+    pub(crate) async fn try_new(
+        client: Arc<dyn Catalog>,
+        namespace: NamespaceIdent,
+    ) -> Result<Self> {
+        // TODO:
+        // Tables and providers should be cached based on table_name;
+        // on a cache miss, we update our internal cache and check again.
+        // As of right now, tables might become stale.
+        let table_names: Vec<_> = client
+            .list_tables(&namespace)
+            .await?
+            .iter()
+            .map(|tbl| tbl.name().to_string())
+            .collect();
+
+        let providers = try_join_all(
+            table_names
+                .iter()
+                .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
+                .collect::<Vec<_>>(),
+        )
+        .await?;
+
+        let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
+            .into_iter()
+            .zip(providers.into_iter())
+            .map(|(name, provider)| {
+                let provider = Arc::new(provider) as Arc<dyn TableProvider>;
+                (name, provider)
+            })
+            .collect();
+
+        Ok(IcebergSchemaProvider { tables })
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for IcebergSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.tables.keys().cloned().collect()
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.tables.get(name).is_some()
+    }
+
+    async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        Ok(self.tables.get(name).cloned())
+    }
+}
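Both providers build their lookup maps the same way: zip the name list with the concurrently built providers, coerce each concrete value to a reference-counted trait object, and collect into a `HashMap`. The same pattern in miniature, with a toy trait instead of `TableProvider`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

trait Named {
    fn name(&self) -> &str;
}

struct Provider(String);

impl Named for Provider {
    fn name(&self) -> &str {
        &self.0
    }
}

fn main() {
    let names = vec!["a".to_string(), "b".to_string()];
    // In the real code these are built concurrently with `try_join_all`.
    let providers = vec![Provider("a".into()), Provider("b".into())];

    // Zip names with providers and coerce each value to a trait object.
    let map: HashMap<String, Arc<dyn Named>> = names
        .into_iter()
        .zip(providers)
        .map(|(name, p)| (name, Arc::new(p) as Arc<dyn Named>))
        .collect();

    assert_eq!(map["a"].name(), "a");
}
```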
diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs
new file mode 100644
index 00000000..46a15f67
--- /dev/null
+++ b/crates/integrations/datafusion/src/table.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::error::Result as DFResult;
+use datafusion::{
+    arrow::datatypes::SchemaRef as ArrowSchemaRef,
+    datasource::{TableProvider, TableType},
+    execution::context,
+    logical_expr::Expr,
+    physical_plan::ExecutionPlan,
+};
+use iceberg::{
+    arrow::schema_to_arrow_schema, table::Table, Catalog, NamespaceIdent, Result, TableIdent,
+};
+
+use crate::physical_plan::scan::IcebergTableScan;
+
+/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
+/// managing access to a [`Table`].
+pub(crate) struct IcebergTableProvider {
+    /// A table in the catalog.
+    table: Table,
+    /// A reference-counted arrow `Schema`.
+    schema: ArrowSchemaRef,
+}
+
+impl IcebergTableProvider {
+    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
+    /// using the given client and table name to fetch an actual [`Table`]
+    /// in the provided namespace.
+    pub(crate) async fn try_new(
+        client: Arc<dyn Catalog>,
+        namespace: NamespaceIdent,
+        name: impl Into<String>,
+    ) -> Result<Self> {
+        let ident = TableIdent::new(namespace, name.into());
+        let table = client.load_table(&ident).await?;
+
+        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
+
+        Ok(IcebergTableProvider { table, schema })
+    }
+}
+
+#[async_trait]
+impl TableProvider for IcebergTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &context::SessionState,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(IcebergTableScan::new(
+            self.table.clone(),
+            self.schema.clone(),
+        )))
+    }
+}
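For context on the `schema_to_arrow_schema` call: the provider converts the table's current Iceberg schema into an Arrow schema once, at construction time, and then serves it from `schema()`. A rough sketch of that conversion for the two-column schema used in the integration tests below; Iceberg `Int` maps to Arrow `Int32` and Iceberg `String` to Arrow `Utf8`, which is what the tests assert:

```rust
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

fn main() -> iceberg::Result<()> {
    // Same schema as `set_table_creation` in the integration tests.
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;

    // Expect: foo: Int32 (non-nullable), bar: Utf8 (non-nullable).
    let arrow_schema = schema_to_arrow_schema(&schema)?;
    println!("{arrow_schema:?}");
    Ok(())
}
```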
diff --git a/crates/integrations/datafusion/testdata/docker-compose.yaml b/crates/integrations/datafusion/testdata/docker-compose.yaml
new file mode 100644
index 00000000..282dc66c
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/docker-compose.yaml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  minio:
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    expose:
+      - 9000
+      - 9001
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      /usr/bin/mc mb minio/warehouse;
+      /usr/bin/mc policy set public minio/warehouse;
+      tail -f /dev/null
+      "
+
+  hive-metastore:
+    image: iceberg-hive-metastore
+    build: ./hms_catalog/
+    expose:
+      - 9083
+    environment:
+      SERVICE_NAME: "metastore"
+      SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
new file mode 100644
index 00000000..ff8c9fae
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
\ No newline at end of file
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
new file mode 100644
index 00000000..f0583a0b
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>fs.defaultFS</name>
+    <value>s3a://warehouse/hive</value>
+  </property>
+  <property>
+    <name>fs.s3a.impl</name>
+    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  </property>
+  <property>
+    <name>fs.s3a.fast.upload</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>fs.s3a.endpoint</name>
+    <value>http://minio:9000</value>
+  </property>
+  <property>
+    <name>fs.s3a.access.key</name>
+    <value>admin</value>
+  </property>
+  <property>
+    <name>fs.s3a.secret.key</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>fs.s3a.connection.ssl.enabled</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>fs.s3a.path.style.access</name>
+    <value>true</value>
+  </property>
+</configuration>
\ No newline at end of file
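The compose stack above gives the tests a Hive Metastore for table metadata and a MinIO container standing in for S3 as the warehouse. A hedged sketch of connecting to such a stack by hand, using only APIs from `iceberg` and `iceberg-catalog-hms` that the tests below also use; it assumes the services' ports are published on localhost, whereas the compose file as written only `expose`s them on the container network:

```rust
use std::collections::HashMap;

use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};

fn connect() -> iceberg::Result<HmsCatalog> {
    // Credentials and endpoints mirror testdata/docker-compose.yaml.
    let props = HashMap::from([
        (S3_ENDPOINT.to_string(), "http://localhost:9000".to_string()),
        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
        (S3_REGION.to_string(), "us-east-1".to_string()),
    ]);

    let config = HmsCatalogConfig::builder()
        .address("localhost:9083".to_string())
        .thrift_transport(HmsThriftTransport::Buffered)
        .warehouse("s3a://warehouse/hive".to_string())
        .props(props)
        .build();

    HmsCatalog::new(config)
}
```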
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
new file mode 100644
index 00000000..20c5cc87
--- /dev/null
+++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for Iceberg Datafusion with Hive Metastore.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::DataType;
+use datafusion::execution::context::SessionContext;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
+use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
+use iceberg_datafusion::IcebergCatalogProvider;
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const HMS_CATALOG_PORT: u16 = 9083;
+const MINIO_PORT: u16 = 9000;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    hms_catalog: HmsCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    docker_compose.run();
+
+    let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+    let minio_ip = docker_compose.get_container_ip("minio");
+
+    let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting 1s for the hms catalog to be ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let props = HashMap::from([
+        (
+            S3_ENDPOINT.to_string(),
+            format!("http://{}:{}", minio_ip, MINIO_PORT),
+        ),
+        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+        (S3_REGION.to_string(), "us-east-1".to_string()),
+    ]);
+
+    let config = HmsCatalogConfig::builder()
+        .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
+        .thrift_transport(HmsThriftTransport::Buffered)
+        .warehouse("s3a://warehouse/hive".to_string())
+        .props(props)
+        .build();
+
+    let hms_catalog = HmsCatalog::new(config).unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        hms_catalog,
+    }
+}
+
+fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    let creation = TableCreation::builder()
+        .location(location.to_string())
+        .name(name.to_string())
+        .properties(HashMap::new())
+        .schema(schema)
+        .build();
+
+    Ok(creation)
+}
+
+#[tokio::test]
+async fn test_provider_get_table_schema() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_get_table_schema").await;
+
+    let namespace = NamespaceIdent::new("default".to_string());
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+
+    fixture
+        .hms_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+    let schema = provider.schema("default").unwrap();
+
+    let table = schema.table("my_table").await.unwrap().unwrap();
+    let table_schema = table.schema();
+
+    let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)];
+
+    for (field, exp) in table_schema.fields().iter().zip(expected.iter()) {
+        assert_eq!(field.name(), exp.0);
+        assert_eq!(field.data_type(), exp.1);
+        assert!(!field.is_nullable());
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_provider_list_table_names() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_list_table_names").await;
+
+    let namespace = NamespaceIdent::new("default".to_string());
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+
+    fixture
+        .hms_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+    let schema = provider.schema("default").unwrap();
+
+    let expected = vec!["my_table"];
+    let result = schema.table_names();
+
+    assert_eq!(result, expected);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_provider_list_schema_names() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_list_schema_names").await;
+    set_table_creation("default", "my_table")?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+
+    let expected = vec!["default"];
+    let result = provider.schema_names();
+
+    assert_eq!(result, expected);
+
+    Ok(())
+}
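Putting the pieces together outside the test harness: once an `IcebergCatalogProvider` is registered, the table is queryable with plain SQL. A minimal sketch; the `run_query` helper and the SQL string are illustrative, `SessionContext::sql` and `DataFrame::collect` are standard DataFusion APIs, and the error helper comes from this crate:

```rust
use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use iceberg::{Catalog, Result};
use iceberg_datafusion::{from_datafusion_error, IcebergCatalogProvider};

/// Wire any Iceberg catalog (e.g. the `HmsCatalog` from the tests above)
/// into DataFusion and query it with SQL.
async fn run_query(catalog: Arc<dyn Catalog>) -> Result<()> {
    let provider = Arc::new(IcebergCatalogProvider::try_new(catalog).await?);

    let ctx = SessionContext::new();
    ctx.register_catalog("hive", provider);

    // The table name matches the one created in the tests above.
    let batches = ctx
        .sql("SELECT foo, bar FROM hive.default.my_table")
        .await
        .map_err(from_datafusion_error)?
        .collect()
        .await
        .map_err(from_datafusion_error)?;

    println!("fetched {} record batches", batches.len());
    Ok(())
}
```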