Skip to content

Commit

Permalink
[FLINK-22275][table] Support random past for timestamp types in datagen
Browse files Browse the repository at this point in the history
Introduce a `max-past` duration option for timestamp type fields. If the
`max-past` is specified, it will resolve a past timestamp relative to
the current timestamp of the local machine.

You can use it to achieve the out-of-order effect.
  • Loading branch information
yittg committed May 18, 2021
1 parent 06dec01 commit c6072f7
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 3 deletions.
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/datagen.md
Expand Up @@ -124,6 +124,13 @@ CREATE TABLE datagen (
<td>(Type of field)</td>
<td>随机生成器的最大值,适用于数字类型。</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
<td>随机生成器生成相对当前时间向过去偏移的最大值,适用于 Timestamp 类型。</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>可选</td>
Expand Down
17 changes: 15 additions & 2 deletions docs/content/docs/connectors/table/datagen.md
Expand Up @@ -157,12 +157,18 @@ Types
<tr>
<td>TIMESTAMP</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves to a random timestamp in the past, relative to the current timestamp of the local machine.
The maximum offset into the past is set by the 'max-past' option; with the default of 0 it resolves to the current timestamp.
</td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves to a random timestamp in the past, relative to the current timestamp of the local machine.
The maximum offset into the past is set by the 'max-past' option; with the default of 0 it resolves to the current timestamp.
</td>
</tr>
<tr>
<td>INTERVAL YEAR TO MONTH</td>
Expand Down Expand Up @@ -253,6 +259,13 @@ Connector Options
<td>(Type of field)</td>
<td>Maximum value of random generator, works for numeric types.</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
<td>Maximum offset into the past, relative to the current timestamp of the local machine, that the random generator may produce. Only applies to timestamp types.</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>optional</td>
Expand Down
Expand Up @@ -36,6 +36,7 @@ public class DataGenOptions {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";

public static final String SEQUENCE = "sequence";
Expand Down Expand Up @@ -77,6 +78,14 @@ public class DataGenOptions {
.withDescription(
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");

/**
 * Placeholder {@link ConfigOption}. Not used for retrieving values.
 *
 * <p>Declared as a duration option so that the placeholder's type agrees with how the per-field
 * {@code max-past} value is documented and parsed (a duration such as {@code 5s}), rather than
 * advertising an integer.
 */
public static final ConfigOption<java.time.Duration> FIELD_MAX_PAST =
        ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX_PAST))
                .durationType()
                .noDefaultValue()
                .withDescription(
                        "Maximum past relative to the current timestamp of the local machine to generate for timestamp fields of kind 'random'.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<Integer> FIELD_LENGTH =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
Expand Down
Expand Up @@ -67,6 +67,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(DataGenOptions.FIELD_KIND);
options.add(DataGenOptions.FIELD_MIN);
options.add(DataGenOptions.FIELD_MAX);
options.add(DataGenOptions.FIELD_MAX_PAST);
options.add(DataGenOptions.FIELD_LENGTH);
options.add(DataGenOptions.FIELD_START);
options.add(DataGenOptions.FIELD_END);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
Expand All @@ -41,15 +42,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.ZonedTimestampType;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -70,11 +75,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {

private final ConfigOptions.OptionBuilder maxKey;

private final ConfigOptions.OptionBuilder maxPastKey;

public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);

this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
this.maxPastKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX_PAST);
}

@Override
Expand Down Expand Up @@ -184,6 +192,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}

/** Creates the random generator for {@code TIMESTAMP} fields, honoring {@code max-past}. */
@Override
public DataGeneratorContainer visit(TimestampType timestampType) {
    return pastTimestampContainer();
}

/** Creates the random generator for zoned timestamp fields, honoring {@code max-past}. */
@Override
public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
    return pastTimestampContainer();
}

/** Creates the random generator for {@code TIMESTAMP_LTZ} fields, honoring {@code max-past}. */
@Override
public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
    return pastTimestampContainer();
}

/**
 * Builds the generator container shared by all timestamp-like types.
 *
 * <p>The field's {@code max-past} option is read as a {@link Duration} (defaulting to {@link
 * Duration#ZERO}) and handed to the random past-timestamp generator. The option is also
 * registered on the returned container. Keeping this in one place ensures the three timestamp
 * overloads cannot drift apart.
 *
 * @return container holding the timestamp generator and the option it consumed
 */
private DataGeneratorContainer pastTimestampContainer() {
    ConfigOption<Duration> maxPastOption =
            maxPastKey.durationType().defaultValue(Duration.ZERO);

    return DataGeneratorContainer.of(
            getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
}

@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption<Integer> lenOption =
Expand Down Expand Up @@ -294,4 +329,16 @@ public StringData next() {
}
};
}

/**
 * Creates a generator that yields the current wall-clock time shifted back by a random offset.
 *
 * <p>Each call to {@code next()} reads {@link System#currentTimeMillis()} and subtracts an
 * offset in milliseconds drawn via {@code random.nextLong(0, maxPastMillis)}. When {@code
 * maxPast} is zero or negative, no random draw happens and the current time is returned
 * unchanged.
 *
 * @param maxPast largest offset into the past that may be applied
 * @return generator producing timestamps at or before the current time
 */
private static RandomGenerator<TimestampData> getRandomPastTimestampGenerator(
        Duration maxPast) {
    // The bound never changes, so convert it once here instead of once per generated record.
    final long maxPastMillis = maxPast.toMillis();

    return new RandomGenerator<TimestampData>() {
        @Override
        public TimestampData next() {
            // Only draw a random offset for a positive bound; otherwise emit "now" as-is.
            long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
            return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
        }
    };
}
}
Expand Up @@ -54,7 +54,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()));
Column.physical("f2", DataTypes.BIGINT()),
Column.physical("f3", DataTypes.TIMESTAMP()));

@Test
public void testDataTypeCoverage() throws Exception {
Expand Down Expand Up @@ -143,6 +144,10 @@ public void testSource() throws Exception {
descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.START, 50);
descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.END, 60);

descriptor.putString(
DataGenOptions.FIELDS + ".f3." + DataGenOptions.KIND, DataGenOptions.RANDOM);
descriptor.putString(DataGenOptions.FIELDS + ".f3." + DataGenOptions.MAX_PAST, "5s");

List<RowData> results = runGenerator(SCHEMA, descriptor);

Assert.assertEquals(11, results.size());
Expand Down

0 comments on commit c6072f7

Please sign in to comment.