Skip to content

Commit c7275a4

Browse files
[Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188)
* [Feature][Connector-V2][Redis] Redis source & sink connector supports redis cluster mode connection and user authentication * [Feature][Connector-V2][Redis] Update docs * [Improve][Connector-V2][Redis] Support multi nodes setting in redis cluster mode * [Improve][Connector-V2][Redis] Support parse mode for hash keys * [Improve][Connector-V2][Redis] Update redis source doc
1 parent 9bd076c commit c7275a4

File tree

6 files changed

+324
-26
lines changed

6 files changed

+324
-26
lines changed

docs/en/connector-v2/sink/Redis.md

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@ Used to write data to Redis.
1313

1414
## Options
1515

16-
| name | type | required | default value |
17-
|-------------- |--------|----------|---------------|
18-
| host | string | yes | - |
19-
| port | int | yes | - |
20-
| key | string | yes | - |
21-
| data_type | string | yes | - |
22-
| auth | string | no | - |
23-
| format | string | no | json |
24-
| common-options| | no | - |
16+
| name | type | required | default value |
17+
|----------------|--------|----------|---------------|
18+
| host | string | yes | - |
19+
| port | int | yes | - |
20+
| key | string | yes | - |
21+
| data_type | string | yes | - |
22+
| user | string | no | - |
23+
| auth | string | no | - |
24+
| mode | string | no | - |
25+
| auth | list | no | - |
26+
| format | string | no | json |
27+
| common-options | | no | - |
2528

2629
### host [string]
2730

@@ -75,11 +78,25 @@ Redis data types, support `key` `hash` `list` `set` `zset`
7578
- zset
7679
> Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption.
7780
78-
### auth [String]
81+
### user [string]
82+
83+
redis authentication user, you need it when you connect to an encrypted cluster
84+
85+
### auth [string]
7986

8087
Redis authentication password, you need it when you connect to an encrypted cluster
8188

82-
### format [String]
89+
### mode [string]
90+
91+
redis mode, `single` or `cluster`, default is `single`
92+
93+
### nodes [list]
94+
95+
redis nodes information, used in cluster mode, must like as the following format:
96+
97+
[host1:port1, host2:port2]
98+
99+
### format [string]
83100

84101
The format of upstream data, now only support `json`, `text` will be supported later, default `json`.
85102

@@ -121,3 +138,7 @@ simple:
121138
### 2.2.0-beta 2022-09-26
122139

123140
- Add Redis Sink Connector
141+
142+
### next version
143+
144+
- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)

docs/en/connector-v2/source/Redis.md

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,20 @@ Used to read data from Redis.
1717

1818
## Options
1919

20-
| name | type | required | default value |
21-
|--------------- |--------|----------|---------------|
22-
| host | string | yes | - |
23-
| port | int | yes | - |
24-
| keys | string | yes | - |
25-
| data_type | string | yes | - |
26-
| auth | string | No | - |
27-
| schema | config | No | - |
28-
| format | string | No | json |
29-
| common-options | | no | - |
20+
| name | type | required | default value |
21+
|---------------------|--------|----------|---------------|
22+
| host | string | yes | - |
23+
| port | int | yes | - |
24+
| keys | string | yes | - |
25+
| data_type | string | yes | - |
26+
| user | string | no | - |
27+
| auth | string | no | - |
28+
| mode | string | no | - |
29+
| hash_key_parse_mode | string | no | all |
30+
| nodes | list | no | - |
31+
| schema | config | no | - |
32+
| format | string | no | json |
33+
| common-options | | no | - |
3034

3135
### host [string]
3236

@@ -36,6 +40,74 @@ redis host
3640

3741
redis port
3842

43+
### hash_key_parse_mode [string]
44+
45+
hash key parse mode, support `all` `kv`, used to tell connector how to parse hash key.
46+
47+
when setting it to `all`, connector will treat the value of hash key as a row and use the schema config to parse it, when setting it to `kv`, connector will treat each kv in hash key as a row and use the schema config to parse it:
48+
49+
for example, if the value of hash key is the following shown:
50+
51+
```text
52+
{
53+
"001": {
54+
"name": "tyrantlucifer",
55+
"age": 26
56+
},
57+
"002": {
58+
"name": "Zongwen",
59+
"age": 26
60+
}
61+
}
62+
63+
```
64+
65+
if hash_key_parse_mode is `all` and schema config as the following shown, it will generate the following data:
66+
67+
```hocon
68+
69+
schema {
70+
fields {
71+
001 {
72+
name = string
73+
age = int
74+
}
75+
002 {
76+
name = string
77+
age = int
78+
}
79+
}
80+
}
81+
82+
```
83+
84+
| 001 | 002 |
85+
|---------------------------------|---------------------------|
86+
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
87+
88+
if hash_key_parse_mode is `kv` and schema config as the following shown, it will generate the following data:
89+
90+
```hocon
91+
92+
schema {
93+
fields {
94+
hash_key = string
95+
name = string
96+
age = int
97+
}
98+
}
99+
100+
```
101+
102+
| hash_key | name | age |
103+
|----------|---------------|-----|
104+
| 001 | tyrantlucifer | 26 |
105+
| 002 | Zongwen | 26 |
106+
107+
each kv that in hash key it will be treated as a row and send it to upstream.
108+
109+
**Tips: connector will use the first field information of schema config as the field name of each k that in each kv**
110+
39111
### keys [string]
40112

