diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md
index f62b68b80469f9..5fb36bf56eae94 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -157,12 +157,18 @@ Types
TIMESTAMP |
random |
- Always resolves to the current timestamp of the local machine. |
+
+ Resolves a past timestamp relative to the current timestamp of the local machine.
+ The max past is specified by the 'max-past' option.
+ |
TIMESTAMP_LTZ |
random |
- Always resolves to the current timestamp of the local machine. |
+
+ Resolves a past timestamp relative to the current timestamp of the local machine.
+ The max past is specified by the 'max-past' option.
+ |
INTERVAL YEAR TO MONTH |
@@ -253,6 +259,13 @@ Connector Options
(Type of field) |
Maximum value of random generator, work for numeric types. |
+
+ fields.#.max-past |
+ optional |
+ 0 |
+ Duration |
+ Maximum past of timestamp random generator. |
+
fields.#.length |
optional |
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 4cce51d1f63d68..8f23c90523b863 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -67,6 +67,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
+ public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";
public static final String SEQUENCE = "sequence";
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index 88bfc7e15e4c02..b862aa7ed816fb 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -28,6 +28,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
@@ -40,15 +41,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,6 +63,7 @@
import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX_PAST;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
/** Creates a random {@link DataGeneratorContainer} for a particular logical type. */
@@ -73,11 +79,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
private final ConfigOptions.OptionBuilder maxKey;
+ private final ConfigOptions.OptionBuilder maxPastKey;
+
public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);
this.minKey = key(FIELDS + "." + name + "." + MIN);
this.maxKey = key(FIELDS + "." + name + "." + MAX);
+ this.maxPastKey = key(FIELDS + "." + name + "." + MAX_PAST);
}
@Override
@@ -185,6 +194,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}
+ @Override
+ public DataGeneratorContainer visit(TimestampType timestampType) {
+ ConfigOption maxPastOption =
+ maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+ return DataGeneratorContainer.of(
+ getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+ }
+
+ @Override
+ public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+ ConfigOption maxPastOption =
+ maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+ return DataGeneratorContainer.of(
+ getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+ }
+
+ @Override
+ public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+ ConfigOption maxPastOption =
+ maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+ return DataGeneratorContainer.of(
+ getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+ }
+
@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption lenOption =
@@ -295,4 +331,16 @@ public StringData next() {
}
};
}
+
+ private static RandomGenerator getRandomPastTimestampGenerator(
+ Duration maxPast) {
+ return new RandomGenerator() {
+ @Override
+ public TimestampData next() {
+ long maxPastMillis = maxPast.toMillis();
+ long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
+ return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
+ }
+ };
+ }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 0a65bd917a1826..8423fbcb8b0dbc 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -49,6 +49,7 @@
import static org.apache.flink.table.factories.DataGenTableSourceFactory.KIND;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX_PAST;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.NUMBER_OF_ROWS;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.RANDOM;
@@ -65,7 +66,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
- Column.physical("f2", DataTypes.BIGINT()));
+ Column.physical("f2", DataTypes.BIGINT()),
+ Column.physical("f3", DataTypes.TIMESTAMP()));
@Test
public void testDataTypeCoverage() throws Exception {
@@ -135,6 +137,9 @@ public void testSource() throws Exception {
descriptor.putLong(FIELDS + ".f2." + START, 50);
descriptor.putLong(FIELDS + ".f2." + END, 60);
+ descriptor.putString(FIELDS + ".f3." + KIND, RANDOM);
+ descriptor.putString(FIELDS + ".f3." + MAX_PAST, "5s");
+
List results = runGenerator(SCHEMA, descriptor);
Assert.assertEquals(11, results.size());