Skip to content

Commit 044f62e

Browse files
[Improve][Connector-V2] Improve fake source connector (#2944)
* [Improve][Connector-V2] Improve fake source connector * [Improve][Connector-V2] Update doc * [Improve][Connector-V2] Fix e2e test cases * [Improve][Connector-V2] Fix integration error. * Revert "[Improve][Connector-V2] Fix integration error." This reverts commit c840c69. * [Improve][Connector-V2] Fix flink e2e test cases * [Improve][Connector-V2] Optimize class name
1 parent ed7a75c commit 044f62e

File tree

12 files changed

+412
-287
lines changed

12 files changed

+412
-287
lines changed

docs/en/connector-v2/source/FakeSource.md

Lines changed: 80 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
## Description
66

77
The FakeSource is a virtual data source that generates a configurable number of rows of random data according to the user-defined schema,
8-
just for testing, such as type conversion and feature testing
8+
just for test cases such as type conversion or testing new connector features
99

1010
## Key features
1111

@@ -18,30 +18,43 @@ just for testing, such as type conversion and feature testing
1818

1919
## Options
2020

21-
| name | type | required | default value |
22-
|-------------------|--------|----------|---------------|
23-
| result_table_name | string | yes | - |
24-
| schema | config | yes | - |
25-
| row.num | long | no | 10 |
21+
| name | type | required | default value |
22+
|---------------|--------|----------|---------------|
23+
| schema | config | yes | - |
24+
| row.num | int | no | 5 |
25+
| map.size | int | no | 5 |
26+
| array.size | int | no | 5 |
27+
| bytes.length | int | no | 5 |
28+
| string.length | int | no | 5 |
2629

27-
### result_table_name [string]
2830

29-
The table name.
31+
### schema [config]
3032

31-
### type [string]
33+
The schema of fake data that you want to generate
3234

33-
Table structure description ,you should assign schema option to tell connector how to parse data to the row you want.
34-
**Tips**: Most of Unstructured-Datasource contain this param, such as LocalFile,HdfsFile.
35-
**Example**:
36-
37-
### row.num
38-
Number of additional rows of generated data
35+
For example:
3936

4037
```hocon
41-
schema = {
42-
fields {
43-
c_map = "map<string, string>"
44-
c_array = "array<tinyint>"
38+
schema = {
39+
fields {
40+
c_map = "map<string, array<int>>"
41+
c_array = "array<int>"
42+
c_string = string
43+
c_boolean = boolean
44+
c_tinyint = tinyint
45+
c_smallint = smallint
46+
c_int = int
47+
c_bigint = bigint
48+
c_float = float
49+
c_double = double
50+
c_decimal = "decimal(30, 8)"
51+
c_null = "null"
52+
c_bytes = bytes
53+
c_date = date
54+
c_timestamp = timestamp
55+
c_row = {
56+
c_map = "map<string, map<string, string>>"
57+
c_array = "array<int>"
4558
c_string = string
4659
c_boolean = boolean
4760
c_tinyint = tinyint
@@ -54,23 +67,61 @@ schema = {
5467
c_null = "null"
5568
c_bytes = bytes
5669
c_date = date
57-
c_time = time
5870
c_timestamp = timestamp
5971
}
6072
}
73+
}
6174
```
6275

63-
## Example
76+
### row.num
77+
78+
The total number of rows that the connector generates
79+
80+
### map.size
81+
82+
The number of entries the connector puts into each generated `map` value
6483

65-
Simple source for FakeSource which contains enough datatype
84+
### array.size
85+
86+
The number of elements in each generated `array` value
87+
88+
### bytes.length
89+
90+
The length, in bytes, of each generated `bytes` value
91+
92+
### string.length
93+
94+
The length, in characters, of each generated `string` value
95+
96+
## Example
6697

6798
```hocon
68-
source {
69-
FakeSource {
70-
schema = {
71-
fields {
72-
c_map = "map<string, string>"
73-
c_array = "array<tinyint>"
99+
FakeSource {
100+
row.num = 10
101+
map.size = 10
102+
array.size = 10
103+
bytes.length = 10
104+
string.length = 10
105+
schema = {
106+
fields {
107+
c_map = "map<string, array<int>>"
108+
c_array = "array<int>"
109+
c_string = string
110+
c_boolean = boolean
111+
c_tinyint = tinyint
112+
c_smallint = smallint
113+
c_int = int
114+
c_bigint = bigint
115+
c_float = float
116+
c_double = double
117+
c_decimal = "decimal(30, 8)"
118+
c_null = "null"
119+
c_bytes = bytes
120+
c_date = date
121+
c_timestamp = timestamp
122+
c_row = {
123+
c_map = "map<string, map<string, string>>"
124+
c_array = "array<int>"
74125
c_string = string
75126
c_boolean = boolean
76127
c_tinyint = tinyint
@@ -83,11 +134,9 @@ source {
83134
c_null = "null"
84135
c_bytes = bytes
85136
c_date = date
86-
c_time = time
87137
c_timestamp = timestamp
88138
}
89139
}
90-
result_table_name = "fake"
91140
}
92141
}
93-
```
142+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import lombok.Builder;
23+
import lombok.Getter;
24+
25+
import java.io.Serializable;
26+
27+
@Builder
28+
@Getter
29+
public class FakeConfig implements Serializable {
30+
private static final String ROW_NUM = "row.num";
31+
private static final String MAP_SIZE = "map.size";
32+
private static final String ARRAY_SIZE = "array.size";
33+
private static final String BYTES_LENGTH = "bytes.length";
34+
private static final String STRING_LENGTH = "string.length";
35+
private static final int DEFAULT_ROW_NUM = 5;
36+
private static final int DEFAULT_MAP_SIZE = 5;
37+
private static final int DEFAULT_ARRAY_SIZE = 5;
38+
private static final int DEFAULT_BYTES_LENGTH = 5;
39+
private static final int DEFAULT_STRING_LENGTH = 5;
40+
@Builder.Default
41+
private int rowNum = DEFAULT_ROW_NUM;
42+
@Builder.Default
43+
private int mapSize = DEFAULT_MAP_SIZE;
44+
@Builder.Default
45+
private int arraySize = DEFAULT_ARRAY_SIZE;
46+
@Builder.Default
47+
private int bytesLength = DEFAULT_BYTES_LENGTH;
48+
@Builder.Default
49+
private int stringLength = DEFAULT_STRING_LENGTH;
50+
51+
public static FakeConfig buildWithConfig(Config config) {
52+
FakeConfigBuilder builder = FakeConfig.builder();
53+
if (config.hasPath(ROW_NUM)) {
54+
builder.rowNum(config.getInt(ROW_NUM));
55+
}
56+
if (config.hasPath(MAP_SIZE)) {
57+
builder.mapSize(config.getInt(MAP_SIZE));
58+
}
59+
if (config.hasPath(ARRAY_SIZE)) {
60+
builder.arraySize(config.getInt(ARRAY_SIZE));
61+
}
62+
if (config.hasPath(BYTES_LENGTH)) {
63+
builder.bytesLength(config.getInt(BYTES_LENGTH));
64+
}
65+
if (config.hasPath(STRING_LENGTH)) {
66+
builder.stringLength(config.getInt(STRING_LENGTH));
67+
}
68+
return builder.build();
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
19+
20+
import org.apache.seatunnel.api.table.type.ArrayType;
21+
import org.apache.seatunnel.api.table.type.BasicType;
22+
import org.apache.seatunnel.api.table.type.DecimalType;
23+
import org.apache.seatunnel.api.table.type.MapType;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
25+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
27+
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
28+
29+
import org.apache.commons.lang3.RandomStringUtils;
30+
import org.apache.commons.lang3.RandomUtils;
31+
32+
import java.lang.reflect.Array;
33+
import java.math.BigDecimal;
34+
import java.time.LocalDateTime;
35+
import java.util.ArrayList;
36+
import java.util.HashMap;
37+
import java.util.List;
38+
39+
public class FakeDataGenerator {
40+
public static final String SCHEMA = "schema";
41+
private final SeaTunnelSchema schema;
42+
private final FakeConfig fakeConfig;
43+
44+
public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) {
45+
this.schema = schema;
46+
this.fakeConfig = fakeConfig;
47+
}
48+
49+
private SeaTunnelRow randomRow() {
50+
SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
51+
String[] fieldNames = seaTunnelRowType.getFieldNames();
52+
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
53+
List<Object> randomRow = new ArrayList<>(fieldNames.length);
54+
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
55+
randomRow.add(randomColumnValue(fieldType));
56+
}
57+
return new SeaTunnelRow(randomRow.toArray());
58+
}
59+
60+
public List<SeaTunnelRow> generateFakedRows() {
61+
ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
62+
for (int i = 0; i < fakeConfig.getRowNum(); i++) {
63+
seaTunnelRows.add(randomRow());
64+
}
65+
return seaTunnelRows;
66+
}
67+
68+
@SuppressWarnings("magicnumber")
69+
private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
70+
switch (fieldType.getSqlType()) {
71+
case ARRAY:
72+
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
73+
BasicType<?> elementType = arrayType.getElementType();
74+
int length = fakeConfig.getArraySize();
75+
Object array = Array.newInstance(elementType.getTypeClass(), length);
76+
for (int i = 0; i < length; i++) {
77+
Object value = randomColumnValue(elementType);
78+
Array.set(array, i, value);
79+
}
80+
return array;
81+
case MAP:
82+
MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
83+
SeaTunnelDataType<?> keyType = mapType.getKeyType();
84+
SeaTunnelDataType<?> valueType = mapType.getValueType();
85+
HashMap<Object, Object> objectMap = new HashMap<>();
86+
int mapSize = fakeConfig.getMapSize();
87+
for (int i = 0; i < mapSize; i++) {
88+
Object key = randomColumnValue(keyType);
89+
Object value = randomColumnValue(valueType);
90+
objectMap.put(key, value);
91+
}
92+
return objectMap;
93+
case STRING:
94+
return RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength());
95+
case BOOLEAN:
96+
return RandomUtils.nextInt(0, 2) == 1;
97+
case TINYINT:
98+
return (byte) RandomUtils.nextInt(0, 255);
99+
case SMALLINT:
100+
return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE);
101+
case INT:
102+
return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
103+
case BIGINT:
104+
return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
105+
case FLOAT:
106+
return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
107+
case DOUBLE:
108+
return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
109+
case DECIMAL:
110+
DecimalType decimalType = (DecimalType) fieldType;
111+
return new BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale()) + "." +
112+
RandomStringUtils.randomNumeric(decimalType.getScale()));
113+
case NULL:
114+
return null;
115+
case BYTES:
116+
return RandomUtils.nextBytes(fakeConfig.getBytesLength());
117+
case DATE:
118+
return randomLocalDateTime().toLocalDate();
119+
case TIME:
120+
return randomLocalDateTime().toLocalTime();
121+
case TIMESTAMP:
122+
return randomLocalDateTime();
123+
case ROW:
124+
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes();
125+
Object[] objects = new Object[fieldTypes.length];
126+
for (int i = 0; i < fieldTypes.length; i++) {
127+
Object object = randomColumnValue(fieldTypes[i]);
128+
objects[i] = object;
129+
}
130+
return new SeaTunnelRow(objects);
131+
default:
132+
// never got in there
133+
throw new UnsupportedOperationException("SeaTunnel Fake source connector not support this data type");
134+
}
135+
}
136+
137+
@SuppressWarnings("magicnumber")
138+
private LocalDateTime randomLocalDateTime() {
139+
return LocalDateTime.of(
140+
LocalDateTime.now().getYear(),
141+
RandomUtils.nextInt(1, 12),
142+
RandomUtils.nextInt(1, 28),
143+
RandomUtils.nextInt(0, 24),
144+
RandomUtils.nextInt(0, 59)
145+
);
146+
}
147+
}

0 commit comments

Comments
 (0)