Skip to content

Commit

Permalink
[FLINK-22275][table] Support random past for timestamp types in datagen
Browse files Browse the repository at this point in the history
Introduce a `max-past` duration option for timestamp type fields. If the
`max-past` is specified, it will resolve a past timestamp relative to
the current timestamp of the local machine.

You can use it to achieve the out-of-order effect.
  • Loading branch information
yittg committed Jun 8, 2021
1 parent 16452fb commit 93b58d4
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 3 deletions.
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/datagen.md
Expand Up @@ -124,6 +124,13 @@ CREATE TABLE datagen (
<td>(Type of field)</td>
<td>随机生成器的最大值,适用于数字类型。</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
<td>随机生成器生成相对当前时间向过去偏移的最大值,适用于 Timestamp 类型.</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>可选</td>
Expand Down
17 changes: 15 additions & 2 deletions docs/content/docs/connectors/table/datagen.md
Expand Up @@ -157,12 +157,18 @@ Types
<tr>
<td>TIMESTAMP</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves a past timestamp relative to the current timestamp of the local machine.
The max past is specified by the 'max-past' option.
</td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves a past timestamp relative to the current timestamp of the local machine.
The max past is specified by the 'max-past' option.
</td>
</tr>
<tr>
<td>INTERVAL YEAR TO MONTH</td>
Expand Down Expand Up @@ -253,6 +259,13 @@ Connector Options
<td>(Type of field)</td>
<td>Maximum value of random generator, work for numeric types.</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
<td>Maximum past of timestamp random generator, work for timestamp types.</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>optional</td>
Expand Down
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

import static org.apache.flink.configuration.ConfigOptions.key;

/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */
Expand All @@ -36,6 +38,7 @@ public class DataGenOptions {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";

public static final String SEQUENCE = "sequence";
Expand Down Expand Up @@ -77,6 +80,14 @@ public class DataGenOptions {
.withDescription(
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<Duration> FIELD_MAX_PAST =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX_PAST))
.durationType()
.noDefaultValue()
.withDescription(
"Maximum past relative to the current timestamp of the local machine to generate for timestamp fields of kind 'random'.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<Integer> FIELD_LENGTH =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
Expand Down
Expand Up @@ -67,6 +67,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(DataGenOptions.FIELD_KIND);
options.add(DataGenOptions.FIELD_MIN);
options.add(DataGenOptions.FIELD_MAX);
options.add(DataGenOptions.FIELD_MAX_PAST);
options.add(DataGenOptions.FIELD_LENGTH);
options.add(DataGenOptions.FIELD_START);
options.add(DataGenOptions.FIELD_END);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
Expand All @@ -41,15 +42,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.ZonedTimestampType;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -70,11 +75,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {

private final ConfigOptions.OptionBuilder maxKey;

private final ConfigOptions.OptionBuilder maxPastKey;

public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);

this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
this.maxPastKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX_PAST);
}

@Override
Expand Down Expand Up @@ -184,6 +192,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}

@Override
public DataGeneratorContainer visit(TimestampType timestampType) {
ConfigOption<Duration> maxPastOption =
maxPastKey.durationType().defaultValue(Duration.ZERO);

return DataGeneratorContainer.of(
getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
}

@Override
public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
ConfigOption<Duration> maxPastOption =
maxPastKey.durationType().defaultValue(Duration.ZERO);

return DataGeneratorContainer.of(
getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
}

@Override
public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
ConfigOption<Duration> maxPastOption =
maxPastKey.durationType().defaultValue(Duration.ZERO);

return DataGeneratorContainer.of(
getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
}

@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption<Integer> lenOption =
Expand Down Expand Up @@ -294,4 +329,16 @@ public StringData next() {
}
};
}

private static RandomGenerator<TimestampData> getRandomPastTimestampGenerator(
Duration maxPast) {
return new RandomGenerator<TimestampData>() {
@Override
public TimestampData next() {
long maxPastMillis = maxPast.toMillis();
long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
}
};
}
}
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.datagen.DataGenTableSource;
import org.apache.flink.util.InstantiationUtil;
Expand All @@ -54,7 +55,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()));
Column.physical("f2", DataTypes.BIGINT()),
Column.physical("f3", DataTypes.TIMESTAMP()));

@Test
public void testDataTypeCoverage() throws Exception {
Expand Down Expand Up @@ -143,7 +145,13 @@ public void testSource() throws Exception {
descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.START, 50);
descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.END, 60);

descriptor.putString(
DataGenOptions.FIELDS + ".f3." + DataGenOptions.KIND, DataGenOptions.RANDOM);
descriptor.putString(DataGenOptions.FIELDS + ".f3." + DataGenOptions.MAX_PAST, "5s");

final long begin = System.currentTimeMillis();
List<RowData> results = runGenerator(SCHEMA, descriptor);
final long end = System.currentTimeMillis();

Assert.assertEquals(11, results.size());
for (int i = 0; i < results.size(); i++) {
Expand All @@ -152,6 +160,8 @@ public void testSource() throws Exception {
long f1 = row.getLong(1);
Assert.assertTrue(f1 >= 10 && f1 <= 100);
Assert.assertEquals(i + 50, row.getLong(2));
final TimestampData f3 = row.getTimestamp(3, 3);
Assert.assertTrue(f3.getMillisecond() >= begin - 5000 && f3.getMillisecond() <= end);
}
}

Expand Down

0 comments on commit 93b58d4

Please sign in to comment.