Skip to content

Commit a544752

Browse files
laglangyuetangjiafu
andauthored
[Imporve][Fake-Connector-V2]support user-defined-schmea and random data for fake-table (#2406)
* [Connector-V2][JDBC-connector] optimization fake Co-authored-by: tangjiafu <tangjiafu@corp.netease.com>
1 parent 093add2 commit a544752

File tree

34 files changed

+604
-111
lines changed

34 files changed

+604
-111
lines changed

docs/en/connector-v2/sink/Assert.md

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,11 @@ A list value rule define the data value validation
3737
### rule_type [string]
3838

3939
The following rules are supported for now
40-
`
41-
NOT_NULL, // value can't be null
42-
MIN, // define the minimum value of data
43-
MAX, // define the maximum value of data
44-
MIN_LENGTH, // define the minimum string length of a string data
45-
MAX_LENGTH // define the maximum string length of a string data
46-
`
40+
- NOT_NULL `value can't be null`
41+
- MIN `define the minimum value of data`
42+
- MAX `define the maximum value of data`
43+
- MIN_LENGTH `define the minimum string length of a string data`
44+
- MAX_LENGTH `define the maximum string length of a string data`
4745

4846
### rule_value [double]
4947

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# FakeSource
2+
3+
> FakeSource connector
4+
5+
## Description
6+
7+
The FakeSource is a virtual data source, which randomly generates the number of rows according to the data structure of the user-defined schema,
8+
just for testing, such as type conversion and feature testing
9+
10+
## Options
11+
12+
| name | type | required | default value |
13+
|-------------------|--------|----------|---------------|
14+
| result_table_name | string | yes | - |
15+
| schema | config | yes | - |
16+
17+
### result_table_name [string]
18+
19+
The table name.
20+
21+
### type [string]
22+
Table structure description ,you should assign schema option to tell connector how to parse data to the row you want.
23+
**Tips**: Most of Unstructured-Datasource contain this param, such as LocalFile,HdfsFile.
24+
**Example**:
25+
```hocon
26+
schema = {
27+
fields {
28+
c_map = "map<string, string>"
29+
c_array = "array<tinyint>"
30+
c_string = string
31+
c_boolean = boolean
32+
c_tinyint = tinyint
33+
c_smallint = smallint
34+
c_int = int
35+
c_bigint = bigint
36+
c_float = float
37+
c_double = double
38+
c_decimal = "decimal(30, 8)"
39+
c_null = "null"
40+
c_bytes = bytes
41+
c_date = date
42+
c_time = time
43+
c_timestamp = timestamp
44+
}
45+
}
46+
```
47+
48+
## Example
49+
Simple source for FakeSource which contains enough datatype
50+
```hocon
51+
source {
52+
FakeSource {
53+
schema = {
54+
fields {
55+
c_map = "map<string, string>"
56+
c_array = "array<tinyint>"
57+
c_string = string
58+
c_boolean = boolean
59+
c_tinyint = tinyint
60+
c_smallint = smallint
61+
c_int = int
62+
c_bigint = bigint
63+
c_float = float
64+
c_double = double
65+
c_decimal = "decimal(30, 8)"
66+
c_null = "null"
67+
c_bytes = bytes
68+
c_date = date
69+
c_time = time
70+
c_timestamp = timestamp
71+
}
72+
}
73+
result_table_name = "fake"
74+
}
75+
}
76+
```

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@
3232
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3333
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
3434

35+
import java.io.Serializable;
3536
import java.util.Map;
3637

37-
public class SeatunnelSchema {
38+
public class SeatunnelSchema implements Serializable {
3839
public static final String SCHEMA = "schema";
3940
private static final String FIELD_KEY = "fields";
4041
private static final String SIMPLE_SCHEMA_FILED = "content";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
19+
20+
import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
21+
import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
22+
import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
23+
import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
24+
import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
25+
import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
26+
import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
27+
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
28+
import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;
29+
30+
import org.apache.seatunnel.api.table.type.ArrayType;
31+
import org.apache.seatunnel.api.table.type.BasicType;
32+
import org.apache.seatunnel.api.table.type.DecimalType;
33+
import org.apache.seatunnel.api.table.type.LocalTimeType;
34+
import org.apache.seatunnel.api.table.type.MapType;
35+
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
36+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
37+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
38+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
39+
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
40+
41+
import org.apache.commons.lang3.RandomStringUtils;
42+
import org.apache.commons.lang3.RandomUtils;
43+
44+
import java.lang.reflect.Array;
45+
import java.math.BigDecimal;
46+
import java.time.LocalDateTime;
47+
import java.util.ArrayList;
48+
import java.util.HashMap;
49+
import java.util.List;
50+
51+
public class FakeRandomData {
52+
public static final String SCHEMA = "schema";
53+
private final SeatunnelSchema schema;
54+
55+
public FakeRandomData(SeatunnelSchema schema) {
56+
this.schema = schema;
57+
}
58+
59+
public SeaTunnelRow randomRow() {
60+
SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
61+
String[] fieldNames = seaTunnelRowType.getFieldNames();
62+
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
63+
List<Object> randomRow = new ArrayList<>(fieldNames.length);
64+
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
65+
randomRow.add(randomColumnValue(fieldType));
66+
}
67+
return new SeaTunnelRow(randomRow.toArray());
68+
}
69+
70+
@SuppressWarnings("magicnumber")
71+
private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
72+
if (BOOLEAN_TYPE.equals(fieldType)) {
73+
return RandomUtils.nextInt(0, 2) == 1;
74+
} else if (BYTE_TYPE.equals(fieldType)) {
75+
return (byte) RandomUtils.nextInt(0, 255);
76+
} else if (SHORT_TYPE.equals(fieldType)) {
77+
return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE);
78+
} else if (INT_TYPE.equals(fieldType)) {
79+
return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
80+
} else if (LONG_TYPE.equals(fieldType)) {
81+
return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
82+
} else if (FLOAT_TYPE.equals(fieldType)) {
83+
return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
84+
} else if (DOUBLE_TYPE.equals(fieldType)) {
85+
return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
86+
} else if (STRING_TYPE.equals(fieldType)) {
87+
return RandomStringUtils.randomAlphabetic(10);
88+
} else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) {
89+
return randomLocalDateTime().toLocalDate();
90+
} else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) {
91+
return randomLocalDateTime().toLocalTime();
92+
} else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) {
93+
return randomLocalDateTime();
94+
} else if (fieldType instanceof DecimalType) {
95+
DecimalType decimalType = (DecimalType) fieldType;
96+
return new BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale()) + "." +
97+
RandomStringUtils.randomNumeric(decimalType.getScale()));
98+
} else if (fieldType instanceof ArrayType) {
99+
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
100+
BasicType<?> elementType = arrayType.getElementType();
101+
Object value = randomColumnValue(elementType);
102+
Object arr = Array.newInstance(elementType.getTypeClass(), 1);
103+
Array.set(arr, 0, value);
104+
return arr;
105+
} else if (fieldType instanceof MapType) {
106+
MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
107+
SeaTunnelDataType<?> keyType = mapType.getKeyType();
108+
Object key = randomColumnValue(keyType);
109+
SeaTunnelDataType<?> valueType = mapType.getValueType();
110+
Object value = randomColumnValue(valueType);
111+
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
112+
objectObjectHashMap.put(key, value);
113+
return objectObjectHashMap;
114+
} else if (fieldType instanceof PrimitiveByteArrayType) {
115+
return RandomUtils.nextBytes(100);
116+
} else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
117+
return Void.TYPE;
118+
} else {
119+
throw new UnsupportedOperationException("Unexpected value: " + fieldType);
120+
}
121+
}
122+
123+
@SuppressWarnings("magicnumber")
124+
private LocalDateTime randomLocalDateTime() {
125+
return LocalDateTime.of(
126+
LocalDateTime.now().getYear(),
127+
RandomUtils.nextInt(1, 12),
128+
RandomUtils.nextInt(1, LocalDateTime.now().getDayOfMonth()),
129+
RandomUtils.nextInt(0, 24),
130+
RandomUtils.nextInt(0, 59)
131+
);
132+
}
133+
}

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@
2020
import org.apache.seatunnel.api.common.SeaTunnelContext;
2121
import org.apache.seatunnel.api.source.Boundedness;
2222
import org.apache.seatunnel.api.source.SeaTunnelSource;
23-
import org.apache.seatunnel.api.table.type.BasicType;
24-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2523
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2624
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2725
import org.apache.seatunnel.common.constants.JobMode;
26+
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
2827
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
2928
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
3029
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
@@ -38,6 +37,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
3837

