
Commit 6172030

[Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845)
* Fix ORC read in Flink
* Fix license header
* Exclude orc-core from flink-orc and from seatunnel-core-base
* Only connector-file/pom.xml needs the shade plugin and the connector.name property
1 parent f0a1f50 commit 6172030

File tree

17 files changed: +321 −158 lines changed

seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml (new file)

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>connector-file</artifactId>
        <groupId>org.apache.seatunnel</groupId>
        <version>${revision}</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>connector-file-base-hadoop</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-file-base</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- make sure that flatten runs after maven-shade-plugin -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>flatten-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
…/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java (new file)

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

public abstract class BaseHdfsFileSink extends BaseFileSink {

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, FS_DEFAULT_NAME_KEY);
        if (!result.isSuccess()) {
            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
        }
        super.prepare(pluginConfig);
        hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
    }
}
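For orientation: a concrete connector is expected to extend this abstract base and contribute little beyond its plugin name, since prepare() above already validates fs.defaultFS and builds the HadoopConf. A minimal sketch, with the subclass name MyHdfsFileSink assumed for illustration (it is not part of this commit):

package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;

// Hypothetical sketch of a concrete sink built on BaseHdfsFileSink.
// Config validation and HadoopConf setup are inherited from the base class.
public class MyHdfsFileSink extends BaseHdfsFileSink {

    @Override
    public String getPluginName() {
        // Name under which the SeaTunnel engine looks up this sink plugin;
        // "MyHdfsFile" is a placeholder, not a real plugin name.
        return "MyHdfsFile";
    }
}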
…/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java (new file)

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import java.io.IOException;

public abstract class BaseHdfsFileSource extends BaseFileSource {

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HdfsSourceConfig.FILE_PATH, HdfsSourceConfig.FILE_TYPE, HdfsSourceConfig.DEFAULT_FS);
        if (!result.isSuccess()) {
            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
        }
        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE));
        String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH);
        hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS));
        try {
            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
        } catch (IOException e) {
            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
        }
        // support user-defined schema
        if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
            Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
            rowType = SeaTunnelSchema
                    .buildWithConfig(schemaConfig)
                    .getSeaTunnelRowType();
            readStrategy.setSeaTunnelRowTypeInfo(rowType);
        } else {
            try {
                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
            } catch (FilePluginException e) {
                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
            }
        }
    }
}
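The source side follows the same pattern: prepare() above validates the path/type/fs.defaultFS keys, picks a ReadStrategy for the file format, and either applies a user-defined schema or infers the row type from the first matched file, so a concrete subclass again only names the plugin. A minimal sketch with a hypothetical subclass name (MyHdfsFileSource is an assumption, not part of this commit):

package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;

// Hypothetical sketch of a concrete source built on BaseHdfsFileSource.
// Config validation, read-strategy selection, and schema resolution all
// happen in BaseHdfsFileSource.prepare().
public class MyHdfsFileSource extends BaseHdfsFileSource {

    @Override
    public String getPluginName() {
        // Name under which the SeaTunnel engine looks up this source plugin;
        // "MyHdfsFile" is a placeholder, not a real plugin name.
        return "MyHdfsFile";
    }
}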
File renamed without changes.

seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

Lines changed: 16 additions & 15 deletions
@@ -28,7 +28,7 @@
     <modelVersion>4.0.0</modelVersion>

     <artifactId>connector-file-base</artifactId>
-
+
     <properties>
         <commons-net.version>3.6</commons-net.version>
         <orc.version>1.5.6</orc.version>
@@ -37,7 +37,7 @@
         <flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
         <parquet-avro.version>1.12.3</parquet-avro.version>
     </properties>
-
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -67,6 +67,12 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
             <scope>test</scope>
         </dependency>

@@ -92,11 +98,16 @@
             <groupId>org.apache.orc</groupId>
             <artifactId>orc-core</artifactId>
             <version>${orc.version}</version>
+            <classifier>nohive</classifier>
             <exclusions>
                 <exclusion>
                     <artifactId>hadoop-common</artifactId>
                     <groupId>org.apache.hadoop</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                </exclusion>
             </exclusions>
         </dependency>

@@ -116,26 +127,16 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
         </dependency>
-
     </dependencies>

     <build>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <!-- base module need skip shading -->
-                        <configuration>
-                            <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
             </plugin>
             <!-- make sure that flatten runs after maven-shade-plugin -->
             <plugin>

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java

Lines changed: 12 additions & 12 deletions
@@ -24,15 +24,15 @@

 import lombok.NonNull;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

 import java.io.IOException;
 import java.math.BigInteger;
@@ -91,12 +91,12 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
         Path path = new Path(filePath);
         try {
             OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf))
-                .setSchema(schema)
-                // temporarily used snappy
-                .compress(CompressionKind.SNAPPY)
-                // use orc version 0.12
-                .version(OrcFile.Version.V_0_12)
-                .overwrite(true);
+                    .setSchema(schema)
+                    // temporarily used snappy
+                    .compress(CompressionKind.SNAPPY)
+                    // use orc version 0.12
+                    .version(OrcFile.Version.V_0_12)
+                    .overwrite(true);
             Writer newWriter = OrcFile.createWriter(path, options);
             this.beingWrittenWriter.put(filePath, newWriter);
             return newWriter;
@@ -170,7 +170,7 @@ private void setLongColumnVector(Object value, LongColumnVector longVector, int
         if (value instanceof Boolean) {
             Boolean bool = (Boolean) value;
             longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
-        } else if (value instanceof Integer) {
+        } else if (value instanceof Integer) {
             longVector.vector[row] = ((Integer) value).longValue();
         } else if (value instanceof Long) {
             longVector.vector[row] = (Long) value;
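The substantive change in this file is the import swap: with the orc-core nohive classifier, the column-vector classes come from the relocated org.apache.orc.storage.* packages instead of org.apache.hadoop.hive.*, so they no longer clash with the Hive classes already on the engine classpath. A minimal, self-contained sketch of writing one row through that relocated API (the output path, schema, and class name below are illustrative assumptions, not taken from this commit):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
// nohive classifier: storage-api classes are relocated under org.apache.orc.storage
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

public class NohiveOrcWriteExample {
    public static void main(String[] args) throws Exception {
        // Same writer options OrcWriteStrategy uses: snappy compression, ORC v0.12.
        TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");
        Writer writer = OrcFile.createWriter(
                new Path("/tmp/example.orc"),  // illustrative output path
                OrcFile.writerOptions(new Configuration())
                        .setSchema(schema)
                        .compress(CompressionKind.SNAPPY)
                        .version(OrcFile.Version.V_0_12)
                        .overwrite(true));
        // createRowBatch() returns the relocated VectorizedRowBatch in the nohive jar.
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector id = (LongColumnVector) batch.cols[0];
        BytesColumnVector name = (BytesColumnVector) batch.cols[1];
        int row = batch.size++;
        id.vector[row] = 1L;
        name.setVal(row, "seatunnel".getBytes(StandardCharsets.UTF_8));
        writer.addRowBatch(batch);
        writer.close();
    }
}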
