Skip to content

Commit c747e02

Browse files
authored
[improve] amazon sqs connector update (#8602)
1 parent 629f85b commit c747e02

File tree

10 files changed

+246
-190
lines changed

10 files changed

+246
-190
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ private Set<String> buildWhiteList() {
194194
whiteList.add("EasysearchSourceOptions");
195195
whiteList.add("RabbitmqSinkOptions");
196196
whiteList.add("StarRocksSourceOptions");
197-
whiteList.add("AmazonSqsSinkOptions");
198197
whiteList.add("IcebergSourceOptions");
199198
whiteList.add("HbaseSourceOptions");
200199
whiteList.add("PaimonSourceOptions");
Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import java.io.Serializable;
2424

25-
public class AmazonSqsConfig implements Serializable {
25+
public class AmazonSqsBaseOptions implements Serializable {
2626

2727
public static final String DEFAULT_FIELD_DELIMITER = ",";
2828

@@ -47,11 +47,6 @@ public class AmazonSqsConfig implements Serializable {
4747
.noDefaultValue()
4848
.withDescription("The access secret key of Amazon SQS Service");
4949

50-
public static final Option<String> MESSAGE_GROUP_ID =
51-
Options.key("message_group_id")
52-
.stringType()
53-
.noDefaultValue()
54-
.withDescription("The message group id of Amazon SQS Service");
5550
public static final Option<MessageFormat> FORMAT =
5651
Options.key("format")
5752
.enumType(MessageFormat.class)
@@ -64,15 +59,4 @@ public class AmazonSqsConfig implements Serializable {
6459
.stringType()
6560
.noDefaultValue()
6661
.withDescription("Customize the field delimiter for data format.");
67-
public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
68-
Options.key("debezium_record_include_schema")
69-
.booleanType()
70-
.defaultValue(true)
71-
.withDescription("Does the debezium record carry a schema.");
72-
73-
public static final Option<Boolean> DELETE_MESSAGE =
74-
Options.key("delete_message")
75-
.booleanType()
76-
.defaultValue(false)
77-
.withDescription("Delete the message after it is consumed if set true.");
7862
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
19+
20+
public class AmazonSqsSinkOptions extends AmazonSqsBaseOptions {}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
24+
import lombok.AllArgsConstructor;
25+
import lombok.Data;
26+
27+
import java.io.Serializable;
28+
29+
@Data
30+
@AllArgsConstructor
31+
public class AmazonSqsSourceConfig implements Serializable {
32+
33+
private String url;
34+
35+
private String region;
36+
37+
private String accessKeyId;
38+
39+
private String secretAccessKey;
40+
41+
private String messageGroupId;
42+
43+
private boolean deleteMessage;
44+
45+
private Config schema;
46+
47+
public AmazonSqsSourceConfig(ReadonlyConfig config) {
48+
this.url = config.get(AmazonSqsSourceOptions.URL);
49+
this.region = config.get(AmazonSqsSourceOptions.REGION);
50+
this.accessKeyId = config.get(AmazonSqsSourceOptions.ACCESS_KEY_ID);
51+
this.secretAccessKey = config.get(AmazonSqsSourceOptions.SECRET_ACCESS_KEY);
52+
this.messageGroupId = config.get(AmazonSqsSourceOptions.MESSAGE_GROUP_ID);
53+
this.deleteMessage = config.get(AmazonSqsSourceOptions.DELETE_MESSAGE);
54+
this.schema = ReadonlyConfig.fromMap(config.get(AmazonSqsSourceOptions.SCHEMA)).toConfig();
55+
;
56+
}
57+
}

seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/config/AmazonSqsSourceOptions.java

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,30 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
2323

24-
import lombok.AllArgsConstructor;
25-
import lombok.Data;
26-
27-
import java.io.Serializable;
28-
29-
@Data
30-
@AllArgsConstructor
31-
public class AmazonSqsSourceOptions implements Serializable {
32-
33-
private String url;
34-
35-
private String region;
36-
37-
private String accessKeyId;
38-
39-
private String secretAccessKey;
24+
import java.util.Map;
4025

41-
private String messageGroupId;
26+
public class AmazonSqsSourceOptions extends AmazonSqsBaseOptions {
27+
public static final Option<Map<String, Object>> SCHEMA = TableSchemaOptions.SCHEMA;
4228

43-
private boolean deleteMessage;
29+
public static final Option<Boolean> DELETE_MESSAGE =
30+
Options.key("delete_message")
31+
.booleanType()
32+
.defaultValue(false)
33+
.withDescription("Delete the message after it is consumed if set true.");
4434

45-
private Config schema;
35+
public static final Option<String> MESSAGE_GROUP_ID =
36+
Options.key("message_group_id")
37+
.stringType()
38+
.noDefaultValue()
39+
.withDescription("The message group id of Amazon SQS Service");
4640

47-
public AmazonSqsSourceOptions(Config config) {
48-
this.url = config.getString(AmazonSqsConfig.URL.key());
49-
this.region = config.getString(AmazonSqsConfig.REGION.key());
50-
if (config.hasPath(AmazonSqsConfig.ACCESS_KEY_ID.key())) {
51-
this.accessKeyId = config.getString(AmazonSqsConfig.ACCESS_KEY_ID.key());
52-
}
53-
if (config.hasPath(AmazonSqsConfig.SECRET_ACCESS_KEY.key())) {
54-
this.secretAccessKey = config.getString(AmazonSqsConfig.SECRET_ACCESS_KEY.key());
55-
}
56-
if (config.hasPath(AmazonSqsConfig.MESSAGE_GROUP_ID.key())) {
57-
this.messageGroupId = config.getString(AmazonSqsConfig.MESSAGE_GROUP_ID.key());
58-
}
59-
if (config.hasPath(AmazonSqsConfig.DELETE_MESSAGE.key())) {
60-
this.deleteMessage = config.getBoolean(AmazonSqsConfig.DELETE_MESSAGE.key());
61-
}
62-
if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
63-
this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
64-
}
65-
}
41+
public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
42+
Options.key("debezium_record_include_schema")
43+
.booleanType()
44+
.defaultValue(true)
45+
.withDescription("Does the debezium record carry a schema.");
6646
}

seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@
2727

2828
import com.google.auto.service.AutoService;
2929

30-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
31-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
30+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.ACCESS_KEY_ID;
31+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FIELD_DELIMITER;
32+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FORMAT;
33+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.REGION;
34+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.SECRET_ACCESS_KEY;
35+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.URL;
3236

3337
@AutoService(Factory.class)
3438
public class AmazonSqsSinkFactory implements TableSinkFactory {
@@ -46,6 +50,9 @@ public TableSink createSink(TableSinkFactoryContext context) {
4650

4751
@Override
4852
public OptionRule optionRule() {
49-
return OptionRule.builder().required(URL, REGION).build();
53+
return OptionRule.builder()
54+
.required(URL, REGION)
55+
.optional(ACCESS_KEY_ID, SECRET_ACCESS_KEY, FORMAT, FIELD_DELIMITER)
56+
.build();
5057
}
5158
}

seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@
4141
import java.net.URI;
4242
import java.nio.charset.StandardCharsets;
4343

44-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.ACCESS_KEY_ID;
45-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.DEFAULT_FIELD_DELIMITER;
46-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FIELD_DELIMITER;
47-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.FORMAT;
48-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
49-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.SECRET_ACCESS_KEY;
50-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
44+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.ACCESS_KEY_ID;
45+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.DEFAULT_FIELD_DELIMITER;
46+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FIELD_DELIMITER;
47+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.FORMAT;
48+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.REGION;
49+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.SECRET_ACCESS_KEY;
50+
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSinkOptions.URL;
5151

5252
public class AmazonSqsSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
5353

0 commit comments

Comments
 (0)