Skip to content

Commit

Permalink
[FLINK-22275][table] Support random past for timestamp types in datag…
Browse files Browse the repository at this point in the history
…en connector (#15703)
  • Loading branch information
yittg committed Sep 14, 2021
1 parent d92540c commit f74db4b
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 25 deletions.
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/datagen.md
Expand Up @@ -124,6 +124,13 @@ CREATE TABLE datagen (
<td>(Type of field)</td>
<td>随机生成器的最大值,适用于数字类型。</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
<td>随机生成器生成相对当前时间向过去偏移的最大值,适用于 timestamp 类型。</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>可选</td>
Expand Down
17 changes: 15 additions & 2 deletions docs/content/docs/connectors/table/datagen.md
Expand Up @@ -157,12 +157,18 @@ Types
<tr>
<td>TIMESTAMP</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves to a timestamp in the past, offset by a random amount from the current timestamp of the local machine.
The maximum offset can be specified with the 'max-past' option.
</td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves to a timestamp in the past, offset by a random amount from the current timestamp of the local machine.
The maximum offset can be specified with the 'max-past' option.
</td>
</tr>
<tr>
<td>INTERVAL YEAR TO MONTH</td>
Expand Down Expand Up @@ -253,6 +259,13 @@ Connector Options
<td>(Type of field)</td>
<td>Maximum value of the random generator; works for numeric types.</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
<td>Maximum offset into the past, relative to the current timestamp of the local machine, that the random generator may produce; only works for timestamp types.</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>optional</td>
Expand Down
Expand Up @@ -350,6 +350,7 @@ fields.amount.min
fields.product.kind
fields.product.length
fields.ts.kind
fields.ts.max-past
fields.user.kind
fields.user.max
fields.user.min
Expand Down
Expand Up @@ -22,12 +22,15 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.END;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.FIELDS;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.KIND;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.LENGTH;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.MAX;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.MAX_PAST;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.MIN;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.ROWS_PER_SECOND_DEFAULT_VALUE;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.START;
Expand Down Expand Up @@ -76,6 +79,14 @@ public class DataGenConnectorOptions {
.withDescription(
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");

/**
 * Placeholder {@link ConfigOption} for the per-field {@code max-past} setting of timestamp
 * fields. Not used for retrieving values.
 *
 * <p>The key is built from the {@code FIELDS} and {@code MAX_PAST} name parts with a literal
 * {@code #} between them. The option is declared as a {@link Duration} and carries no default
 * value here.
 */
public static final ConfigOption<Duration> FIELD_MAX_PAST =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX_PAST))
.durationType()
.noDefaultValue()
.withDescription(
"Maximum past relative to the current timestamp of the local machine to generate for timestamp fields of kind 'random'.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<Integer> FIELD_LENGTH =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
Expand Down
Expand Up @@ -32,6 +32,7 @@ public class DataGenConnectorOptionsUtil {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";

public static final String SEQUENCE = "sequence";
Expand Down
Expand Up @@ -65,6 +65,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(DataGenConnectorOptions.FIELD_KIND);
options.add(DataGenConnectorOptions.FIELD_MIN);
options.add(DataGenConnectorOptions.FIELD_MAX);
options.add(DataGenConnectorOptions.FIELD_MAX_PAST);
options.add(DataGenConnectorOptions.FIELD_LENGTH);
options.add(DataGenConnectorOptions.FIELD_START);
options.add(DataGenConnectorOptions.FIELD_END);
Expand Down
Expand Up @@ -24,13 +24,9 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;

import java.io.Serializable;
Expand Down Expand Up @@ -66,24 +62,6 @@ public DataGeneratorContainer visit(TimeType timeType) {
return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_DAY)));
}

/**
 * Builds the generator container for {@link TimestampType} fields.
 *
 * <p>The supplier handed to {@code TimeGenerator.of} converts {@link System#currentTimeMillis()}
 * into a {@link TimestampData}, so generated values follow the local machine's clock.
 */
@Override
public DataGeneratorContainer visit(TimestampType timestampType) {
return DataGeneratorContainer.of(
TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
}

/**
 * Builds the generator container for {@link ZonedTimestampType} fields.
 *
 * <p>The supplier handed to {@code TimeGenerator.of} converts {@link System#currentTimeMillis()}
 * into a {@link TimestampData}, so generated values follow the local machine's clock.
 */
@Override
public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
return DataGeneratorContainer.of(
TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
}

/**
 * Builds the generator container for {@link LocalZonedTimestampType} fields.
 *
 * <p>The supplier handed to {@code TimeGenerator.of} converts {@link System#currentTimeMillis()}
 * into a {@link TimestampData}, so generated values follow the local machine's clock.
 */
@Override
public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
return DataGeneratorContainer.of(
TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
}

@Override
protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
throw new ValidationException("Unsupported type: " + logicalType);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
Expand All @@ -40,15 +41,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.ZonedTimestampType;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -69,6 +74,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {

private final ConfigOptions.OptionBuilder maxKey;

private final ConfigOptions.OptionBuilder maxPastKey;

public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);

Expand All @@ -86,6 +93,13 @@ public RandomGeneratorVisitor(String name, ReadableConfig config) {
+ name
+ "."
+ DataGenConnectorOptionsUtil.MAX);
this.maxPastKey =
key(
DataGenConnectorOptionsUtil.FIELDS
+ "."
+ name
+ "."
+ DataGenConnectorOptionsUtil.MAX_PAST);
}

@Override
Expand Down Expand Up @@ -203,6 +217,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}

