
Commit a8738ef

[Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
1 parent 2b44992 commit a8738ef
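
Note: the "msck" in the title refers to Hive's `MSCK REPAIR TABLE` statement, which registers partitions that exist on the filesystem but not yet in the metastore. This change achieves the same effect programmatically: after files are committed, the sink registers the new partitions through the Hive metastore client, and drops them again if the commit is aborted.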

File tree

5 files changed: +134 -36 lines changed

- docs/en/connector-v2/sink/Hive.md
- seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java (new)
- seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
- seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
- seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java

docs/en/connector-v2/sink/Hive.md

Lines changed: 7 additions & 0 deletions

````diff
@@ -159,3 +159,10 @@ sink {
   }
 }
 ```
+
+## Changelog
+
+| Version    | Date       | Pull Request                                                     | Subject                                        |
+|------------|------------|-----------------------------------------------------------------|-----------------------------------------------|
+| 2.2.0-beta | 2022-09-26 |                                                                  | Add Hive Sink                                  |
+| 2.3.0-beta | 2022-10-19 | [3133](https://github.com/apache/incubator-seatunnel/pull/3133) | Hive Sink supports automatic partition repair  |
````

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java

Lines changed: 87 additions & 0 deletions (new file)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.hive.commit;

import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
    private final Config pluginConfig;
    private final String dbName;
    private final String tableName;

    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName, String tableName) {
        this.pluginConfig = pluginConfig;
        this.dbName = dbName;
        this.tableName = tableName;
    }

    @Override
    public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
        HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
        List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
        if (errorCommitInfos.isEmpty()) {
            for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
                Map<String, List<String>> partitionDirAndValuesMap = aggregatedCommitInfo.getPartitionDirAndValuesMap();
                List<String> partitions = partitionDirAndValuesMap.keySet().stream()
                        .map(partition -> partition.replaceAll("\\\\", "/"))
                        .collect(Collectors.toList());
                try {
                    hiveMetaStore.addPartitions(dbName, tableName, partitions);
                    log.info("Add these partitions [{}]", partitions);
                } catch (TException e) {
                    log.error("Failed to add these partitions [{}]", partitions);
                    errorCommitInfos.add(aggregatedCommitInfo);
                }
            }
        }
        hiveMetaStore.close();
        return errorCommitInfos;
    }

    @Override
    public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
        super.abort(aggregatedCommitInfos);
        HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
        for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
            Map<String, List<String>> partitionDirAndValuesMap = aggregatedCommitInfo.getPartitionDirAndValuesMap();
            List<String> partitions = partitionDirAndValuesMap.keySet().stream()
                    .map(partition -> partition.replaceAll("\\\\", "/"))
                    .collect(Collectors.toList());
            try {
                hiveMetaStore.dropPartitions(dbName, tableName, partitions);
                log.info("Remove these partitions [{}]", partitions);
            } catch (TException e) {
                log.error("Failed to remove these partitions [{}]", partitions);
            }
        }
        hiveMetaStore.close();
    }
}
```
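
The `replaceAll("\\\\", "/")` in both methods normalizes partition directory keys before they reach the metastore: in Java source, the string literal `"\\\\"` is a two-character regex matching one literal backslash, so Windows-style separators in the map keys become the `/`-separated `key=value` form Hive expects. A minimal standalone sketch of just that normalization (the sample keys are hypothetical, not taken from the commit):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionKeyNormalization {
    public static void main(String[] args) {
        // Hypothetical partition-directory keys, standing in for the key set of
        // FileAggregatedCommitInfo.getPartitionDirAndValuesMap() on Windows.
        List<String> rawKeys = Arrays.asList("dt=2022-10-19\\hour=01", "dt=2022-10-19/hour=02");

        // Same transformation as HiveSinkAggregatedCommitter: the regex "\\\\"
        // matches a single literal backslash and replaces it with "/".
        List<String> partitions = rawKeys.stream()
                .map(key -> key.replaceAll("\\\\", "/"))
                .collect(Collectors.toList());

        System.out.println(partitions); // [dt=2022-10-19/hour=01, dt=2022-10-19/hour=02]
    }
}
```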

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -42,6 +42,7 @@ public static Pair<String[], Table> getTableInfo(Config config) {
         }
         HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
         Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
+        hiveMetaStoreProxy.close();
         return Pair.of(splits, tableInformation);
     }
 }
```
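
The added `close()` releases the metastore connection that `getTableInfo` previously leaked. Note it only covers the happy path: if `getTable` throws, the proxy is still left open. A try/finally variant would close it on both paths; the following is a sketch of that alternative under the same `HiveMetaStoreProxy` API, not what the commit actually does:

```java
// Sketch only: fragment of getTableInfo that closes the proxy even when
// getTable(...) throws, instead of only on the successful return path.
HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
try {
    Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
    return Pair.of(splits, tableInformation);
} finally {
    hiveMetaStoreProxy.close();
}
```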

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java

Lines changed: 26 additions & 29 deletions

```diff
@@ -21,25 +21,29 @@
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PARTITION_BY;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SINK_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
 import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
 import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
@@ -50,10 +54,12 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSink.class)
@@ -67,29 +73,6 @@ public String getPluginName() {
         return "Hive";
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setTypeInfo(seaTunnelRowType);
-        HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(pluginConfig);
-        // --------------------Check textFileSinkConfig with the hive table info-------------------
-        List<FieldSchema> fields = hiveMetaStoreProxy.getTableFields(dbName, tableName);
-        List<FieldSchema> partitionKeys = tableInformation.getPartitionKeys();
-
-        // Remove partitionKeys from table fields
-        List<FieldSchema> fieldNotContainPartitionKey = fields.stream().filter(filed -> !partitionKeys.contains(filed)).collect(Collectors.toList());
-
-        // check fields size must same as sinkColumnList size
-        if (fieldNotContainPartitionKey.size() != textFileSinkConfig.getSinkColumnList().size()) {
-            throw new RuntimeException("sink columns size must same as hive table field size");
-        }
-
-        // check hivePartitionFieldList size must same as partitionFieldList size
-        if (partitionKeys.size() != textFileSinkConfig.getPartitionFieldList().size()) {
-            throw new RuntimeException("partition by columns size must same as hive table partition columns size");
-        }
-        hiveMetaStoreProxy.close();
-    }
-
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
@@ -100,6 +83,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
         dbName = tableInfo.getLeft()[0];
         tableName = tableInfo.getLeft()[1];
         tableInformation = tableInfo.getRight();
+        List<String> sinkFields = tableInformation.getSd().getCols().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toList());
+        List<String> partitionKeys = tableInformation.getPartitionKeys().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toList());
+        sinkFields.addAll(partitionKeys);
         String outputFormat = tableInformation.getSd().getOutputFormat();
         if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
@@ -113,10 +103,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
         } else {
             throw new RuntimeException("Only support [text parquet orc] file now");
         }
-        pluginConfig = pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
-                .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
-                .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
-
+        pluginConfig = pluginConfig
+                .withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
+                .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
+                .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()))
+                .withValue(SINK_COLUMNS, ConfigValueFactory.fromAnyRef(sinkFields))
+                .withValue(PARTITION_BY, ConfigValueFactory.fromAnyRef(partitionKeys));
         if (!pluginConfig.hasPath(SAVE_MODE) || StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
             pluginConfig = pluginConfig.withValue(SAVE_MODE, ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
         }
@@ -131,4 +123,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
         }
         this.pluginConfig = pluginConfig;
     }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
+        return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig, dbName, tableName));
+    }
 }
```
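
With the `setTypeInfo` override removed, `prepare` now derives `SINK_COLUMNS` and `PARTITION_BY` from the metastore's own table definition instead of validating user-supplied lists against it: data columns come from the storage descriptor (`getSd().getCols()`), and the partition keys are appended after them. A small standalone sketch of that derivation, using hand-built `FieldSchema` objects (hypothetical column names) in place of a live metastore table:

```java
import org.apache.hadoop.hive.metastore.api.FieldSchema;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SinkColumnsFromTableSchema {
    public static void main(String[] args) {
        // Hypothetical table layout: two data columns plus one partition key,
        // standing in for tableInformation.getSd().getCols() and getPartitionKeys().
        List<FieldSchema> cols = Arrays.asList(
                new FieldSchema("id", "bigint", ""),
                new FieldSchema("name", "string", ""));
        List<FieldSchema> partitionCols = Arrays.asList(
                new FieldSchema("dt", "string", ""));

        // Same shape as the new prepare() logic: names only, partition keys last.
        List<String> sinkFields = cols.stream()
                .map(FieldSchema::getName)
                .collect(Collectors.toList());
        List<String> partitionKeys = partitionCols.stream()
                .map(FieldSchema::getName)
                .collect(Collectors.toList());
        sinkFields.addAll(partitionKeys);

        System.out.println(sinkFields); // [id, name, dt]
    }
}
```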

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java

Lines changed: 13 additions & 7 deletions

```diff
@@ -24,7 +24,6 @@
 import lombok.NonNull;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
@@ -67,12 +66,19 @@ public Table getTable(@NonNull String dbName, @NonNull String tableName) {
         }
     }
 
-    public List<FieldSchema> getTableFields(@NonNull String dbName, @NonNull String tableName) {
-        try {
-            return hiveMetaStoreClient.getFields(dbName, tableName);
-        } catch (TException e) {
-            String errorMsg = String.format("Get table [%s.%s] fields information failed", dbName, tableName);
-            throw new RuntimeException(errorMsg, e);
+    public void addPartitions(@NonNull String dbName,
+                              @NonNull String tableName,
+                              List<String> partitions) throws TException {
+        for (String partition : partitions) {
+            hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
+        }
+    }
+
+    public void dropPartitions(@NonNull String dbName,
+                               @NonNull String tableName,
+                               List<String> partitions) throws TException {
+        for (String partition : partitions) {
+            hiveMetaStoreClient.dropPartition(dbName, tableName, partition, false);
         }
     }
 
```