Skip to content

Commit

Permalink
[FLINK-22275][table] Support random past for timestamp types in datagen
Browse files Browse the repository at this point in the history
Introduce a `max-past` duration option for timestamp type fields. If the
`max-past` is specified, it will resolve a past timestamp relative to
the current timestamp of the local machine.

You can use it to achieve the out-of-order effect.
  • Loading branch information
yittg committed Apr 21, 2021
1 parent 60e17f4 commit 19e9878
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
17 changes: 15 additions & 2 deletions docs/content/docs/connectors/table/datagen.md
Expand Up @@ -157,12 +157,18 @@ Types
<tr>
<td>TIMESTAMP</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves a past timestamp relative to the current timestamp of the local machine.
The maximum distance into the past is specified by the 'max-past' option.
</td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local machine.</td>
<td>
Resolves a past timestamp relative to the current timestamp of the local machine.
The maximum distance into the past is specified by the 'max-past' option.
</td>
</tr>
<tr>
<td>INTERVAL YEAR TO MONTH</td>
Expand Down Expand Up @@ -253,6 +259,13 @@ Connector Options
<td>(Type of field)</td>
<td>Maximum value of the random generator; works for numeric types.</td>
</tr>
<tr>
<td><h5>fields.#.max-past</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Duration</td>
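<td>Maximum duration into the past for the timestamp random generator. Generated timestamps lie between the current time minus this duration and the current time; the default of 0 always yields the current timestamp.</td>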
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>optional</td>
Expand Down
Expand Up @@ -67,6 +67,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
/** Key suffix of the per-field option that bounds how far into the past random timestamps may lie. */
public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";

public static final String SEQUENCE = "sequence";
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
Expand All @@ -40,15 +41,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.ZonedTimestampType;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -58,6 +63,7 @@
import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX_PAST;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;

/** Creates a random {@link DataGeneratorContainer} for a particular logical type. */
Expand All @@ -73,11 +79,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {

private final ConfigOptions.OptionBuilder maxKey;

private final ConfigOptions.OptionBuilder maxPastKey;

public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);

this.minKey = key(FIELDS + "." + name + "." + MIN);
this.maxKey = key(FIELDS + "." + name + "." + MAX);
this.maxPastKey = key(FIELDS + "." + name + "." + MAX_PAST);
}

@Override
Expand Down Expand Up @@ -185,6 +194,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}

/**
 * Builds the random generator container for a {@link TimestampType} field.
 *
 * <p>The field's {@code max-past} option is read as a {@link Duration} (default {@link
 * Duration#ZERO}) and handed to the random past-timestamp generator; the option is also
 * registered on the returned container.
 */
@Override
public DataGeneratorContainer visit(TimestampType timestampType) {
    final ConfigOption<Duration> maxPastOption =
            maxPastKey.durationType().defaultValue(Duration.ZERO);
    final Duration configuredMaxPast = config.get(maxPastOption);

    return DataGeneratorContainer.of(
            getRandomPastTimestampGenerator(configuredMaxPast), maxPastOption);
}

/**
 * Builds the random generator container for a {@link ZonedTimestampType} field.
 *
 * <p>Uses the field's {@code max-past} duration option, which defaults to {@link
 * Duration#ZERO}, to configure the random past-timestamp generator and exposes that option
 * through the returned container.
 */
@Override
public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
    final ConfigOption<Duration> maxPastOption =
            maxPastKey.durationType().defaultValue(Duration.ZERO);
    final Duration configuredMaxPast = config.get(maxPastOption);

    return DataGeneratorContainer.of(
            getRandomPastTimestampGenerator(configuredMaxPast), maxPastOption);
}

/**
 * Builds the random generator container for a {@link LocalZonedTimestampType} field.
 *
 * <p>The generator is configured from the field's {@code max-past} duration option (default
 * {@link Duration#ZERO}); the same option is attached to the returned container.
 */
@Override
public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
    final ConfigOption<Duration> maxPastOption =
            maxPastKey.durationType().defaultValue(Duration.ZERO);
    final Duration configuredMaxPast = config.get(maxPastOption);

    return DataGeneratorContainer.of(
            getRandomPastTimestampGenerator(configuredMaxPast), maxPastOption);
}

@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption<Integer> lenOption =
Expand Down Expand Up @@ -295,4 +331,16 @@ public StringData next() {
}
};
}

/**
 * Creates a generator that yields timestamps at or before the current wall-clock time of the
 * local machine.
 *
 * <p>Each call to {@code next()} reads {@link System#currentTimeMillis()} and subtracts an
 * offset obtained from {@code random.nextLong(0, maxPastMillis)}. When {@code maxPast} is zero
 * or negative no offset is applied, so the generator returns the current timestamp.
 *
 * @param maxPast upper bound for how far into the past a generated timestamp may lie
 * @return a generator producing {@link TimestampData} values with millisecond precision
 * @throws ArithmeticException if {@code maxPast} does not fit into a {@code long} number of
 *     milliseconds
 */
private static RandomGenerator<TimestampData> getRandomPastTimestampGenerator(
        Duration maxPast) {
    // The bound is fixed for the lifetime of the generator, so convert it once here instead
    // of on every generated record. This also surfaces an invalid duration at creation time.
    final long maxPastMillis = maxPast.toMillis();

    return new RandomGenerator<TimestampData>() {
        @Override
        public TimestampData next() {
            // A non-positive bound means "no out-of-orderness": emit the current time.
            long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
            return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
        }
    };
}
}
Expand Up @@ -49,6 +49,7 @@
import static org.apache.flink.table.factories.DataGenTableSourceFactory.KIND;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX_PAST;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.NUMBER_OF_ROWS;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.RANDOM;
Expand All @@ -65,7 +66,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()));
Column.physical("f2", DataTypes.BIGINT()),
Column.physical("f3", DataTypes.TIMESTAMP()));

@Test
public void testDataTypeCoverage() throws Exception {
Expand Down Expand Up @@ -135,6 +137,9 @@ public void testSource() throws Exception {
descriptor.putLong(FIELDS + ".f2." + START, 50);
descriptor.putLong(FIELDS + ".f2." + END, 60);

descriptor.putString(FIELDS + ".f3." + KIND, RANDOM);
descriptor.putString(FIELDS + ".f3." + MAX_PAST, "5s");

List<RowData> results = runGenerator(SCHEMA, descriptor);

Assert.assertEquals(11, results.size());
Expand Down

0 comments on commit 19e9878

Please sign in to comment.