Skip to content

Commit eaedc0a

Browse files
authored
[Connector-V2]Add Hudi Source (#2147)
* [Connector-V2]Add Hudi Source * fix dependency licenses * add hudi souce v2 doc * fix doc * fix doc links checked
1 parent 509add5 commit eaedc0a

File tree

15 files changed

+879
-2
lines changed

15 files changed

+879
-2
lines changed

docs/en/connector-v2/source/Hudi.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Hudi
2+
3+
## Description
4+
5+
Used to read data from Hudi. Currently, it only supports the Hudi COW table type and Snapshot Query with Batch Mode.
6+
7+
## Options
8+
9+
| name | type | required | default value |
10+
|--------------------------|---------|----------|---------------|
11+
| table.path | string | yes | - |
12+
| table.type | string | yes | - |
13+
| conf.files | string | yes | - |
14+
| use.kerberos | boolean | no | false |
15+
| kerberos.principal | string | no | - |
16+
| kerberos.principal.file | string | no | - |
17+
18+
### table.path [string]
19+
20+
`table.path` The HDFS root path of the Hudi table, such as 'hdfs://nameservice/data/hudi/hudi_table/'.
21+
22+
### table.type [string]
23+
24+
`table.type` The type of the Hudi table. Currently only 'cow' is supported; 'mor' is not supported yet.
25+
26+
### conf.files [string]
27+
28+
`conf.files` The environment configuration file path list (local paths), which is used to initialize the HDFS client for reading Hudi table files. Example: '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'.
29+
30+
### use.kerberos [boolean]
31+
32+
`use.kerberos` Whether to enable Kerberos, default is false.
33+
34+
### kerberos.principal [string]
35+
36+
`kerberos.principal` When Kerberos is enabled, the Kerberos principal must be set, such as 'test_user@xxx'.
37+
38+
### kerberos.principal.file [string]
39+
40+
`kerberos.principal.file` When Kerberos is enabled, the Kerberos principal keytab file must be set, such as '/home/test/test_user.keytab'.
41+
42+
## Examples
43+
44+
```hocon
45+
source {
46+
47+
Hudi {
48+
table.path = "hdfs://nameservice/data/hudi/hudi_table/"
49+
table.type = "cow"
50+
conf.files = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
51+
use.kerberos = true
52+
kerberos.principal = "test_user@xxx"
53+
kerberos.principal.file = "/home/test/test_user.keytab"
54+
}
55+
56+
}
57+
```

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,4 @@ seatunnel.sink.Jdbc = connector-jdbc
105105
seatunnel.sink.HdfsFile = connector-file-hadoop
106106
seatunnel.sink.LocalFile = connector-file-local
107107
seatunnel.source.Pulsar = connector-pulsar
108+
seatunnel.source.Hudi = connector-hudi

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
<neo4j.connector.spark.version>4.1.0</neo4j.connector.spark.version>
110110
<iceberg.version>0.13.1</iceberg.version>
111111
<flink.version>1.13.6</flink.version>
112-
<hudi.version>0.10.0</hudi.version>
112+
<hudi.version>0.11.1</hudi.version>
113113
<orc.version>1.5.6</orc.version>
114114
<hive.exec.version>2.3.9</hive.exec.version>
115115
<commons.logging.version>1.2</commons.logging.version>
@@ -499,6 +499,12 @@
499499
<version>${flink.version}</version>
500500
</dependency>
501501

502+
<dependency>
503+
<groupId>org.apache.hudi</groupId>
504+
<artifactId>hudi-hadoop-mr-bundle</artifactId>
505+
<version>${hudi.version}</version>
506+
</dependency>
507+
502508
<dependency>
503509
<groupId>org.apache.hudi</groupId>
504510
<artifactId>hudi-spark-bundle_${scala.binary.version}</artifactId>

seatunnel-connectors-v2-dist/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@
9191
<artifactId>connector-file-local</artifactId>
9292
<version>${project.version}</version>
9393
</dependency>
94+
<dependency>
95+
<groupId>org.apache.seatunnel</groupId>
96+
<artifactId>connector-hudi</artifactId>
97+
<version>${project.version}</version>
98+
</dependency>
9499
</dependencies>
95100

96101
<build>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--

    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seatunnel-connectors-v2</artifactId>
        <groupId>org.apache.seatunnel</groupId>
        <version>${revision}</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>connector-hudi</artifactId>

    <dependencies>

        <!-- Shaded Hive classes used to read Hudi/parquet files via the Hive input format. -->
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-hive-shade</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- Connector-V2 SPI (SeaTunnelSource, SourceReader, ...). -->
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-api</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- Version managed in the root pom's dependencyManagement (hudi.version). -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-hadoop-mr-bundle</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
    </dependencies>
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hudi.config;
19+
20+
/**
 * Option keys accepted by the Hudi source connector.
 *
 * <p>The string values must match the option names documented in
 * docs/en/connector-v2/source/Hudi.md.
 */
public class HudiSourceConfig {

    /** HDFS root path of the Hudi table, e.g. 'hdfs://nameservice/data/hudi/hudi_table/'. Required. */
    public static final String TABLE_PATH = "table.path";

    /** Hudi table type; only 'cow' is currently supported. Required. */
    public static final String TABLE_TYPE = "table.type";

    /** Semicolon-separated list of local Hadoop conf file paths used to build the HDFS client. Required. */
    public static final String CONF_FILES = "conf.files";

    /** Whether to enable Kerberos authentication. Optional, defaults to false. */
    public static final String USE_KERBEROS = "use.kerberos";

    /** Kerberos principal, e.g. 'test_user@xxx'. Required only when use.kerberos is true. */
    public static final String KERBEROS_PRINCIPAL = "kerberos.principal";

    /** Path of the Kerberos keytab file. Required only when use.kerberos is true. */
    public static final String KERBEROS_PRINCIPAL_FILE = "kerberos.principal.file";

    private HudiSourceConfig() {
        // Constants holder — not meant to be instantiated.
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hudi.exception;
19+
20+
/**
 * Checked exception thrown by the Hudi connector when reading or preparing
 * a Hudi table fails.
 */
public class HudiPluginException extends Exception {

    /**
     * Creates an exception with a descriptive message.
     *
     * @param message detail message describing the failure
     */
    public HudiPluginException(String message) {
        super(message);
    }

    /**
     * Creates an exception that wraps an underlying cause.
     *
     * @param message detail message describing the failure
     * @param cause   the underlying exception that triggered this one
     */
    public HudiPluginException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hudi.source;
19+
20+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.CONF_FILES;
21+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL;
22+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL_FILE;
23+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_PATH;
24+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_TYPE;
25+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
26+
27+
import org.apache.seatunnel.api.common.PrepareFailException;
28+
import org.apache.seatunnel.api.common.SeaTunnelContext;
29+
import org.apache.seatunnel.api.serialization.DefaultSerializer;
30+
import org.apache.seatunnel.api.serialization.Serializer;
31+
import org.apache.seatunnel.api.source.Boundedness;
32+
import org.apache.seatunnel.api.source.SeaTunnelSource;
33+
import org.apache.seatunnel.api.source.SourceReader;
34+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
35+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
36+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
37+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
38+
import org.apache.seatunnel.common.config.CheckConfigUtil;
39+
import org.apache.seatunnel.common.config.CheckResult;
40+
import org.apache.seatunnel.common.constants.PluginType;
41+
import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiPluginException;
42+
import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
43+
44+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
45+
46+
import com.google.auto.service.AutoService;
47+
48+
import java.io.IOException;
49+
50+
@AutoService(SeaTunnelSource.class)
51+
public class HudiSource implements SeaTunnelSource<SeaTunnelRow, HudiSourceSplit, HudiSourceState> {
52+
53+
private SeaTunnelContext seaTunnelContext;
54+
55+
private SeaTunnelRowType typeInfo;
56+
57+
private String filePath;
58+
59+
private String tablePath;
60+
61+
private String confFiles;
62+
63+
private boolean useKerberos = false;
64+
65+
@Override
66+
public String getPluginName() {
67+
return "Hudi";
68+
}
69+
70+
@Override
71+
public void prepare(Config pluginConfig) {
72+
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TABLE_PATH, CONF_FILES);
73+
if (!result.isSuccess()) {
74+
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
75+
}
76+
// default hudi table tupe is cow
77+
// TODO: support hudi mor table
78+
// TODO: support Incremental Query and Read Optimized Query
79+
if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE))) {
80+
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Do not support hudi mor table yet!");
81+
}
82+
try {
83+
this.confFiles = pluginConfig.getString(CONF_FILES);
84+
this.tablePath = pluginConfig.getString(TABLE_PATH);
85+
if (CheckConfigUtil.isValidParam(pluginConfig, USE_KERBEROS)) {
86+
this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS);
87+
if (this.useKerberos) {
88+
CheckResult kerberosCheckResult = CheckConfigUtil.checkAllExists(pluginConfig, KERBEROS_PRINCIPAL, KERBEROS_PRINCIPAL_FILE);
89+
if (!kerberosCheckResult.isSuccess()) {
90+
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
91+
}
92+
HudiUtil.initKerberosAuthentication(HudiUtil.getConfiguration(this.confFiles), pluginConfig.getString(KERBEROS_PRINCIPAL), pluginConfig.getString(KERBEROS_PRINCIPAL_FILE));
93+
}
94+
}
95+
this.filePath = HudiUtil.getParquetFileByPath(this.confFiles, tablePath);
96+
if (this.filePath == null) {
97+
throw new HudiPluginException(String.format("%s has no parquet file, please check!", tablePath));
98+
}
99+
// should read from config or read from hudi metadata( wait catlog done)
100+
this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles, this.filePath);
101+
102+
} catch (HudiPluginException | IOException e) {
103+
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Prepare HudiSource error.", e);
104+
}
105+
106+
}
107+
108+
@Override
109+
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
110+
this.seaTunnelContext = seaTunnelContext;
111+
}
112+
113+
@Override
114+
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
115+
return this.typeInfo;
116+
}
117+
118+
@Override
119+
public SourceReader<SeaTunnelRow, HudiSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
120+
return new HudiSourceReader(this.confFiles, readerContext, typeInfo);
121+
}
122+
123+
@Override
124+
public Boundedness getBoundedness() {
125+
// Only support Snapshot Query now.
126+
// After support Incremental Query and Read Optimized Query, we should supoort UNBOUNDED.
127+
// TODO: support UNBOUNDED
128+
return Boundedness.BOUNDED;
129+
}
130+
131+
@Override
132+
public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> createEnumerator(SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext) throws Exception {
133+
return new HudiSourceSplitEnumerator(enumeratorContext, tablePath, this.confFiles);
134+
}
135+
136+
@Override
137+
public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> restoreEnumerator(SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext, HudiSourceState checkpointState) throws Exception {
138+
return new HudiSourceSplitEnumerator(enumeratorContext, tablePath, this.confFiles, checkpointState);
139+
}
140+
141+
@Override
142+
public Serializer<HudiSourceState> getEnumeratorStateSerializer() {
143+
return new DefaultSerializer<>();
144+
}
145+
}

0 commit comments

Comments
 (0)