diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md
index 287686faa74a7..7337df5e6baf6 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -124,6 +124,13 @@ CREATE TABLE datagen (
(Type of field) |
随机生成器的最大值,适用于数字类型。 |
+
+ fields.#.max-past |
+ 可选 |
+ 0 |
+ Duration |
+ 随机生成器生成相对当前时间向过去偏移的最大值,适用于 Timestamp 类型. |
+
fields.#.length |
可选 |
diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md
index f62b68b80469f..50b89bff8c244 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -157,12 +157,18 @@ Types
TIMESTAMP |
random |
- Always resolves to the current timestamp of the local machine. |
+
+ Resolves a past timestamp relative to the current timestamp of the local machine.
+ The max past is specified by the 'max-past' option.
+ |
TIMESTAMP_LTZ |
random |
- Always resolves to the current timestamp of the local machine. |
+
+ Resolves a past timestamp relative to the current timestamp of the local machine.
+ The max past is specified by the 'max-past' option.
+ |
INTERVAL YEAR TO MONTH |
@@ -253,6 +259,13 @@ Connector Options
(Type of field) |
Maximum value of random generator, work for numeric types. |
+
+ fields.#.max-past |
+ optional |
+ 0 |
+ Duration |
+ Maximum past of timestamp random generator, work for timestamp types. |
+
fields.#.length |
optional |
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
index e445359a3502a..8ee8dd0cee088 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
@@ -22,6 +22,8 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.time.Duration;
+
import static org.apache.flink.configuration.ConfigOptions.key;
/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */
@@ -36,6 +38,7 @@ public class DataGenOptions {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
+ public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";
public static final String SEQUENCE = "sequence";
@@ -77,6 +80,14 @@ public class DataGenOptions {
.withDescription(
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption FIELD_MAX_PAST =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX_PAST))
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum past relative to the current timestamp of the local machine to generate for timestamp fields of kind 'random'.");
+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption FIELD_LENGTH =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 622f511eaf74a..5efcd4a2c1566 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -67,6 +67,7 @@ public Set> optionalOptions() {
options.add(DataGenOptions.FIELD_KIND);
options.add(DataGenOptions.FIELD_MIN);
options.add(DataGenOptions.FIELD_MAX);
+ options.add(DataGenOptions.FIELD_MAX_PAST);
options.add(DataGenOptions.FIELD_LENGTH);
options.add(DataGenOptions.FIELD_START);
options.add(DataGenOptions.FIELD_END);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index 13c93af61aca6..1731c25c77398 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -28,6 +28,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
@@ -41,15 +42,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -70,11 +75,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
private final ConfigOptions.OptionBuilder maxKey;
+ private final ConfigOptions.OptionBuilder maxPastKey;
+
public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);
this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
+ this.maxPastKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX_PAST);
}
@Override
@@ -184,6 +192,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}
+ @Override
+ public DataGeneratorContainer visit(TimestampType timestampType) {
+ ConfigOption maxPastOption =
+ maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+ return DataGeneratorContainer.of(
+ getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+ }
+
+ @Override
+ public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+ ConfigOption maxPastOption =
+ maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+ return DataGeneratorContainer.of(
+ getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+ }
+
+ @Override
+ public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+ ConfigOption maxPastOption =
+ maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+ return DataGeneratorContainer.of(
+ getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+ }
+
@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption lenOption =
@@ -294,4 +329,16 @@ public StringData next() {
}
};
}
+
+ private static RandomGenerator getRandomPastTimestampGenerator(
+ Duration maxPast) {
+ return new RandomGenerator() {
+ @Override
+ public TimestampData next() {
+ long maxPastMillis = maxPast.toMillis();
+ long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
+ return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
+ }
+ };
+ }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 5943a9ec23c00..f345e6badf275 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -31,6 +31,7 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.datagen.DataGenTableSource;
import org.apache.flink.util.InstantiationUtil;
@@ -54,7 +55,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
- Column.physical("f2", DataTypes.BIGINT()));
+ Column.physical("f2", DataTypes.BIGINT()),
+ Column.physical("f3", DataTypes.TIMESTAMP()));
@Test
public void testDataTypeCoverage() throws Exception {
@@ -143,7 +145,13 @@ public void testSource() throws Exception {
descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.START, 50);
descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.END, 60);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f3." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+ descriptor.putString(DataGenOptions.FIELDS + ".f3." + DataGenOptions.MAX_PAST, "5s");
+
+ final long begin = System.currentTimeMillis();
List results = runGenerator(SCHEMA, descriptor);
+ final long end = System.currentTimeMillis();
Assert.assertEquals(11, results.size());
for (int i = 0; i < results.size(); i++) {
@@ -152,6 +160,8 @@ public void testSource() throws Exception {
long f1 = row.getLong(1);
Assert.assertTrue(f1 >= 10 && f1 <= 100);
Assert.assertEquals(i + 50, row.getLong(2));
+ final TimestampData f3 = row.getTimestamp(3, 3);
+ Assert.assertTrue(f3.getMillisecond() >= begin - 5000 && f3.getMillisecond() <= end);
}
}