Core: Add catalog type for glue,jdbc,nessie (apache#9647)
Co-authored-by: zhaomin <zhaomin1423@163.com>
2 people authored and devangjhabakh committed Apr 22, 2024
1 parent afc06e0 commit 5f87ee8
Showing 10 changed files with 46 additions and 30 deletions.
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -67,10 +67,16 @@ public class CatalogUtil {
public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
+public static final String ICEBERG_CATALOG_TYPE_GLUE = "glue";
+public static final String ICEBERG_CATALOG_TYPE_NESSIE = "nessie";
+public static final String ICEBERG_CATALOG_TYPE_JDBC = "jdbc";

public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog";
public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog";
public static final String ICEBERG_CATALOG_REST = "org.apache.iceberg.rest.RESTCatalog";
+public static final String ICEBERG_CATALOG_GLUE = "org.apache.iceberg.aws.glue.GlueCatalog";
+public static final String ICEBERG_CATALOG_NESSIE = "org.apache.iceberg.nessie.NessieCatalog";
+public static final String ICEBERG_CATALOG_JDBC = "org.apache.iceberg.jdbc.JdbcCatalog";

private CatalogUtil() {}

@@ -278,6 +284,15 @@ public static Catalog buildIcebergCatalog(String name, Map<String, String> optio
case ICEBERG_CATALOG_TYPE_REST:
catalogImpl = ICEBERG_CATALOG_REST;
break;
+case ICEBERG_CATALOG_TYPE_GLUE:
+catalogImpl = ICEBERG_CATALOG_GLUE;
+break;
+case ICEBERG_CATALOG_TYPE_NESSIE:
+catalogImpl = ICEBERG_CATALOG_NESSIE;
+break;
+case ICEBERG_CATALOG_TYPE_JDBC:
+catalogImpl = ICEBERG_CATALOG_JDBC;
+break;
default:
throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
}
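Taken together, the new constants and `switch` cases let `buildIcebergCatalog` resolve a catalog from a bare `type` property. A minimal Java sketch of a caller — the catalog name, warehouse path, and choice of `glue` are illustrative, not part of this commit, and it assumes `iceberg-aws` (plus AWS credentials and region via the default provider chain) is available; `nessie` and `jdbc` work the same way with their respective modules:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class BuildByType {
  public static void main(String[] args) {
    // "type" now accepts "glue", "nessie", and "jdbc" alongside "hadoop", "hive", and "rest".
    Map<String, String> props =
        ImmutableMap.of(
            "type", "glue",
            CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/my/key/prefix");
    // Resolves to org.apache.iceberg.aws.glue.GlueCatalog; loading fails at runtime
    // if the backing module is not on the classpath.
    Catalog catalog = CatalogUtil.buildIcebergCatalog("my_catalog", props, new Configuration());
    System.out.println(catalog.name());
  }
}
```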
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -138,12 +139,10 @@ private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
warehouseLocation = this.tableDir.toAbsolutePath().toString();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
properties.put("type", "jdbc");
properties.putAll(props);

-JdbcCatalog jdbcCatalog = new JdbcCatalog();
-jdbcCatalog.setConf(conf);
-jdbcCatalog.initialize(catalogName, properties);
-return jdbcCatalog;
+return (JdbcCatalog) CatalogUtil.buildIcebergCatalog(catalogName, properties, conf);
}

@Test
23 changes: 12 additions & 11 deletions docs/docs/aws.md
@@ -51,7 +51,7 @@ spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{{ iceber
--conf spark.sql.defaultCatalog=my_catalog \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
```

@@ -84,7 +84,7 @@ With those dependencies, you can create a Flink catalog like the following:
CREATE CATALOG my_catalog WITH (
'type'='iceberg',
'warehouse'='s3://my-bucket/my/key/prefix',
-  'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
+  'catalog-type'='glue',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
);
```
@@ -115,7 +115,7 @@ With those dependencies, you can register a Glue catalog and create external tab
```sql
SET iceberg.engine.hive.enabled=true;
SET hive.vectorized.execution.enabled=false;
-SET iceberg.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
+SET iceberg.catalog.glue.type=glue;
SET iceberg.catalog.glue.warehouse=s3://my-bucket/my/key/prefix;

-- suppose you have an Iceberg table database_a.table_a created by GlueCatalog
@@ -136,7 +136,8 @@ Iceberg enables the use of [AWS Glue](https://aws.amazon.com/glue) as the `Catal
When used, an Iceberg namespace is stored as a [Glue Database](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html),
an Iceberg table is stored as a [Glue Table](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-tables.html),
and every Iceberg table version is stored as a [Glue TableVersion](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-tables.html#aws-glue-api-catalog-tables-TableVersion).
-You can start using Glue catalog by specifying the `catalog-impl` as `org.apache.iceberg.aws.glue.GlueCatalog`,
+You can start using the Glue catalog by specifying `catalog-impl` as `org.apache.iceberg.aws.glue.GlueCatalog`
+or by setting `type` to `glue`,
as shown in the [enabling AWS integration](#enabling-aws-integration) section above.
More details about loading the catalog can be found in individual engine pages, such as [Spark](spark-configuration.md#loading-a-custom-catalog) and [Flink](flink.md#creating-catalogs-and-using-catalogs).
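For instance — a hypothetical Java snippet, not part of this commit — the two styles load the same class, so `glue_a` and `glue_b` below both end up as instances of `GlueCatalog` (catalog names and warehouse path are placeholders):

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class GlueCatalogLoading {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Style 1: the shorthand added by this change.
    Map<String, String> byType =
        ImmutableMap.of(
            "type", "glue",
            CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/my/key/prefix");
    // Style 2: the fully qualified implementation class.
    Map<String, String> byImpl =
        ImmutableMap.of(
            CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.aws.glue.GlueCatalog",
            CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/my/key/prefix");
    Catalog glueA = CatalogUtil.buildIcebergCatalog("glue_a", byType, conf);
    Catalog glueB = CatalogUtil.buildIcebergCatalog("glue_b", byImpl, conf);
    System.out.println(glueA.getClass() + " / " + glueB.getClass());
  }
}
```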

@@ -410,7 +411,7 @@ For example, to write S3 tags with Spark 3.3, you can start the Spark SQL shell
```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.write.tags.my_key1=my_val1 \
--conf spark.sql.catalog.my_catalog.s3.write.tags.my_key2=my_val2
@@ -428,7 +429,7 @@ For example, to add S3 delete tags with Spark 3.3, you can start the Spark SQL s
```
sh spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg-warehouse/s3-tagging \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.delete.tags.my_key3=my_val3 \
--conf spark.sql.catalog.my_catalog.s3.delete-enabled=false
@@ -443,7 +444,7 @@ For example, to write table and namespace name as S3 tags with Spark 3.3, you ca
```
sh spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg-warehouse/s3-tagging \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.write.table-tag-enabled=true \
--conf spark.sql.catalog.my_catalog.s3.write.namespace-tag-enabled=true
@@ -463,7 +464,7 @@ For example, to use S3 access-point with Spark 3.3, you can start the Spark SQL
```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
--conf spark.sql.catalog.test.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
@@ -484,7 +485,7 @@ For example, to use S3 Acceleration with Spark 3.3, you can start the Spark SQL
```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.acceleration-enabled=true
```
@@ -502,7 +503,7 @@ For example, to use S3 Dual-stack with Spark 3.3, you can start the Spark SQL sh
```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.dualstack-enabled=true
```
@@ -538,7 +539,7 @@ Here is an example to start Spark shell with this client factory:
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{{ icebergVersion }},org.apache.iceberg:iceberg-aws-bundle:{{ icebergVersion }} \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
+    --conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory \
--conf spark.sql.catalog.my_catalog.client.assume-role.arn=arn:aws:iam::123456789:role/myRoleToAssume \
--conf spark.sql.catalog.my_catalog.client.assume-role.region=ap-northeast-1
2 changes: 1 addition & 1 deletion docs/docs/flink-configuration.md
@@ -37,7 +37,7 @@ The following properties can be set globally and are not limited to a specific c
| Property | Required | Values | Description |
| ---------------------------- |----------| -------------------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| type | ✔️ | iceberg | Must be `iceberg`. |
-| catalog-type | | `hive`, `hadoop` or `rest` | `hive`, `hadoop` or `rest` for built-in catalogs, or left unset for custom catalog implementations using `catalog-impl`. |
+| catalog-type | | `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` | The underlying Iceberg catalog implementation: `HiveCatalog`, `HadoopCatalog`, `RESTCatalog`, `GlueCatalog`, `JdbcCatalog` or `NessieCatalog`; left unset for custom catalog implementations using `catalog-impl`. |
| catalog-impl | | | The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. |
| property-version | | | Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. |
| cache-enabled | | `true` or `false` | Whether to enable catalog cache, default value is `true`. |
2 changes: 1 addition & 1 deletion docs/docs/flink.md
@@ -192,7 +192,7 @@ CREATE CATALOG <catalog_name> WITH (
The following properties can be set globally and are not limited to a specific catalog implementation:

* `type`: Must be `iceberg`. (required)
-* `catalog-type`: `hive`, `hadoop` or `rest` for built-in catalogs, or left unset for custom catalog implementations using `catalog-impl`. (Optional)
+* `catalog-type`: `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` for built-in catalogs, or left unset for custom catalog implementations using `catalog-impl`. (Optional)
* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. (Optional)
* `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
* `cache-enabled`: Whether to enable catalog cache, default value is `true`. (Optional)
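The same properties can also be passed from the Flink Java Table API. A hedged sketch — the catalog name and warehouse are placeholders, and it assumes the Flink and Iceberg runtime jars described above are on the classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateIcebergCatalog {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    // catalog-type may now be glue, jdbc, or nessie in addition to hive, hadoop, and rest.
    tEnv.executeSql(
        "CREATE CATALOG glue_catalog WITH ("
            + "'type'='iceberg',"
            + "'catalog-type'='glue',"
            + "'warehouse'='s3://my-bucket/my/key/prefix')");
  }
}
```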
2 changes: 1 addition & 1 deletion docs/docs/hive.md
@@ -189,7 +189,7 @@ SET iceberg.catalog.hadoop.warehouse=hdfs://example.com:8020/warehouse;
Register an AWS `GlueCatalog` called `glue`:

```
-SET iceberg.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
+SET iceberg.catalog.glue.type=glue;
SET iceberg.catalog.glue.warehouse=s3://my-bucket/my/key/prefix;
SET iceberg.catalog.glue.lock.table=myGlueLockTable;
```
2 changes: 1 addition & 1 deletion docs/docs/jdbc.md
@@ -47,7 +47,7 @@ You can start a Spark session with a MySQL JDBC connection using the following c
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{{ icebergVersion }} \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
-    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
+    --conf spark.sql.catalog.my_catalog.type=jdbc \
--conf spark.sql.catalog.my_catalog.uri=jdbc:mysql://test.1234567890.us-west-2.rds.amazonaws.com:3306/default \
--conf spark.sql.catalog.my_catalog.jdbc.verifyServerCertificate=true \
--conf spark.sql.catalog.my_catalog.jdbc.useSSL=true \
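The shorthand also works when loading a JDBC catalog programmatically. A minimal sketch — connection string, credentials, and warehouse are placeholders, and it assumes a reachable database plus a JDBC driver on the classpath:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class JdbcCatalogLoading {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of(
            "type", "jdbc", // resolves to org.apache.iceberg.jdbc.JdbcCatalog
            CatalogProperties.URI, "jdbc:mysql://localhost:3306/default",
            CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/my/key/prefix",
            "jdbc.user", "admin",
            "jdbc.password", "password");
    Catalog catalog = CatalogUtil.buildIcebergCatalog("my_catalog", props, new Configuration());
    System.out.println(catalog.name());
  }
}
```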
6 changes: 3 additions & 3 deletions docs/docs/nessie.md
@@ -76,7 +76,7 @@ and in Spark:
conf.set("spark.sql.catalog.nessie.warehouse", "/path/to/warehouse");
conf.set("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v2")
conf.set("spark.sql.catalog.nessie.ref", "main")
conf.set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
conf.set("spark.sql.catalog.nessie.type", "nessie")
conf.set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
```
@@ -93,14 +93,14 @@ table_env = StreamTableEnvironment.create(env)

table_env.execute_sql("CREATE CATALOG nessie_catalog WITH ("
"'type'='iceberg', "
"'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog', "
"'type'='nessie', "
"'uri'='http://localhost:19120/api/v2', "
"'ref'='main', "
"'warehouse'='/path/to/warehouse')")
```

There is nothing special above about the `nessie` name. A Spark catalog can have any name; the important parts are the
-settings for the `catalog-impl` and the required config to start Nessie correctly.
+settings for the `type` or `catalog-impl` and the required config to connect to Nessie correctly.
Once you have a Nessie catalog you have access to your entire Nessie repo. You can then perform create/delete/merge
operations on branches and perform commits on branches. Each Iceberg table in a Nessie Catalog is identified by an
arbitrary length namespace and table name (eg `data.base.name.table`). These namespaces must be explicitly created
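Mirroring the test change later in this commit, a Nessie catalog can be loaded by `type` from plain Java as well — a sketch with placeholder URI, ref, and warehouse, assuming `iceberg-nessie` is on the classpath and a Nessie server is reachable:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class NessieCatalogLoading {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of(
            "type", "nessie", // resolves to org.apache.iceberg.nessie.NessieCatalog
            "ref", "main",
            CatalogProperties.URI, "http://localhost:19120/api/v2",
            CatalogProperties.WAREHOUSE_LOCATION, "/path/to/warehouse");
    Catalog catalog = CatalogUtil.buildIcebergCatalog("nessie", props, new Configuration());
    System.out.println(catalog.name());
  }
}
```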
6 changes: 3 additions & 3 deletions docs/docs/spark-configuration.md
@@ -64,9 +64,9 @@ Iceberg supplies two implementations:

Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:

-| Property | Values | Description |
-| -------------------------------------------------- | ----------------------------- | -------------------------------------------------------------------- |
-| spark.sql.catalog._catalog-name_.type | `hive`, `hadoop` or `rest` | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog`, `RESTCatalog` or left unset if using a custom catalog |
+| Property | Values | Description |
+| -------------------------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------ |
+| spark.sql.catalog._catalog-name_.type | `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` | The underlying Iceberg catalog implementation: `HiveCatalog`, `HadoopCatalog`, `RESTCatalog`, `GlueCatalog`, `JdbcCatalog` or `NessieCatalog`; left unset if using a custom catalog. |
| spark.sql.catalog._catalog-name_.catalog-impl | | The custom Iceberg catalog implementation. If `type` is null, `catalog-impl` must not be null. |
| spark.sql.catalog._catalog-name_.io-impl | | The custom FileIO implementation. |
| spark.sql.catalog._catalog-name_.metrics-reporter-impl | | The custom MetricsReporter implementation. |
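For example — a sketch, not from the docs, with placeholder names — the same `type` property can be set when building a `SparkSession` in Java:

```java
import org.apache.spark.sql.SparkSession;

public class SparkGlueCatalogExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
            // "type=glue" now resolves to org.apache.iceberg.aws.glue.GlueCatalog.
            .config("spark.sql.catalog.my_catalog.type", "glue")
            .config("spark.sql.catalog.my_catalog.warehouse", "s3://my-bucket/my/key/prefix")
            .getOrCreate();
    spark.sql("SHOW NAMESPACES IN my_catalog").show();
  }
}
```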
@@ -21,8 +21,10 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.LocationUtil;
@@ -110,18 +112,17 @@ private void resetData() throws NessieConflictException, NessieNotFoundException
}

private NessieCatalog initNessieCatalog(String ref) {
-NessieCatalog newCatalog = new NessieCatalog();
-newCatalog.setConf(hadoopConfig);
-ImmutableMap<String, String> options =
+Map<String, String> options =
ImmutableMap.of(
+"type",
+"nessie",
"ref",
ref,
CatalogProperties.URI,
uri,
CatalogProperties.WAREHOUSE_LOCATION,
temp.toUri().toString());
newCatalog.initialize("nessie", options);
return newCatalog;
return (NessieCatalog) CatalogUtil.buildIcebergCatalog("nessie", options, hadoopConfig);
}

@Override
