From 5f87ee8c6584e07c39352df376f304d1b99fb8de Mon Sep 17 00:00:00 2001
From: Ajantha Bhat
Date: Mon, 5 Feb 2024 19:26:07 +0530
Subject: [PATCH] Core: Add catalog type for glue,jdbc,nessie (#9647)

Co-authored-by: zhaomin
---
 .../java/org/apache/iceberg/CatalogUtil.java  | 15 ++++++++++++
 .../apache/iceberg/jdbc/TestJdbcCatalog.java  |  7 +++---
 docs/docs/aws.md                              | 23 ++++++++++---------
 docs/docs/flink-configuration.md              |  2 +-
 docs/docs/flink.md                            |  2 +-
 docs/docs/hive.md                             |  2 +-
 docs/docs/jdbc.md                             |  2 +-
 docs/docs/nessie.md                           |  6 ++---
 docs/docs/spark-configuration.md              |  6 ++---
 .../iceberg/nessie/TestNessieCatalog.java     | 11 +++++----
 10 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index f9af07c1a443..ccecfb19070b 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -67,10 +67,16 @@ public class CatalogUtil {
   public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
   public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
+  public static final String ICEBERG_CATALOG_TYPE_GLUE = "glue";
+  public static final String ICEBERG_CATALOG_TYPE_NESSIE = "nessie";
+  public static final String ICEBERG_CATALOG_TYPE_JDBC = "jdbc";
 
   public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog";
   public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog";
   public static final String ICEBERG_CATALOG_REST = "org.apache.iceberg.rest.RESTCatalog";
+  public static final String ICEBERG_CATALOG_GLUE = "org.apache.iceberg.aws.glue.GlueCatalog";
+  public static final String ICEBERG_CATALOG_NESSIE = "org.apache.iceberg.nessie.NessieCatalog";
+  public static final String ICEBERG_CATALOG_JDBC = "org.apache.iceberg.jdbc.JdbcCatalog";
 
   private CatalogUtil() {}
 
@@ -278,6 +284,15 @@ public static Catalog buildIcebergCatalog(String name, Map<String, String> optio
       case ICEBERG_CATALOG_TYPE_REST:
         catalogImpl = ICEBERG_CATALOG_REST;
         break;
+      case ICEBERG_CATALOG_TYPE_GLUE:
+        catalogImpl = ICEBERG_CATALOG_GLUE;
+        break;
+      case ICEBERG_CATALOG_TYPE_NESSIE:
+        catalogImpl = ICEBERG_CATALOG_NESSIE;
+        break;
+      case ICEBERG_CATALOG_TYPE_JDBC:
+        catalogImpl = ICEBERG_CATALOG_JDBC;
+        break;
       default:
         throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
     }
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index da6abec6ea9f..fb6e6b591c87 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -138,12 +139,10 @@ private JdbcCatalog initCatalog(String catalogName, Map<String, String> props)
     properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
     warehouseLocation = this.tableDir.toAbsolutePath().toString();
     properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+    properties.put("type", "jdbc");
     properties.putAll(props);
 
-    JdbcCatalog jdbcCatalog = new JdbcCatalog();
-    jdbcCatalog.setConf(conf);
-    jdbcCatalog.initialize(catalogName, properties);
-    return jdbcCatalog;
+    return (JdbcCatalog) CatalogUtil.buildIcebergCatalog(catalogName, properties, conf);
   }
 
   @Test
diff --git a/docs/docs/aws.md b/docs/docs/aws.md
index d42d549d1aab..d4cd2b22c4ca 100644
--- a/docs/docs/aws.md
+++ b/docs/docs/aws.md
@@ -51,7 +51,7 @@ spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{{ iceber
     --conf spark.sql.defaultCatalog=my_catalog \
     --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
 ```
@@ -84,7 +84,7 @@ With those dependencies, you can create a Flink catalog like the following:
 CREATE CATALOG my_catalog WITH (
   'type'='iceberg',
   'warehouse'='s3://my-bucket/my/key/prefix',
-  'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
+  'catalog-type'='glue',
   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 );
 ```
@@ -115,7 +115,7 @@ With those dependencies, you can register a Glue catalog and create external tab
 ```sql
 SET iceberg.engine.hive.enabled=true;
 SET hive.vectorized.execution.enabled=false;
-SET iceberg.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
+SET iceberg.catalog.glue.type=glue;
 SET iceberg.catalog.glue.warehouse=s3://my-bucket/my/key/prefix;
 
 -- suppose you have an Iceberg table database_a.table_a created by GlueCatalog
@@ -136,7 +136,8 @@ Iceberg enables the use of [AWS Glue](https://aws.amazon.com/glue) as the `Catal
 When used, an Iceberg namespace is stored as a [Glue Database](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html),
 an Iceberg table is stored as a [Glue Table](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-tables.html),
 and every Iceberg table version is stored as a [Glue TableVersion](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-tables.html#aws-glue-api-catalog-tables-TableVersion).
 
-You can start using Glue catalog by specifying the `catalog-impl` as `org.apache.iceberg.aws.glue.GlueCatalog`,
+You can start using the Glue catalog by specifying `catalog-impl` as `org.apache.iceberg.aws.glue.GlueCatalog`
+or by setting `type` to `glue`,
 as shown in the [enabling AWS integration](#enabling-aws-integration) section above.
 More details about loading the catalog can be found in individual engine pages, such as [Spark](spark-configuration.md#loading-a-custom-catalog) and [Flink](flink.md#creating-catalogs-and-using-catalogs).
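Either spelling resolves through the same core lookup: `CatalogUtil.buildIcebergCatalog` reads the `type` property and maps it to the implementation class added in `CatalogUtil.java` above. A minimal Java sketch of that call (illustrative only, not part of the patch), assuming `iceberg-core`, `iceberg-aws`, and Hadoop's `Configuration` are on the classpath, AWS credentials are configured, and the bucket name is hypothetical:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class GlueByTypeExample {
  public static void main(String[] args) {
    // With this patch, "type" accepts "glue" and CatalogUtil maps it to
    // org.apache.iceberg.aws.glue.GlueCatalog; no catalog-impl is needed.
    Map<String, String> props =
        ImmutableMap.of(
            "type", "glue",
            "warehouse", "s3://my-bucket/my/key/prefix"); // hypothetical bucket

    Catalog catalog = CatalogUtil.buildIcebergCatalog("my_catalog", props, new Configuration());
    System.out.println(catalog.name() + " -> " + catalog.getClass().getName());
  }
}
```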
@@ -410,7 +411,7 @@ For example, to write S3 tags with Spark 3.3, you can start the Spark SQL shell
 ```
 spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
     --conf spark.sql.catalog.my_catalog.s3.write.tags.my_key1=my_val1 \
     --conf spark.sql.catalog.my_catalog.s3.write.tags.my_key2=my_val2
@@ -428,7 +429,7 @@ For example, to add S3 delete tags with Spark 3.3, you can start the Spark SQL s
 ``` sh
 spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg-warehouse/s3-tagging \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
     --conf spark.sql.catalog.my_catalog.s3.delete.tags.my_key3=my_val3 \
     --conf spark.sql.catalog.my_catalog.s3.delete-enabled=false
@@ -443,7 +444,7 @@ For example, to write table and namespace name as S3 tags with Spark 3.3, you ca
 ``` sh
 spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg-warehouse/s3-tagging \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
     --conf spark.sql.catalog.my_catalog.s3.write.table-tag-enabled=true \
     --conf spark.sql.catalog.my_catalog.s3.write.namespace-tag-enabled=true
@@ -463,7 +464,7 @@ For example, to use S3 access-point with Spark 3.3, you can start the Spark SQL
 ```
 spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
     --conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
     --conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
@@ -484,7 +485,7 @@ For example, to use S3 Acceleration with Spark 3.3, you can start the Spark SQL
 ```
 spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
     --conf spark.sql.catalog.my_catalog.s3.acceleration-enabled=true
 ```
@@ -502,7 +503,7 @@ For example, to use S3 Dual-stack with Spark 3.3, you can start the Spark SQL sh
 ```
 spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
     --conf spark.sql.catalog.my_catalog.s3.dualstack-enabled=true
 ```
@@ -538,7 +539,7 @@ Here is an example to start Spark shell with this client factory:
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{{ icebergVersion }},org.apache.iceberg:iceberg-aws-bundle:{{ icebergVersion }} \
     --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
     --conf spark.sql.catalog.my_catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory \
     --conf spark.sql.catalog.my_catalog.client.assume-role.arn=arn:aws:iam::123456789:role/myRoleToAssume \
     --conf spark.sql.catalog.my_catalog.client.assume-role.region=ap-northeast-1
diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md
index 683169c96bda..42dc15f5b3d2 100644
--- a/docs/docs/flink-configuration.md
+++ b/docs/docs/flink-configuration.md
@@ -37,7 +37,7 @@ The following properties can be set globally and are not limited to a specific c
 | Property                     | Required | Values                     | Description                                                                                                                                                        |
 | ---------------------------- |----------| -------------------------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | type                         | ✔️       | iceberg                    | Must be `iceberg`.                                                                                                                                                 |
-| catalog-type                 |          | `hive`, `hadoop` or `rest` | `hive`, `hadoop` or `rest` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl.                                             |
+| catalog-type                 |          | `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` | The underlying Iceberg catalog implementation: `HiveCatalog`, `HadoopCatalog`, `RESTCatalog`, `GlueCatalog`, `JdbcCatalog`, or `NessieCatalog`. Leave unset for a custom catalog implementation set via `catalog-impl`. |
 | catalog-impl                 |          |                            | The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset.                                                         |
 | property-version             |          |                            | Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. |
 | cache-enabled                |          | `true` or `false`          | Whether to enable catalog cache, default value is `true`.                                                                                                          |
diff --git a/docs/docs/flink.md b/docs/docs/flink.md
index be5f715738f7..dcf6441b4a34 100644
--- a/docs/docs/flink.md
+++ b/docs/docs/flink.md
@@ -192,7 +192,7 @@ CREATE CATALOG <catalog_name> WITH (
 The following properties can be set globally and are not limited to a specific catalog implementation:
 
 * `type`: Must be `iceberg`. (required)
-* `catalog-type`: `hive`, `hadoop` or `rest` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. (Optional)
+* `catalog-type`: `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` for built-in catalogs, or left unset for custom catalog implementations using `catalog-impl`. (Optional)
 * `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. (Optional)
 * `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
 * `cache-enabled`: Whether to enable catalog cache, default value is `true`. (Optional)
diff --git a/docs/docs/hive.md b/docs/docs/hive.md
index 1d982c169b84..15e564f3d32a 100644
--- a/docs/docs/hive.md
+++ b/docs/docs/hive.md
@@ -189,7 +189,7 @@ SET iceberg.catalog.hadoop.warehouse=hdfs://example.com:8020/warehouse;
 Register an AWS `GlueCatalog` called `glue`:
 
 ```
-SET iceberg.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
+SET iceberg.catalog.glue.type=glue;
 SET iceberg.catalog.glue.warehouse=s3://my-bucket/my/key/prefix;
 SET iceberg.catalog.glue.lock.table=myGlueLockTable;
 ```
diff --git a/docs/docs/jdbc.md b/docs/docs/jdbc.md
index e4dd38a9a1dc..118b5be5095a 100644
--- a/docs/docs/jdbc.md
+++ b/docs/docs/jdbc.md
@@ -47,7 +47,7 @@ You can start a Spark session with a MySQL JDBC connection using the following c
 spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{{ icebergVersion }} \
     --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
+    --conf spark.sql.catalog.my_catalog.type=jdbc \
     --conf spark.sql.catalog.my_catalog.uri=jdbc:mysql://test.1234567890.us-west-2.rds.amazonaws.com:3306/default \
     --conf spark.sql.catalog.my_catalog.jdbc.verifyServerCertificate=true \
     --conf spark.sql.catalog.my_catalog.jdbc.useSSL=true \
diff --git a/docs/docs/nessie.md b/docs/docs/nessie.md
index 9d822fe6d0aa..96d83095388b 100644
--- a/docs/docs/nessie.md
+++ b/docs/docs/nessie.md
@@ -76,7 +76,7 @@ and in Spark:
 conf.set("spark.sql.catalog.nessie.warehouse", "/path/to/warehouse");
 conf.set("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v2")
 conf.set("spark.sql.catalog.nessie.ref", "main")
-conf.set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
+conf.set("spark.sql.catalog.nessie.type", "nessie")
 conf.set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
 conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
 ```
@@ -93,14 +93,14 @@ table_env = StreamTableEnvironment.create(env)
 
 table_env.execute_sql("CREATE CATALOG nessie_catalog WITH ("
                       "'type'='iceberg', "
-                      "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog', "
+                      "'catalog-type'='nessie', "
                       "'uri'='http://localhost:19120/api/v2', "
                       "'ref'='main', "
                       "'warehouse'='/path/to/warehouse')")
 ```
 
 There is nothing special above about the `nessie` name. A Spark catalog can have any name; the important parts are the
-settings for the `catalog-impl` and the required config to start Nessie correctly.
+settings for the `type` or `catalog-impl` and the required config to connect to Nessie correctly.
 Once you have a Nessie catalog you have access to your entire Nessie repo. You can then perform create/delete/merge
 operations on branches and perform commits on branches. Each Iceberg table in a Nessie Catalog is identified by an
 arbitrary length namespace and table name (e.g. `data.base.name.table`). These namespaces must be explicitly created
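The same shorthand works outside of any engine. A minimal Java sketch of a Nessie catalog built through `CatalogUtil` (illustrative only, not part of the patch), assuming `iceberg-nessie` is on the classpath and a Nessie server is running at the given URI; the warehouse path is hypothetical:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.nessie.NessieCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class NessieByTypeExample {
  public static void main(String[] args) {
    // "type=nessie" resolves to org.apache.iceberg.nessie.NessieCatalog.
    Map<String, String> props =
        ImmutableMap.of(
            "type", "nessie",
            "ref", "main",
            CatalogProperties.URI, "http://localhost:19120/api/v2", // assumes a local Nessie server
            CatalogProperties.WAREHOUSE_LOCATION, "/path/to/warehouse"); // hypothetical path

    NessieCatalog catalog =
        (NessieCatalog) CatalogUtil.buildIcebergCatalog("nessie", props, new Configuration());
    System.out.println(catalog.name());
  }
}
```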
diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md
index d3899b8ab676..9567f144809c 100644
--- a/docs/docs/spark-configuration.md
+++ b/docs/docs/spark-configuration.md
@@ -64,9 +64,9 @@ Iceberg supplies two implementations:
 
 Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:
 
-| Property                                           | Values                        | Description                                                            |
-| -------------------------------------------------- | ----------------------------- | ---------------------------------------------------------------------- |
-| spark.sql.catalog._catalog-name_.type              | `hive`, `hadoop` or `rest`    | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog`, `RESTCatalog` or left unset if using a custom catalog |
+| Property                                           | Values                                               | Description                                                                                                                                                            |
+| -------------------------------------------------- |----------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.sql.catalog._catalog-name_.type              | `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` | The underlying Iceberg catalog implementation: `HiveCatalog`, `HadoopCatalog`, `RESTCatalog`, `GlueCatalog`, `JdbcCatalog`, or `NessieCatalog`. Leave unset if using a custom catalog. |
 | spark.sql.catalog._catalog-name_.catalog-impl      | | The custom Iceberg catalog implementation. If `type` is null, `catalog-impl` must not be null. |
 | spark.sql.catalog._catalog-name_.io-impl           | | The custom FileIO implementation. |
 | spark.sql.catalog._catalog-name_.metrics-reporter-impl | | The custom MetricsReporter implementation. |
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieCatalog.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieCatalog.java
index fa381194ba0c..107106f0835b 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieCatalog.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieCatalog.java
@@ -21,8 +21,10 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Path;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.LocationUtil;
@@ -110,18 +112,17 @@ private void resetData() throws NessieConflictException, NessieNotFoundException
   }
 
   private NessieCatalog initNessieCatalog(String ref) {
-    NessieCatalog newCatalog = new NessieCatalog();
-    newCatalog.setConf(hadoopConfig);
-    ImmutableMap<String, String> options =
+    Map<String, String> options =
         ImmutableMap.of(
+            "type",
+            "nessie",
             "ref",
             ref,
             CatalogProperties.URI,
             uri,
             CatalogProperties.WAREHOUSE_LOCATION,
             temp.toUri().toString());
-    newCatalog.initialize("nessie", options);
-    return newCatalog;
+    return (NessieCatalog) CatalogUtil.buildIcebergCatalog("nessie", options, hadoopConfig);
   }
 
   @Override
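The refactored tests above exercise the new lookup end to end. For completeness, a JDBC analogue of the earlier sketches (illustrative only, not part of the patch), assuming a SQLite JDBC driver is on the classpath; the URI, credentials, and warehouse path are hypothetical:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class JdbcByTypeExample {
  public static void main(String[] args) {
    // "type=jdbc" resolves to org.apache.iceberg.jdbc.JdbcCatalog, the same
    // mapping exercised by initCatalog() in TestJdbcCatalog above.
    Map<String, String> props =
        ImmutableMap.of(
            "type", "jdbc",
            CatalogProperties.URI, "jdbc:sqlite::memory:", // hypothetical in-memory database
            JdbcCatalog.PROPERTY_PREFIX + "username", "user", // hypothetical credentials
            JdbcCatalog.PROPERTY_PREFIX + "password", "password",
            CatalogProperties.WAREHOUSE_LOCATION, "/tmp/warehouse"); // hypothetical path

    JdbcCatalog catalog =
        (JdbcCatalog) CatalogUtil.buildIcebergCatalog("my_catalog", props, new Configuration());
    System.out.println(catalog.name());
  }
}
```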