Skip to content

Commit 8ef080f

Browse files
authored
[Connector][Sink]Support load data to S3 then Copy to Redshift (#3736)
* [Connector][Sink]Support load data to S3 then Copy to Redshift
1 parent 5e3f196 commit 8ef080f

File tree

9 files changed

+455
-0
lines changed

9 files changed

+455
-0
lines changed

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,4 @@ seatunnel.sink.Doris = connector-doris
164164
seatunnel.source.Maxcompute = connector-maxcompute
165165
seatunnel.sink.Maxcompute = connector-maxcompute
166166
seatunnel.source.MySQL-CDC = connector-cdc-mysql
167+
seatunnel.sink.S3Redshift = connector-s3-redshift
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seatunnel-connectors-v2</artifactId>
        <groupId>org.apache.seatunnel</groupId>
        <version>${revision}</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Sink connector module: writes files to S3, then runs SQL against Redshift over JDBC. -->
    <artifactId>connector-s3-redshift</artifactId>

    <properties>
        <!-- Version of the Amazon Redshift JDBC driver declared below. -->
        <redshift.version>2.1.0.9</redshift.version>
    </properties>

    <dependencies>
        <!-- Shared Hadoop-based file connector code; the Flink shaded Hadoop jar is excluded. -->
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-file-base-hadoop</artifactId>
            <version>${project.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- S3 file connector, reused for the S3 configuration options. -->
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-file-s3</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- Amazon Redshift JDBC 4.2 driver. -->
        <dependency>
            <groupId>com.amazon.redshift</groupId>
            <artifactId>redshift-jdbc42</artifactId>
            <version>${redshift.version}</version>
        </dependency>
    </dependencies>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redshift;
19+
20+
import org.apache.seatunnel.common.exception.CommonErrorCode;
21+
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
23+
24+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
25+
26+
import java.sql.Connection;
27+
import java.sql.DatabaseMetaData;
28+
import java.sql.DriverManager;
29+
import java.sql.ResultSet;
30+
import java.sql.SQLException;
31+
import java.sql.Statement;
32+
33+
public class RedshiftJdbcClient {
34+
35+
private static volatile RedshiftJdbcClient INSTANCE = null;
36+
37+
private final Connection connection;
38+
39+
public static RedshiftJdbcClient getInstance(Config config) throws S3RedshiftJdbcConnectorException {
40+
if (INSTANCE == null) {
41+
synchronized (RedshiftJdbcClient.class) {
42+
if (INSTANCE == null) {
43+
44+
try {
45+
INSTANCE = new RedshiftJdbcClient(config.getString(S3RedshiftConfig.JDBC_URL.key()),
46+
config.getString(S3RedshiftConfig.JDBC_USER.key()),
47+
config.getString(S3RedshiftConfig.JDBC_PASSWORD.key()));
48+
} catch (SQLException | ClassNotFoundException e) {
49+
throw new S3RedshiftJdbcConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
50+
"RedshiftJdbcClient init error", e);
51+
}
52+
}
53+
}
54+
}
55+
return INSTANCE;
56+
}
57+
58+
private RedshiftJdbcClient(String url, String user, String password) throws SQLException, ClassNotFoundException {
59+
Class.forName("com.amazon.redshift.jdbc42.Driver");
60+
this.connection = DriverManager.getConnection(url, user, password);
61+
}
62+
63+
public boolean checkTableExists(String tableName) {
64+
boolean flag = false;
65+
try {
66+
DatabaseMetaData meta = connection.getMetaData();
67+
String[] type = {"TABLE"};
68+
ResultSet rs = meta.getTables(null, null, tableName, type);
69+
flag = rs.next();
70+
} catch (SQLException e) {
71+
throw new S3RedshiftJdbcConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
72+
String.format("Check table is or not existed failed, table name is %s ", tableName), e);
73+
}
74+
return flag;
75+
}
76+
77+
public boolean execute(String sql) throws Exception {
78+
try (Statement statement = connection.createStatement()) {
79+
return statement.execute(sql);
80+
}
81+
}
82+
83+
public synchronized void close() throws SQLException {
84+
connection.close();
85+
86+
}
87+
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redshift.commit;
19+
20+
import org.apache.seatunnel.common.exception.CommonErrorCode;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
22+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
23+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
24+
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
25+
import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
26+
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
28+
29+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
30+
31+
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.commons.lang3.StringUtils;
33+
34+
import java.io.IOException;
35+
import java.sql.SQLException;
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
import java.util.Map;
39+
40+
@Slf4j
41+
public class S3RedshiftSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
42+
43+
private final String executeSql;
44+
45+
private Config pluginConfig;
46+
47+
public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config pluginConfig) {
48+
super(hadoopConf);
49+
this.pluginConfig = pluginConfig;
50+
this.executeSql = pluginConfig.getString(S3RedshiftConfig.EXECUTE_SQL.key());
51+
}
52+
53+
@Override
54+
public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
55+
List<FileAggregatedCommitInfo> errorAggregatedCommitInfoList = new ArrayList<>();
56+
aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
57+
try {
58+
for (Map.Entry<String, Map<String, String>> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) {
59+
for (Map.Entry<String, String> tmpFileEntry : entry.getValue().entrySet()) {
60+
String sql = convertSql(tmpFileEntry.getKey());
61+
log.debug("execute redshift sql is:" + sql);
62+
RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
63+
try {
64+
FileSystemUtils.deleteFile(tmpFileEntry.getKey());
65+
} catch (IOException e) {
66+
log.warn("delete tmp file error:" + tmpFileEntry.getKey());
67+
}
68+
}
69+
70+
}
71+
72+
} catch (Exception e) {
73+
log.error("commit aggregatedCommitInfo error ", e);
74+
errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
75+
}
76+
});
77+
return errorAggregatedCommitInfoList;
78+
}
79+
80+
@Override
81+
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
82+
if (aggregatedCommitInfos == null || aggregatedCommitInfos.isEmpty()) {
83+
return;
84+
}
85+
aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
86+
try {
87+
for (Map.Entry<String, Map<String, String>> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) {
88+
// delete the transaction dir
89+
FileSystemUtils.deleteFile(entry.getKey());
90+
}
91+
} catch (Exception e) {
92+
log.error("abort aggregatedCommitInfo error ", e);
93+
}
94+
});
95+
}
96+
97+
@Override
98+
public void close() throws IOException {
99+
super.close();
100+
try {
101+
RedshiftJdbcClient.getInstance(pluginConfig).close();
102+
} catch (SQLException e) {
103+
throw new S3RedshiftJdbcConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
104+
"close redshift jdbc client failed", e);
105+
}
106+
}
107+
108+
private String convertSql(String path) {
109+
return StringUtils.replace(executeSql, "${path}", path);
110+
}
111+
112+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redshift.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
23+
24+
public class S3RedshiftConfig extends S3Config {
25+
26+
public static final Option<String> JDBC_URL = Options.key("jdbc_url").stringType().noDefaultValue().withDescription("Redshift JDBC URL");
27+
28+
public static final Option<String> JDBC_USER = Options.key("jdbc_user").stringType().noDefaultValue().withDescription("Redshift JDBC user");
29+
30+
public static final Option<String> JDBC_PASSWORD = Options.key("jdbc_password").stringType().noDefaultValue().withDescription("Redshift JDBC password");
31+
32+
public static final Option<String> EXECUTE_SQL = Options.key("execute_sql").stringType().noDefaultValue().withDescription("Redshift execute sql");
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redshift.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class S3RedshiftJdbcConnectorException extends SeaTunnelRuntimeException {
24+
25+
public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
26+
super(seaTunnelErrorCode, errorMessage);
27+
}
28+
29+
public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
30+
super(seaTunnelErrorCode, errorMessage, cause);
31+
}
32+
33+
public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
34+
super(seaTunnelErrorCode, cause);
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
19+
20+
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.table.factory.Factory;
22+
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
23+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
24+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
25+
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
26+
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
27+
28+
import com.google.auto.service.AutoService;
29+
30+
@AutoService(Factory.class)
31+
public class S3RedshiftFactory implements TableSinkFactory {
32+
33+
@Override
34+
public String factoryIdentifier() {
35+
return "S3Redshift";
36+
}
37+
38+
@Override
39+
public OptionRule optionRule() {
40+
return OptionRule.builder()
41+
.required(S3Config.S3_BUCKET, S3RedshiftConfig.JDBC_URL, S3RedshiftConfig.JDBC_USER, S3RedshiftConfig.JDBC_PASSWORD, S3RedshiftConfig.EXECUTE_SQL, BaseSourceConfig.FILE_PATH)
42+
.optional(S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
43+
.optional(BaseSinkConfig.FILE_FORMAT)
44+
.optional(BaseSinkConfig.FILENAME_TIME_FORMAT)
45+
.optional(BaseSinkConfig.FIELD_DELIMITER)
46+
.optional(BaseSinkConfig.ROW_DELIMITER)
47+
.optional(BaseSinkConfig.PARTITION_BY)
48+
.optional(BaseSinkConfig.PARTITION_DIR_EXPRESSION)
49+
.optional(BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE)
50+
.optional(BaseSinkConfig.SINK_COLUMNS)
51+
.optional(BaseSinkConfig.IS_ENABLE_TRANSACTION)
52+
.optional(BaseSinkConfig.FILE_NAME_EXPRESSION)
53+
.build();
54+
}
55+
}

0 commit comments

Comments
 (0)