Skip to content

Commit 73f63a5

Browse files
authored
[Feature][Connector V2] expose configurable options in Cassandra (#3681)
1 parent 4151506 commit 73f63a5

File tree

10 files changed

+341
-160
lines changed

10 files changed

+341
-160
lines changed

docs/en/connector-v2/sink/Cassandra.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@ Write data to Apache Cassandra.
1212

1313
## Options
1414

15-
| name | type | required | default value |
16-
|-------------------|--------|----------|---------------|
17-
| host | String | Yes | - |
18-
| keyspace | String | Yes | - |
19-
| table | String | Yes | - |
20-
| username | String | No | - |
21-
| password | String | No | - |
22-
| datacenter | String | No | datacenter1 |
23-
| consistency_level | String | No | LOCAL_ONE |
24-
| fields | String | No | LOCAL_ONE |
25-
| batch_size | String | No | 5000 |
26-
| batch_type | String | No | UNLOGGER |
27-
| async_write | String | No | true |
15+
| name | type | required | default value |
16+
|-------------------|---------|----------|---------------|
17+
| host | String | Yes | - |
18+
| keyspace | String | Yes | - |
19+
| table | String | Yes | - |
20+
| username | String | No | - |
21+
| password | String | No | - |
22+
| datacenter | String | No | datacenter1 |
23+
| consistency_level | String | No | LOCAL_ONE |
24+
| fields | String | No | LOCAL_ONE |
25+
| batch_size | int | No | 5000 |
26+
| batch_type | String | No | UNLOGGED |
27+
| async_write | boolean | No | true |
2828

2929
### host [string]
3030

seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java

Lines changed: 39 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -17,99 +17,50 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
2122

22-
import com.datastax.oss.driver.api.core.ConsistencyLevel;
23-
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
24-
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
25-
import lombok.Data;
26-
import lombok.NoArgsConstructor;
27-
import lombok.NonNull;
28-
import lombok.ToString;
23+
public class CassandraConfig {
2924

30-
import java.io.Serializable;
31-
import java.util.List;
25+
public static final Integer DEFAULT_BATCH_SIZE = 5000;
3226

33-
@Data
34-
@ToString
35-
@NoArgsConstructor
36-
public class CassandraConfig implements Serializable {
27+
public static final Option<String> HOST =
28+
Options.key("host").stringType().noDefaultValue().withDescription("");
3729

38-
public static final String HOST = "host";
39-
public static final String USERNAME = "username";
40-
public static final String PASSWORD = "password";
41-
public static final String DATACENTER = "datacenter";
42-
public static final String KEYSPACE = "keyspace";
43-
public static final String TABLE = "table";
44-
public static final String CQL = "cql";
45-
public static final String FIELDS = "fields";
46-
public static final String CONSISTENCY_LEVEL = "consistency_level";
47-
public static final String BATCH_SIZE = "batch_size";
48-
public static final String BATCH_TYPE = "batch_type";
49-
public static final String ASYNC_WRITE = "async_write";
30+
public static final Option<String> KEYSPACE =
31+
Options.key("keyspace").stringType().noDefaultValue().withDescription("");
5032

51-
private String host;
52-
private String username;
53-
private String password;
54-
private String datacenter;
55-
private String keyspace;
56-
private String table;
57-
private String cql;
58-
private List<String> fields;
59-
private ConsistencyLevel consistencyLevel;
60-
private Integer batchSize;
61-
private DefaultBatchType batchType;
62-
private Boolean asyncWrite;
33+
public static final Option<String> USERNAME =
34+
Options.key("username").stringType().noDefaultValue().withDescription("");
35+
public static final Option<String> PASSWORD =
36+
Options.key("password").stringType().noDefaultValue().withDescription("");
37+
public static final Option<String> DATACENTER =
38+
Options.key("datacenter").stringType().defaultValue("datacenter1").withDescription("");
6339

64-
public CassandraConfig(@NonNull String host, @NonNull String keyspace) {
65-
this.host = host;
66-
this.keyspace = keyspace;
67-
}
40+
public static final Option<String> CONSISTENCY_LEVEL =
41+
Options.key("consistency_level")
42+
.stringType()
43+
.defaultValue("LOCAL_ONE")
44+
.withDescription("");
6845

69-
public static CassandraConfig getCassandraConfig(Config config) {
70-
CassandraConfig cassandraConfig =
71-
new CassandraConfig(config.getString(HOST), config.getString(KEYSPACE));
72-
if (config.hasPath(USERNAME)) {
73-
cassandraConfig.setUsername(config.getString(USERNAME));
74-
}
75-
if (config.hasPath(PASSWORD)) {
76-
cassandraConfig.setPassword(config.getString(PASSWORD));
77-
}
78-
if (config.hasPath(DATACENTER)) {
79-
cassandraConfig.setDatacenter(config.getString(DATACENTER));
80-
} else {
81-
cassandraConfig.setDatacenter("datacenter1");
82-
}
83-
if (config.hasPath(TABLE)) {
84-
cassandraConfig.setTable(config.getString(TABLE));
85-
}
86-
if (config.hasPath(CQL)) {
87-
cassandraConfig.setCql(config.getString(CQL));
88-
}
89-
if (config.hasPath(FIELDS)) {
90-
cassandraConfig.setFields(config.getStringList(FIELDS));
91-
}
92-
if (config.hasPath(CONSISTENCY_LEVEL)) {
93-
cassandraConfig.setConsistencyLevel(
94-
DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL)));
95-
} else {
96-
cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE);
97-
}
98-
if (config.hasPath(BATCH_SIZE)) {
99-
cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE));
100-
} else {
101-
cassandraConfig.setBatchSize(Integer.parseInt("5000"));
102-
}
103-
if (config.hasPath(BATCH_TYPE)) {
104-
cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE)));
105-
} else {
106-
cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED);
107-
}
108-
if (config.hasPath(ASYNC_WRITE)) {
109-
cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE));
110-
} else {
111-
cassandraConfig.setAsyncWrite(true);
112-
}
113-
return cassandraConfig;
114-
}
46+
public static final Option<String> TABLE =
47+
Options.key("table").stringType().noDefaultValue().withDescription("");
48+
49+
public static final Option<String> FIELDS =
50+
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");
51+
52+
public static final Option<Integer> BATCH_SIZE =
53+
Options.key("batch_size")
54+
.intType()
55+
.defaultValue(DEFAULT_BATCH_SIZE)
56+
.withDescription("");
57+
58+
public static final Option<String> BATCH_TYPE =
59+
Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");
60+
61+
public static final Option<Boolean> ASYNC_WRITE =
62+
Options.key("async_write").booleanType().defaultValue(true).withDescription("");
63+
64+
public static final Option<String> CQL =
65+
Options.key("cql").stringType().noDefaultValue().withDescription("");
11566
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import com.datastax.oss.driver.api.core.ConsistencyLevel;
23+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
24+
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
25+
import lombok.Getter;
26+
import lombok.Setter;
27+
28+
import java.io.Serializable;
29+
import java.util.List;
30+
31+
@Setter
32+
@Getter
33+
public class CassandraParameters implements Serializable {
34+
private String host;
35+
private String username;
36+
private String password;
37+
private String datacenter;
38+
private String keyspace;
39+
private String table;
40+
private String cql;
41+
private List<String> fields;
42+
private ConsistencyLevel consistencyLevel;
43+
private Integer batchSize;
44+
private DefaultBatchType batchType;
45+
private Boolean asyncWrite;
46+
47+
public void buildWithConfig(Config config) {
48+
this.host = config.getString(CassandraConfig.HOST.key());
49+
this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());
50+
51+
if (config.hasPath(CassandraConfig.USERNAME.key())) {
52+
this.username = config.getString(CassandraConfig.USERNAME.key());
53+
}
54+
if (config.hasPath(CassandraConfig.PASSWORD.key())) {
55+
this.password = config.getString(CassandraConfig.PASSWORD.key());
56+
}
57+
if (config.hasPath(CassandraConfig.DATACENTER.key())) {
58+
this.datacenter = config.getString(CassandraConfig.DATACENTER.key());
59+
} else {
60+
this.datacenter = CassandraConfig.DATACENTER.defaultValue();
61+
}
62+
if (config.hasPath(CassandraConfig.TABLE.key())) {
63+
this.table = config.getString(CassandraConfig.TABLE.key());
64+
}
65+
if (config.hasPath(CassandraConfig.CQL.key())) {
66+
this.cql = config.getString(CassandraConfig.CQL.key());
67+
}
68+
if (config.hasPath(CassandraConfig.FIELDS.key())) {
69+
this.fields = config.getStringList(CassandraConfig.FIELDS.key());
70+
}
71+
if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
72+
this.consistencyLevel =
73+
DefaultConsistencyLevel.valueOf(
74+
config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
75+
} else {
76+
this.consistencyLevel =
77+
DefaultConsistencyLevel.valueOf(
78+
CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
79+
}
80+
if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
81+
this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
82+
} else {
83+
this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
84+
}
85+
if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
86+
this.batchType =
87+
DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
88+
} else {
89+
this.batchType = DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
90+
}
91+
if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
92+
this.asyncWrite = config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
93+
} else {
94+
this.asyncWrite = true;
95+
}
96+
}
97+
}

seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.seatunnel.common.config.CheckResult;
3131
import org.apache.seatunnel.common.constants.PluginType;
3232
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
33-
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
33+
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
3434
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
3535
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
3636
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -51,8 +51,7 @@
5151
@AutoService(SeaTunnelSink.class)
5252
public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
5353

54-
private CassandraConfig cassandraConfig;
55-
54+
private final CassandraParameters cassandraParameters = new CassandraParameters();
5655
private SeaTunnelRowType seaTunnelRowType;
5756

5857
private ColumnDefinitions tableSchema;
@@ -63,32 +62,35 @@ public String getPluginName() {
6362
}
6463

6564
@Override
66-
public void prepare(Config config) throws PrepareFailException {
67-
CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE);
65+
public void prepare(Config pluginConfig) throws PrepareFailException {
66+
CheckResult checkResult =
67+
CheckConfigUtil.checkAllExists(
68+
pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
6869
if (!checkResult.isSuccess()) {
6970
throw new CassandraConnectorException(
7071
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
7172
String.format(
7273
"PluginName: %s, PluginType: %s, Message: %s",
7374
getPluginName(), PluginType.SINK, checkResult.getMsg()));
7475
}
75-
this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
76+
this.cassandraParameters.buildWithConfig(pluginConfig);
7677
try (CqlSession session =
7778
CassandraClient.getCqlSessionBuilder(
78-
cassandraConfig.getHost(),
79-
cassandraConfig.getKeyspace(),
80-
cassandraConfig.getUsername(),
81-
cassandraConfig.getPassword(),
82-
cassandraConfig.getDatacenter())
79+
cassandraParameters.getHost(),
80+
cassandraParameters.getKeyspace(),
81+
cassandraParameters.getUsername(),
82+
cassandraParameters.getPassword(),
83+
cassandraParameters.getDatacenter())
8384
.build()) {
84-
List<String> fields = cassandraConfig.getFields();
85-
this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable());
85+
List<String> fields = cassandraParameters.getFields();
86+
this.tableSchema =
87+
CassandraClient.getTableSchema(session, pluginConfig.getString(TABLE.key()));
8688
if (fields == null || fields.isEmpty()) {
8789
List<String> newFields = new ArrayList<>();
8890
for (int i = 0; i < tableSchema.size(); i++) {
8991
newFields.add(tableSchema.get(i).getName().asInternal());
9092
}
91-
cassandraConfig.setFields(newFields);
93+
this.cassandraParameters.setFields(newFields);
9294
} else {
9395
for (String field : fields) {
9496
if (!tableSchema.contains(field)) {
@@ -97,7 +99,7 @@ public void prepare(Config config) throws PrepareFailException {
9799
"Field "
98100
+ field
99101
+ " does not exist in table "
100-
+ config.getString(TABLE));
102+
+ pluginConfig.getString(TABLE.key()));
101103
}
102104
}
103105
}
@@ -123,6 +125,6 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
123125
@Override
124126
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
125127
throws IOException {
126-
return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, tableSchema);
128+
return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, tableSchema);
127129
}
128130
}

0 commit comments

Comments
 (0)