/**
 * Creates the random generator for {@code TIMESTAMP} fields.
 *
 * @param timestampType the visited logical type (not inspected)
 * @return a container holding a random past-timestamp generator and the option it reads
 */
@Override
public DataGeneratorContainer visit(TimestampType timestampType) {
    return randomPastTimestampContainer();
}

/**
 * Creates the random generator for zoned timestamp fields.
 *
 * @param zonedTimestampType the visited logical type (not inspected)
 * @return a container holding a random past-timestamp generator and the option it reads
 */
@Override
public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
    return randomPastTimestampContainer();
}

/**
 * Creates the random generator for {@code TIMESTAMP_LTZ} fields.
 *
 * @param localZonedTimestampType the visited logical type (not inspected)
 * @return a container holding a random past-timestamp generator and the option it reads
 */
@Override
public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
    return randomPastTimestampContainer();
}

/**
 * Shared implementation for all timestamp-like types: resolves this field's {@code max-past}
 * option (defaulting to {@link Duration#ZERO}) and wraps a random past-timestamp generator
 * configured with that value.
 *
 * @return a container holding the generator together with the resolved option
 */
private DataGeneratorContainer randomPastTimestampContainer() {
    // A fresh option is derived from the per-field key builder on every call, exactly as the
    // three overloads did individually before they were consolidated.
    ConfigOption<Duration> maxPastOption =
            maxPastKey.durationType().defaultValue(Duration.ZERO);

    return DataGeneratorContainer.of(
            getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
}

@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption<Integer> lenOption =
Expand Down Expand Up @@ -325,4 +366,16 @@ public StringData next() {
}
};
}

/**
 * Creates a generator that yields the current wall-clock time shifted into the past by a random
 * number of milliseconds.
 *
 * <p>If {@code maxPast} is zero or negative, no shift is applied and every value is the current
 * timestamp of the local machine.
 *
 * @param maxPast upper bound for the random shift into the past
 * @return a generator producing {@link TimestampData} values derived from
 *     {@link System#currentTimeMillis()}
 * @throws ArithmeticException if {@code maxPast} cannot be represented in milliseconds
 */
private static RandomGenerator<TimestampData> getRandomPastTimestampGenerator(
        Duration maxPast) {
    // The bound never changes, so convert it once instead of on every generated record. This
    // also makes an unrepresentable duration fail here rather than during generation.
    final long maxPastMillis = maxPast.toMillis();
    return new RandomGenerator<TimestampData>() {
        @Override
        public TimestampData next() {
            // Only draw a random offset for a positive bound; otherwise use "now" unchanged.
            final long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
            return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
        }
    };
}
}
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.util.InstantiationUtil;

Expand All @@ -57,7 +58,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()));
Column.physical("f2", DataTypes.BIGINT()),
Column.physical("f3", DataTypes.TIMESTAMP()));

@Test
public void testDataTypeCoverage() throws Exception {
Expand Down Expand Up @@ -156,7 +158,16 @@ public void testSource() throws Exception {
descriptor.putLong(
DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.END, 60);

descriptor.putString(
DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.KIND,
DataGenConnectorOptionsUtil.RANDOM);
descriptor.putString(
DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.MAX_PAST,
"5s");

final long begin = System.currentTimeMillis();
List<RowData> results = runGenerator(SCHEMA, descriptor);
final long end = System.currentTimeMillis();

Assert.assertEquals(11, results.size());
for (int i = 0; i < results.size(); i++) {
Expand All @@ -165,6 +176,8 @@ public void testSource() throws Exception {
long f1 = row.getLong(1);
Assert.assertTrue(f1 >= 10 && f1 <= 100);
Assert.assertEquals(i + 50, row.getLong(2));
final TimestampData f3 = row.getTimestamp(3, 3);
Assert.assertTrue(f3.getMillisecond() >= begin - 5000 && f3.getMillisecond() <= end);
}
}

Expand Down

0 comments on commit f74db4b

Please sign in to comment.