3938
private Config pluginConfig;
4039
private SeaTunnelContext seaTunnelContext;
40+
private SeatunnelSchema schema;
4141

4242
@Override
4343
public Boundedness getBoundedness() {
@@ -46,14 +46,12 @@ public Boundedness getBoundedness() {
4646

4747
@Override
4848
public SeaTunnelRowType getProducedType() {
49-
return new SeaTunnelRowType(
50-
new String[]{"name", "age", "timestamp"},
51-
new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.LONG_TYPE});
49+
return schema.getSeaTunnelRowType();
5250
}
5351

5452
@Override
5553
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
56-
return new FakeSourceReader(readerContext);
54+
return new FakeSourceReader(readerContext, new FakeRandomData(schema));
5755
}
5856

5957
@Override
@@ -64,6 +62,8 @@ public String getPluginName() {
6462
@Override
6563
public void prepare(Config pluginConfig) {
6664
this.pluginConfig = pluginConfig;
65+
assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
66+
this.schema = SeatunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
6767
}
6868

6969
@Override

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,17 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29-
import java.util.Random;
30-
import java.util.concurrent.ThreadLocalRandom;
31-
3229
public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
3330

3431
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
3532

3633
private final SingleSplitReaderContext context;
3734

38-
private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
39-
private final int[] ages = {11, 22, 33, 44};
35+
private final FakeRandomData fakeRandomData;
4036

41-
public FakeSourceReader(SingleSplitReaderContext context) {
37+
public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
4238
this.context = context;
39+
this.fakeRandomData = randomData;
4340
}
4441

4542
@Override
@@ -56,11 +53,8 @@ public void close() {
5653
@SuppressWarnings("magicnumber")
5754
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
5855
// Generate a random number of rows to emit.
59-
Random random = ThreadLocalRandom.current();
60-
int size = random.nextInt(10) + 1;
61-
for (int i = 0; i < size; i++) {
62-
int randomIndex = random.nextInt(names.length);
63-
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()});
56+
for (int i = 0; i < 10; i++) {
57+
SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
6458
output.collect(seaTunnelRow);
6559
}
6660
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {

0 commit comments

Comments
 (0)