41113
keys pattern
@@ -67,10 +139,24 @@ redis data types, support `key` `hash` `list` `set` `zset`
67139
> Each element in the sorted set will be sent downstream as a single row of data
68140
> For example, the value of sorted set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
69141
142+
### user [string]
143+
144+
redis authentication user, you need it when you connect to an encrypted cluster
145+
70146
### auth [string]
71147

72148
redis authentication password, you need it when you connect to an encrypted cluster
73149

150+
### mode [string]
151+
152+
redis mode, `single` or `cluster`, default is `single`
153+
154+
### nodes [list]
155+
156+
redis nodes information, used in cluster mode, must like as the following format:
157+
158+
[host1:port1, host2:port2]
159+
74160
### format [string]
75161

76162
the format of upstream data, now only support `json` `text`, default `json`.
@@ -166,3 +252,7 @@ simple:
166252
### 2.2.0-beta 2022-09-26
167253

168254
- Add Redis Source Connector
255+
256+
### next version
257+
258+
- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.config;
19+
20+
import lombok.NonNull;
21+
import redis.clients.jedis.Jedis;
22+
import redis.clients.jedis.JedisCluster;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
public class JedisWrapper extends Jedis {
29+
private final JedisCluster jedisCluster;
30+
31+
public JedisWrapper(@NonNull JedisCluster jedisCluster) {
32+
this.jedisCluster = jedisCluster;
33+
}
34+
35+
@Override
36+
public String set(final String key, final String value) {
37+
return jedisCluster.set(key, value);
38+
}
39+
40+
@Override
41+
public String get(final String key) {
42+
return jedisCluster.get(key);
43+
}
44+
45+
@Override
46+
public long hset(final String key, final Map<String, String> hash) {
47+
return jedisCluster.hset(key, hash);
48+
}
49+
50+
@Override
51+
public Map<String, String> hgetAll(final String key) {
52+
return jedisCluster.hgetAll(key);
53+
}
54+
55+
@Override
56+
public long lpush(final String key, final String... strings) {
57+
return jedisCluster.lpush(key, strings);
58+
}
59+
60+
@Override
61+
public List<String> lrange(final String key, final long start, final long stop) {
62+
return jedisCluster.lrange(key, start, stop);
63+
}
64+
65+
@Override
66+
public long sadd(final String key, final String... members) {
67+
return jedisCluster.sadd(key, members);
68+
}
69+
70+
@Override
71+
public Set<String> smembers(final String key) {
72+
return jedisCluster.smembers(key);
73+
}
74+
75+
@Override
76+
public long zadd(final String key, final double score, final String member) {
77+
return jedisCluster.zadd(key, score, member);
78+
}
79+
80+
@Override
81+
public List<String> zrange(final String key, final long start, final long stop) {
82+
return jedisCluster.zrange(key, start, stop);
83+
}
84+
85+
@Override
86+
public void close() {
87+
jedisCluster.close();
88+
}
89+
}

seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,22 @@ public class RedisConfig {
2121
public static final String HOST = "host";
2222
public static final String PORT = "port";
2323
public static final String AUTH = "auth";
24+
public static final String USER = "user";
2425
public static final String KEY_PATTERN = "keys";
2526
public static final String KEY = "key";
2627
public static final String DATA_TYPE = "data_type";
2728
public static final String FORMAT = "format";
29+
public static final String MODE = "mode";
30+
public static final String NODES = "nodes";
31+
public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode";
32+
33+
public enum RedisMode {
34+
SINGLE,
35+
CLUSTER;
36+
}
37+
38+
public enum HashKeyParseMode {
39+
ALL,
40+
KV;
41+
}
2842
}

0 commit comments

Comments
 (0)