Skip to content

Commit a7ca51b

Browse files
authored
[Feature][Connector-V2][Tablestore] Support Source connector for Tablestore #7448 (#7467)
1 parent 81b7351 commit a7ca51b

File tree

12 files changed

+860
-0
lines changed

12 files changed

+860
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Tablestore
2+
3+
> Tablestore source connector
4+
5+
## Description
6+
7+
Read data from Alicloud Tablestore,support full and CDC.
8+
9+
10+
## Key features
11+
12+
- [ ] [batch](../../concept/connector-v2-features.md)
13+
- [X] [stream](../../concept/connector-v2-features.md)
14+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
15+
- [ ] [column projection](../../concept/connector-v2-features.md)
16+
- [ ] [parallelism](../../concept/connector-v2-features.md)
17+
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
18+
19+
## Options
20+
21+
| name | type | required | default value |
22+
|-----------------------|--------|----------|---------------|
23+
| end_point | string | yes | - |
24+
| instance_name | string | yes | - |
25+
| access_key_id | string | yes | - |
26+
| access_key_secret | string | yes | - |
27+
| table | string | yes | - |
28+
| primary_keys | array | yes | - |
29+
| schema | config | yes | - |
30+
31+
32+
### end_point [string]
33+
34+
The endpoint of Tablestore.
35+
36+
### instance_name [string]
37+
38+
The intance name of Tablestore.
39+
40+
### access_key_id [string]
41+
42+
The access id of Tablestore.
43+
44+
### access_key_secret [string]
45+
46+
The access secret of Tablestore.
47+
48+
### table [string]
49+
50+
The table name of Tablestore.
51+
52+
### primary_keys [array]
53+
54+
The primarky key of table,just add a unique primary key.
55+
56+
### schema [Config]
57+
58+
59+
60+
## Example
61+
62+
```bash
63+
env {
64+
parallelism = 1
65+
job.mode = "STREAMING"
66+
}
67+
68+
source {
69+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
70+
Tablestore {
71+
end_point = "https://****.cn-zhangjiakou.tablestore.aliyuncs.com"
72+
instance_name = "****"
73+
access_key_id="***************2Ag5"
74+
access_key_secret="***********2Dok"
75+
table="test"
76+
primary_keys=["id"]
77+
schema={
78+
fields {
79+
id = string
80+
name = string
81+
}
82+
}
83+
}
84+
}
85+
86+
87+
sink {
88+
MongoDB{
89+
uri = "mongodb://localhost:27017"
90+
database = "test"
91+
collection = "test"
92+
primary-key = ["id"]
93+
schema = {
94+
fields {
95+
id = string
96+
name = string
97+
}
98+
}
99+
}
100+
}
101+
```
102+

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ seatunnel.sink.InfluxDB = connector-influxdb
8585
seatunnel.source.GoogleSheets = connector-google-sheets
8686
seatunnel.sink.GoogleFirestore = connector-google-firestore
8787
seatunnel.sink.Tablestore = connector-tablestore
88+
seatunnel.source.Tablestore = connector-tablestore
8889
seatunnel.source.Lemlist = connector-http-lemlist
8990
seatunnel.source.Klaviyo = connector-http-klaviyo
9091
seatunnel.sink.Slack = connector-slack

seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
2224
import lombok.AllArgsConstructor;
2325
import lombok.Data;
2426

2527
import java.io.Serializable;
2628
import java.util.List;
29+
import java.util.Map;
2730

2831
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
2932

