Skip to content

Commit 405f7d6

Browse files
[Feature][Connector-V2] Add redis source connector (#2569)
1 parent f17a17a commit 405f7d6

File tree

16 files changed

+855
-1
lines changed

16 files changed

+855
-1
lines changed

docs/en/connector-v2/source/Redis.md

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# Redis
2+
3+
> Redis source connector
4+
5+
## Description
6+
7+
Used to read data from Redis. Only support batch mode.
8+
9+
## Options
10+
11+
| name | type | required | default value |
12+
|-----------|--------|----------|---------------|
13+
| host | string | yes | - |
14+
| port | int | yes | - |
15+
| keys | string | yes | - |
16+
| data_type | string | yes | - |
17+
| auth | string | No | - |
18+
| schema | config | No | - |
19+
| format | string | No | json |
20+
21+
### host [string]
22+
23+
redis host
24+
25+
### port [int]
26+
27+
redis port
28+
29+
### keys [string]
30+
31+
keys pattern
32+
33+
**Tips:Redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type**
34+
35+
### data_type [string]
36+
37+
redis data types, support `key` `hash` `list` `set` `zset`
38+
39+
- key
40+
> The value of each key will be sent downstream as a single row of data.
41+
> For example, the value of key is `SeaTunnel test message`, the data received downstream is `SeaTunnel test message` and only one message will be received.
42+
43+
44+
- hash
45+
> The hash key-value pairs will be formatted as json to be sent downstream as a single row of data.
46+
> For example, the value of hash is `name:tyrantlucifer age:26`, the data received downstream is `{"name":"tyrantlucifer", "age":"26"}` and only one message will be received.
47+
48+
- list
49+
> Each element in the list will be sent downstream as a single row of data.
50+
> For example, the value of list is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
51+
52+
- set
53+
> Each element in the set will be sent downstream as a single row of data
54+
> For example, the value of set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
55+
56+
- zset
57+
> Each element in the sorted set will be sent downstream as a single row of data
58+
> For example, the value of sorted set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
59+
60+
### auth [String]
61+
62+
redis authentication password, you need it when you connect to an encrypted cluster
63+
64+
### format [String]
65+
66+
the format of upstream data, now only support `json` `text`, default `json`.
67+
68+
when you assign format is `json`, you should also assign schema option, for example:
69+
70+
upstream data is the following:
71+
72+
```json
73+
74+
{"code": 200, "data": "get success", "success": true}
75+
76+
```
77+
78+
you should assign schema as the following:
79+
80+
```hocon
81+
82+
schema {
83+
fields {
84+
code = int
85+
data = string
86+
success = boolean
87+
}
88+
}
89+
90+
```
91+
92+
connector will generate data as the following:
93+
94+
| code | data | success |
95+
|------|-------------|---------|
96+
| 200 | get success | true |
97+
98+
when you assign format is `text`, connector will do nothing for upstream data, for example:
99+
100+
upstream data is the following:
101+
102+
```json
103+
104+
{"code": 200, "data": "get success", "success": true}
105+
106+
```
107+
108+
connector will generate data as the following:
109+
110+
| content |
111+
|---------|
112+
| {"code": 200, "data": "get success", "success": true} |
113+
114+
### schema [Config]
115+
116+
#### fields [Config]
117+
118+
the schema fields of upstream data
119+
120+
## Example
121+
122+
simple:
123+
124+
```hocon
125+
Redis {
126+
host = localhost
127+
port = 6379
128+
keys = "key_test*"
129+
data_type = key
130+
format = text
131+
}
132+
```
133+
134+
```hocon
135+
Redis {
136+
host = localhost
137+
port = 6379
138+
keys = "key_test*"
139+
data_type = key
140+
format = json
141+
schema {
142+
fields {
143+
name = string
144+
age = int
145+
}
146+
}
147+
}
148+
```
149+

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,6 @@ seatunnel.sink.IoTDB = connector-iotdb
121121
seatunnel.sink.Neo4j = connector-neo4j
122122
seatunnel.sink.FtpFile = connector-file-ftp
123123
seatunnel.sink.Socket = connector-socket
124+
seatunnel.source.Redis = connector-redis
124125
seatunnel.sink.DataHub = connector-datahub
125126

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@
231231
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
232232
<awaitility.version>4.2.0</awaitility.version>
233233
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
234+
<jedis.version>4.2.2</jedis.version>
234235
<datahub.version>2.19.0-public</datahub.version>
235236
</properties>
236237

@@ -592,6 +593,12 @@
592593
<version>${sshd.version}</version>
593594
</dependency>
594595

596+
<dependency>
597+
<groupId>redis.clients</groupId>
598+
<artifactId>jedis</artifactId>
599+
<version>${jedis.version}</version>
600+
</dependency>
601+
595602
<dependency>
596603
<groupId>org.junit.jupiter</groupId>
597604
<artifactId>junit-jupiter-engine</artifactId>

seatunnel-connectors-v2-dist/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@
146146
<artifactId>connector-neo4j</artifactId>
147147
<version>${project.version}</version>
148148
</dependency>
149+
<dependency>
150+
<groupId>org.apache.seatunnel</groupId>
151+
<artifactId>connector-redis</artifactId>
152+
<version>${project.version}</version>
153+
</dependency>
149154
<dependency>
150155
<groupId>org.apache.seatunnel</groupId>
151156
<artifactId>connector-datahub</artifactId>
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<parent>
24+
<artifactId>seatunnel-connectors-v2</artifactId>
25+
<groupId>org.apache.seatunnel</groupId>
26+
<version>${revision}</version>
27+
</parent>
28+
<modelVersion>4.0.0</modelVersion>
29+
30+
<artifactId>connector-redis</artifactId>
31+
32+
<dependencies>
33+
34+
<dependency>
35+
<groupId>org.apache.seatunnel</groupId>
36+
<artifactId>connector-common</artifactId>
37+
<version>${project.version}</version>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.apache.seatunnel</groupId>
42+
<artifactId>seatunnel-format-json</artifactId>
43+
<version>${project.version}</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>redis.clients</groupId>
48+
<artifactId>jedis</artifactId>
49+
</dependency>
50+
51+
</dependencies>
52+
53+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.config;
19+
20+
public class RedisConfig {
21+
public static final String HOST = "host";
22+
public static final String PORT = "port";
23+
public static final String AUTH = "auth";
24+
public static final String KEY_PATTERN = "keys";
25+
public static final String DATA_TYPE = "data_type";
26+
public static final String FORMAT = "format";
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.config;
19+
20+
import org.apache.seatunnel.common.utils.JsonUtils;
21+
22+
import redis.clients.jedis.Jedis;
23+
24+
import java.util.ArrayList;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
public enum RedisDataType {
31+
KEY {
32+
@Override
33+
public List<String> get(Jedis jedis, String key) {
34+
return Collections.singletonList(jedis.get(key));
35+
}
36+
},
37+
HASH {
38+
@Override
39+
public List<String> get(Jedis jedis, String key) {
40+
Map<String, String> kvMap = jedis.hgetAll(key);
41+
return Collections.singletonList(JsonUtils.toJsonString(kvMap));
42+
}
43+
},
44+
LIST {
45+
@Override
46+
public List<String> get(Jedis jedis, String key) {
47+
return jedis.lrange(key, 0, -1);
48+
}
49+
},
50+
SET {
51+
@Override
52+
public List<String> get(Jedis jedis, String key) {
53+
Set<String> members = jedis.smembers(key);
54+
return new ArrayList<>(members);
55+
}
56+
},
57+
ZSET {
58+
@Override
59+
public List<String> get(Jedis jedis, String key) {
60+
return jedis.zrange(key, 0, -1);
61+
}
62+
};
63+
64+
public List<String> get(Jedis jedis, String key) {
65+
return Collections.emptyList();
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.config;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import lombok.Data;
23+
24+
import java.io.Serializable;
25+
26+
@Data
27+
public class RedisParameters implements Serializable {
28+
private String host;
29+
private int port;
30+
private String auth = "";
31+
private String keysPattern;
32+
private RedisDataType redisDataType;
33+
34+
public void buildWithConfig(Config config) {
35+
// set host
36+
this.host = config.getString(RedisConfig.HOST);
37+
// set port
38+
this.port = config.getInt(RedisConfig.PORT);
39+
// set auth
40+
if (config.hasPath(RedisConfig.AUTH)) {
41+
this.auth = config.getString(RedisConfig.AUTH);
42+
}
43+
// set keysPattern
44+
this.keysPattern = config.getString(RedisConfig.KEY_PATTERN);
45+
// set redis data type
46+
try {
47+
String dataType = config.getString(RedisConfig.DATA_TYPE);
48+
this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
49+
} catch (IllegalArgumentException e) {
50+
throw new RuntimeException("Redis source connector only support these data types [key, hash, list, set, zset]", e);
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)