Skip to content

Commit

Permalink
feat: load, write supports iceberg (#3737)
Browse files Browse the repository at this point in the history
* feat: load/write supports iceberg

* docs
  • Loading branch information
vagetablechicken committed Feb 8, 2024
1 parent 5924321 commit b6f7f2f
Show file tree
Hide file tree
Showing 14 changed files with 499 additions and 206 deletions.
4 changes: 2 additions & 2 deletions docs/en/integration/offline_data_sources/hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Importing data from Hive sources is facilitated through the API [`LOAD DATA INFI

- Both offline and online engines are capable of importing data from Hive sources.
- The Hive data import feature supports soft connections. This approach minimizes the need for redundant data copies and ensures that OpenMLDB can access Hive's most up-to-date data at any given time. To activate the soft link mechanism for data import, utilize the `deep_copy=false` parameter.
- The `OPTIONS` parameter offers two valid settings: `deep_copy`, `mode` and `sql`.
- The `OPTIONS` parameter offers three valid settings: `deep_copy`, `mode` and `sql`.

For example:

Expand All @@ -122,7 +122,7 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql='

Exporting data to Hive sources is facilitated through the API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md), which employs a distinct URI format, `hive://[db].table`, to seamlessly transfer data to the Hive data warehouse. Here are some key considerations:

- If you omit specifying a database name, the default database name used will be `default_Db`.
- If you omit specifying Hive database name, the default database used in Hive will be `default`.
- When a database name is explicitly provided, it's imperative that the database already exists. Currently, the system does not support the automatic creation of non-existent databases.
- In the event that the designated Hive table name is absent, the system will automatically generate a table with the corresponding name within the Hive environment.
- The `OPTIONS` parameter exclusively takes effect within the export mode of `mode`. Other parameters do not exert any influence.
Expand Down
104 changes: 104 additions & 0 deletions docs/en/integration/offline_data_sources/iceberg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Iceberg

## Introduction

[Apache Iceberg](https://iceberg.apache.org/) is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table. OpenMLDB supports the use of Iceberg as an offline storage engine for importing data and exporting feature computation data.

## Configuration

### Installation

For users employing [The OpenMLDB Spark Distribution Version](../../tutorial/openmldbspark_distribution.md), specifically v0.8.5 and newer iterations, the essential Iceberg 1.4.3 dependencies are already integrated. If you are working with an alternative Spark distribution or different iceberg version, you can download the corresponding Iceberg dependencies from the [Iceberg release](https://iceberg.apache.org/releases/) and add them to the Spark classpath/jars. For example, if you are using OpenMLDB Spark, you should download `x.x.x Spark 3.2_12 runtime Jar`(x.x.x is iceberg version) and add it to `jars/` in Spark home.

### Configuration

You should add the catalog configuration to the Spark configuration. This can be accomplished in two ways:

- taskmanager.properties(.template): Include iceberg configs within the `spark.default.conf` configuration item, followed by restarting the taskmanager.
- CLI: Integrate this configuration directive into ini conf and use `--spark_conf` when start CLI. Please refer to [Client Spark Configuration](../../reference/client_config/client_spark_config.md).

Iceberg config details can be found in [Iceberg Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/).

For example, set hive catalog in `taskmanager.properties(.template)`:

```properties
spark.default.conf=spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.hive_prod.type=hive;spark.sql.catalog.hive_prod.uri=thrift://metastore-host:port
```

If you need to create iceberg tables, you also need to configure `spark.sql.catalog.hive_prod.warehouse`.

Set hadoop catalog:

```properties
spark.default.conf=spark.sql.catalog.hadoop_prod=org.apache.iceberg.hadoop.HadoopCatalog;spark.sql.catalog.hadoop_prod.type=hadoop;spark.sql.catalog.hadoop_prod.warehouse=hdfs://hadoop-namenode:port/warehouse
```

Set rest catalog:

```properties
spark.default.conf=spark.sql.catalog.rest_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.rest_prod.catalog-impl=org.apache.iceberg.rest.RESTCatalog;spark.sql.catalog.rest_prod.uri=http://iceberg-rest:8181/
```

The full configuration of the iceberg catalog see [Iceberg Catalog Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/).

### Debug Information

When you import data from Iceberg, you can check the task log to confirm whether the task read the source data.
```
INFO ReaderImpl: Reading ORC rows from
```
TODO

## Data Format

Iceberg schema see [Iceberg Schema](https://iceberg.apache.org/spec/#schema). Currently, it only supports the following Iceberg data format:

| OpenMLDB Data Format | Iceberg Data Format |
| -------------------- | ------------------- |
| BOOL | bool |
| INT | int |
| BIGINT | long |
| FLOAT | float |
| DOUBLE | double |
| DATE | date |
| TIMESTAMP | timestamp |
| STRING | string |

## Import Iceberg Data to OpenMLDB

Importing data from Iceberg sources is facilitated through the API [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md). This operation employs a specialized URI format, `hive://[db].table`, to seamlessly import data from Iceberg. Here are some important considerations:

- Both offline and online engines are capable of importing data from Iceberg sources.
- The Iceberg data import feature supports soft connections. This approach minimizes the need for redundant data copies and ensures that OpenMLDB can access Iceberg's most up-to-date data at any given time. To activate the soft link mechanism for data import, utilize the `deep_copy=false` parameter.
- The `OPTIONS` parameter offers three valid settings: `deep_copy`, `mode` and `sql`.

For example, load data from Iceberg configured as hive catalog:

```sql
LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false);
-- or
LOAD DATA INFILE 'hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, format='iceberg');
```

The data loading process also supports using SQL queries to filter specific data from Hive tables. It's important to note that the SQL syntax must comply with SparkSQL standards. The table name used should be the registered name without the `iceberg://` prefix.

For example:

```sql
LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql='SELECT * FROM hive_prod.db1.t1 where key=\"foo\"')
```

## Export OpenMLDB Data to Iceberg

Exporting data to Iceberg sources is facilitated through the API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md), which employs a distinct URI format, `iceberg://[catalog].[db].table`, to seamlessly transfer data to the Iceberg data warehouse. Here are some key considerations:

- If you omit specifying Iceberg database name, the default database used in Iceberg will be `default`.
- When Iceberg database name is explicitly provided, it's imperative that the database already exists. Currently, the system does not support the automatic creation of non-existent databases.
- In the event that the designated Iceberg table name is absent, the system will automatically generate a table with the corresponding name within the Hive environment.
- The `OPTIONS` parameter exclusively takes effect within the export mode of `mode`. Other parameters do not exert any influence.

For example:

```sql
SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'iceberg://hive_prod.db1.t1';
```
3 changes: 2 additions & 1 deletion docs/en/integration/offline_data_sources/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ Offline Data Source
:maxdepth: 1

hive
s3
s3
iceberg
8 changes: 4 additions & 4 deletions docs/zh/integration/offline_data_sources/hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@

### 配置

目前 OpenMLDB 只支持使用 metastore 服务来连接Hive。你可以在以下两种配置方式中选择一种,来访问 Hive 数据源。
目前 OpenMLDB 只支持使用 metastore 服务来连接Hive。你可以在以下两种配置方式中选择一种,来访问 Hive 数据源。测试搭建的HIVE环境简单,通常只需要配置`hive.metastore.uris`即可。但生产环境中,可能需要配置更多的Hive配置,更推荐使用`hive-site.xml`的方式。

- spark.conf:你可以在 spark conf 中配置 `spark.hadoop.hive.metastore.uris`。有两种方式:
- spark.conf:你可以在 spark conf 中配置 `spark.hadoop.hive.metastore.uris`等相关配置。有两种方式:

- taskmanager.properties: 在配置项 `spark.default.conf` 中加入`spark.hadoop.hive.metastore.uris=thrift://...` ,随后重启taskmanager。
- CLI: 在 ini conf 中加入此配置项,并使用`--spark_conf`启动CLI,参考[客户端Spark配置文件](../../reference/client_config/client_spark_config.md)

- hive-site.xml:你可以配置 `hive-site.xml` 中的 `hive.metastore.uris`,并将配置文件放入 Spark home的`conf/`(如果已配置`HADOOP_CONF_DIR`环境变量,也可以将配置文件放入`HADOOP_CONF_DIR`中)。`hive-site.xml` 样例:
- hive-site.xml:你可以将HIVE的配置 `hive-site.xml` 放入 Spark home的`conf/`(如果已配置`HADOOP_CONF_DIR`环境变量,也可以将配置文件放入`HADOOP_CONF_DIR`中)。`hive-site.xml` 样例:

```xml
<configuration>
Expand Down Expand Up @@ -122,7 +122,7 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql='

对于 Hive 数据源的导出是通过 API [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) 进行支持,通过使用特定的 URI 接口 `hive://[db].table` 的格式进行导出到 Hive 数仓。注意:

- 如果不指定数据库名字,则会使用默认数据库名字 `default_db`
- 如果不指定Hive数据库名字,则会使用Hive默认数据库 `default`
- 如果指定数据库名字,则该数据库必须已经存在,目前不支持对于不存在的数据库进行自动创建
- 如果指定的Hive表名不存在,则会在 Hive 内自动创建对应名字的表
- `OPTIONS` 参数只有导出模式`mode`生效,其他参数均不生效
Expand Down
132 changes: 132 additions & 0 deletions docs/zh/integration/offline_data_sources/iceberg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Iceberg

## 简介

[Apache Iceberg](https://iceberg.apache.org/) 是一个开源的大数据表格格式。Iceberg可以在Spark、Trino、PrestoDB、Flink、Hive和Impala等计算引擎中添加表格,使用高性能的表格格式,就像SQL表格一样。OpenMLDB 支持使用 Iceberg 作为离线存储引擎,导入数据和导出特征计算数据。

## 配置

### 安装

[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md) v0.8.5 及以上版本均已经包含 Iceberg 1.4.3 依赖。如果你需要与其他iceberg版本或者其他Spark发行版一起使用,你可以从[Iceberg release](https://iceberg.apache.org/releases/)下载对应的Iceberg依赖,并将其添加到Spark的classpath/jars中。例如,如果你使用的是OpenMLDB Spark,你应该下载`x.x.x Spark 3.2_12 runtime Jar`(x.x.x is iceberg version)并将其添加到Spark home的`jars/`中。

### 配置

你需要将catalog配置添加到Spark配置中。有两种方式:

- taskmanager.properties(.template): 在配置项 `spark.default.conf` 中加入Iceberg配置,随后重启taskmanager。
- CLI: 在 ini conf 中加入此配置项,并使用`--spark_conf`启动CLI,参考[客户端Spark配置文件](../../reference/client_config/client_spark_config.md)

Iceberg配置详情参考[Iceberg Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/)

例如,在`taskmanager.properties(.template)`中设置hive catalog:

```properties
spark.default.conf=spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.hive_prod.type=hive;spark.sql.catalog.hive_prod.uri=thrift://metastore-host:port
```

如果需要创建iceberg表,还需要配置`spark.sql.catalog.hive_prod.warehouse`

设置 hadoop catalog:

```properties
spark.default.conf=spark.sql.catalog.hadoop_prod=org.apache.iceberg.hadoop.HadoopCatalog;spark.sql.catalog.hadoop_prod.type=hadoop;spark.sql.catalog.hadoop_prod.warehouse=hdfs://hadoop-namenode:port/warehouse
```

设置 rest catalog:

```properties
spark.default.conf=spark.sql.catalog.rest_prod=org.apache.iceberg.spark.SparkCatalog;spark.sql.catalog.rest_prod.catalog-impl=org.apache.iceberg.rest.RESTCatalog;spark.sql.catalog.rest_prod.uri=http://iceberg-rest:8181/
```

Iceberg catalog的完整配置参考[Iceberg Catalog Configuration](https://iceberg.apache.org/docs/latest/spark-configuration/)

任一配置成功后,均使用`<catalog_name>.<db_name>.<table_name>`的格式访问Iceberg表。如果不想使用`<catalog_name>`,可以在配置中设置`spark.sql.catalog.default=<catalog_name>`。也可添加`spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog``spark.sql.catalog.spark_catalog.type=hive`,让iceberg catalog合入spark catalog中(非iceberg表仍然存在于spark catalog中),这样可以使用`<db_name>.<table_name>`的格式访问Iceberg表。

### 调试信息

成功连接Iceberg Hive Catalog后,你可以在日志中看到类似以下的信息:

```
24/01/30 09:01:05 INFO SharedState: Setting hive.metastore.warehouse.dir ('hdfs://namenode:19000/user/hive/warehouse') to the value of spark.sql.warehouse.dir.
24/01/30 09:01:05 INFO SharedState: Warehouse path is 'hdfs://namenode:19000/user/hive/warehouse'.
...
24/01/30 09:01:06 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.9 using Spark classes.
24/01/30 09:01:06 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.9) is hdfs://namenode:19000/user/hive/warehouse
24/01/30 09:01:06 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/01/30 09:01:06 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/01/30 09:01:06 INFO HiveMetaStore: 0: Opening raw store with implementation class:org.apache.hadoop.hive.metastore.ObjectStore
24/01/30 09:01:06 INFO ObjectStore: ObjectStore, initialize called
24/01/30 09:01:06 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
24/01/30 09:01:06 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
24/01/30 09:01:07 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
24/01/30 09:01:07 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is POSTGRES
24/01/30 09:01:07 INFO ObjectStore: Initialized ObjectStore
24/01/30 09:01:08 INFO HiveMetaStore: Added admin role in metastore
24/01/30 09:01:08 INFO HiveMetaStore: Added public role in metastore
24/01/30 09:01:08 INFO HiveMetaStore: No user is added in admin role, since config is empty
24/01/30 09:01:08 INFO HiveMetaStore: 0: get_database: default
```

导出到Iceberg时,你可以检查任务日志,应该有类似以下的信息:

```
24/01/30 09:57:29 INFO AtomicReplaceTableAsSelectExec: Start processing data source write support: IcebergBatchWrite(table=nyc.taxis_out, format=PARQUET). The input RDD has 1 partitions.
...
24/01/30 09:57:31 INFO AtomicReplaceTableAsSelectExec: Data source write support IcebergBatchWrite(table=nyc.taxis_out, format=PARQUET) committed.
...
24/01/30 09:57:31 INFO HiveTableOperations: Committed to table hive_prod.nyc.taxis_out with the new metadata location hdfs://namenode:19000/user/hive/iceberg_storage/nyc.db/taxis_out/metadata/00001-038d8b81-04a6-4a19-bb83-275eb4664937.metadata.json
24/01/30 09:57:31 INFO BaseMetastoreTableOperations: Successfully committed to table hive_prod.nyc.taxis_out in 224 ms
```

## 数据格式

Iceberg schema参考[Iceberg Schema](https://iceberg.apache.org/spec/#schema)。目前,仅支持以下Iceberg数据格式:

| OpenMLDB 数据格式 | Iceberg 数据格式 |
| ----------------- | ---------------- |
| BOOL | bool |
| INT | int |
| BIGINT | long |
| FLOAT | float |
| DOUBLE | double |
| DATE | date |
| TIMESTAMP | timestamp |
| STRING | string |

## 导入 Iceberg 数据到 OpenMLDB

从 Iceberg 表导入数据,需要使用 [`LOAD DATA INFILE`](../../openmldb_sql/dml/LOAD_DATA_STATEMENT.md) 语句。这个语句使用特殊的 URI 格式 `hive://[db].table`,可以无缝地从 Iceberg 导入数据。以下是一些重要的注意事项:

- 离线引擎和在线引擎都可以从 Iceberg 表导入数据。
- 离线导入支持软链接,但是在线导入不支持软链接。使用软链接时,需要在导入OPTIONS中指定 `deep_copy=false`
- Iceberg 表导入只有三个参数有效: `deep_copy`, `mode` and `sql`。其他格式参数`delimiter``quote`等均无效。

例如,通过Iceberg Hive Catalog导入数据:

```sql
LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false);
-- or
LOAD DATA INFILE 'hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, format='iceberg');
```

数据导入支持`sql`参数,筛选出表种的特定数据进行导入,注意 SQL 必须符合 SparkSQL 语法,数据表为注册后的表名,不带 `iceberg://` 前缀。

```sql
LOAD DATA INFILE 'iceberg://hive_prod.db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, sql='select * from t1 where id > 100');
```

## 导出 OpenMLDB 数据到 Iceberg

从 OpenMLDB 导出数据到 Iceberg 表,需要使用 [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) 语句,这个语句使用特殊的 URI 格式 `iceberg://[db].table`,可以无缝地导出数据到 Iceberg 表。以下是一些重要的注意事项:

- 如果不指定Iceberg数据库名字,则会使用Iceberg默认数据库`default`
- 如果指定Iceberg数据库名字,则该数据库必须已经存在,目前不支持对于不存在的数据库进行自动创建
- 如果指定的Iceberg表名不存在,则会在 Iceberg 内自动创建对应名字的表
- `OPTIONS` 参数只有导出模式`mode`生效,其他参数均不生效

举例:

```sql
SELECT col1, col2, col3 FROM t1 INTO OUTFILE 'iceberg://hive_prod.db1.t1';
```
3 changes: 2 additions & 1 deletion docs/zh/integration/offline_data_sources/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
:maxdepth: 1

hive
s3
s3
iceberg
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com._4paradigm.openmldb.batch.api

import com._4paradigm.openmldb.batch.catalog.OpenmldbCatalogService
import com._4paradigm.openmldb.batch.utils.{DataTypeUtil, VersionCli}
import com._4paradigm.openmldb.batch.utils.HybridseUtil.autoLoad
import com._4paradigm.openmldb.batch.utils.DataSourceUtil.autoLoad
import com._4paradigm.openmldb.batch.{OpenmldbBatchConfig, SparkPlanner}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.log4j.{Level, Logger}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com._4paradigm.openmldb.batch.nodes
import com._4paradigm.hybridse.node.CreateTableLikeClause.LikeKind
import com._4paradigm.hybridse.sdk.UnsupportedHybridSeException
import com._4paradigm.hybridse.vm.PhysicalCreateTableNode
import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil}
import com._4paradigm.openmldb.batch.utils.{DataSourceUtil, OpenmldbTableUtil}
import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance}
import org.slf4j.LoggerFactory

Expand All @@ -44,7 +44,7 @@ object CreateTablePlan {
val df = likeKind match {
case LikeKind.HIVE =>
val hivePath = node.getData_.GetLikePath()
HybridseUtil.autoLoad(ctx.getOpenmldbSession, hivePath, "hive", Map[String, String](), null)
DataSourceUtil.autoLoad(ctx.getOpenmldbSession, hivePath, "hive", Map[String, String](), null)
case LikeKind.PARQUET =>
val parquetPath = node.getData_.GetLikePath()
ctx.getSparkSession.read.parquet(parquetPath)
Expand Down
Loading

0 comments on commit b6f7f2f

Please sign in to comment.