@@ -45,6 +48,8 @@ public class TablestoreOptions implements Serializable {
4548

4649
public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
4750

51+
public TablestoreOptions() {}
52+
4853
public TablestoreOptions(Config config) {
4954
this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
5055
this.instanceName = config.getString(TablestoreConfig.INSTANCE_NAME.key());
@@ -57,4 +62,18 @@ public TablestoreOptions(Config config) {
5762
this.batchSize = config.getInt(BATCH_SIZE.key());
5863
}
5964
}
65+
66+
public static TablestoreOptions of(ReadonlyConfig config) {
67+
Map<String, Object> map = config.getSourceMap();
68+
TablestoreOptions tablestoreOptions = new TablestoreOptions();
69+
tablestoreOptions.setEndpoint(config.get(TablestoreConfig.END_POINT));
70+
tablestoreOptions.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME));
71+
tablestoreOptions.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID));
72+
tablestoreOptions.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET));
73+
tablestoreOptions.setTable(config.get(TablestoreConfig.TABLE));
74+
List<String> keys = (List<String>) map.get(TablestoreConfig.PRIMARY_KEYS.key());
75+
76+
tablestoreOptions.setPrimaryKeys(keys);
77+
return tablestoreOptions;
78+
}
6079
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
18+
19+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
20+
21+
import com.alicloud.openservices.tablestore.model.StreamRecord;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer {
27+
28+
@Override
29+
public SeaTunnelRow deserialize(StreamRecord r) {
30+
List<Object> fields = new ArrayList<>();
31+
r.getColumns()
32+
.forEach(
33+
k -> {
34+
fields.add(k.getColumn().getValue());
35+
});
36+
return new SeaTunnelRow(fields.toArray());
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
18+
19+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
20+
21+
import com.alicloud.openservices.tablestore.model.StreamRecord;
22+
23+
public interface SeaTunnelRowDeserializer {
24+
25+
SeaTunnelRow deserialize(StreamRecord streamRecord);
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
18+
19+
import org.apache.seatunnel.api.common.JobContext;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.source.Boundedness;
22+
import org.apache.seatunnel.api.source.SeaTunnelSource;
23+
import org.apache.seatunnel.api.source.SourceReader;
24+
import org.apache.seatunnel.api.source.SourceReader.Context;
25+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
26+
import org.apache.seatunnel.api.source.SupportColumnProjection;
27+
import org.apache.seatunnel.api.source.SupportParallelism;
28+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
29+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
30+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
31+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
32+
import org.apache.seatunnel.common.constants.JobMode;
33+
import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
34+
35+
import lombok.extern.slf4j.Slf4j;
36+
37+
import java.util.List;
38+
39+
@Slf4j
40+
public class TableStoreDBSource
41+
implements SeaTunnelSource<SeaTunnelRow, TableStoreDBSourceSplit, TableStoreDBSourceState>,
42+
SupportParallelism,
43+
SupportColumnProjection {
44+
45+
private TablestoreOptions tablestoreOptions;
46+
private SeaTunnelRowType typeInfo;
47+
private JobContext jobContext;
48+
49+
@Override
50+
public String getPluginName() {
51+
return "Tablestore";
52+
}
53+
54+
@Override
55+
public List<CatalogTable> getProducedCatalogTables() {
56+
return SeaTunnelSource.super.getProducedCatalogTables();
57+
}
58+
59+
public TableStoreDBSource(ReadonlyConfig config) {
60+
this.tablestoreOptions = TablestoreOptions.of(config);
61+
CatalogTableUtil.buildWithConfig(config);
62+
this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
63+
}
64+
65+
@Override
66+
public Boundedness getBoundedness() {
67+
return JobMode.BATCH.equals(jobContext.getJobMode())
68+
? Boundedness.BOUNDED
69+
: Boundedness.UNBOUNDED;
70+
}
71+
72+
@Override
73+
public SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> createReader(Context readerContext)
74+
throws Exception {
75+
return new TableStoreDBSourceReader(readerContext, tablestoreOptions, typeInfo);
76+
}
77+
78+
@Override
79+
public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState> createEnumerator(
80+
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
81+
enumeratorContext)
82+
throws Exception {
83+
return new TableStoreDBSourceSplitEnumerator(enumeratorContext, tablestoreOptions);
84+
}
85+
86+
@Override
87+
public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState>
88+
restoreEnumerator(
89+
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<
90+
TableStoreDBSourceSplit>
91+
enumeratorContext,
92+
TableStoreDBSourceState checkpointState)
93+
throws Exception {
94+
return new TableStoreDBSourceSplitEnumerator(
95+
enumeratorContext, tablestoreOptions, checkpointState);
96+
}
97+
98+
@Override
99+
public void setJobContext(JobContext jobContext) {
100+
this.jobContext = jobContext;
101+
}
102+
}

0 commit comments

Comments
 (0)