diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md
index 287686faa74a7..96dff1c11d99c 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -124,6 +124,13 @@ CREATE TABLE datagen (
(Type of field) |
随机生成器的最大值,适用于数字类型。 |
+
+ fields.#.max-past |
+ 可选 |
+ 0 |
+ Duration |
+ 随机生成器生成相对当前时间向过去偏移的最大值,适用于 timestamp 类型。 |
+
fields.#.length |
可选 |
diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md
index aff8683a9d48c..d4f9432aa9f30 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -157,12 +157,18 @@ Types
TIMESTAMP |
random |
- Always resolves to the current timestamp of the local machine. |
+
+ Resolves to a timestamp in the past, relative to the current timestamp of the local machine.
+ The maximum offset into the past can be specified by the 'max-past' option.
+ |
TIMESTAMP_LTZ |
random |
- Always resolves to the current timestamp of the local machine. |
+
+ Resolves to a timestamp in the past, relative to the current timestamp of the local machine.
+ The maximum offset into the past can be specified by the 'max-past' option.
+ |
INTERVAL YEAR TO MONTH |
@@ -253,6 +259,13 @@ Connector Options
(Type of field) |
Maximum value of random generator, work for numeric types. |
+
+ fields.#.max-past |
+ optional |
+ 0 |
+ Duration |
+ Maximum offset into the past, relative to the current timestamp of the local machine, used by the random timestamp generator. Only works for timestamp types. |
+
fields.#.length |
optional |
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 44f0f9628d201..fd840bbc79d28 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -350,6 +350,7 @@ fields.amount.min
fields.product.kind
fields.product.length
fields.ts.kind
+fields.ts.max-past
fields.user.kind
fields.user.max
fields.user.min
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
index e795c928c22ad..991f2fa65e84f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
@@ -22,12 +22,15 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.time.Duration;
+
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.END;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.FIELDS;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.KIND;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.LENGTH;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.MAX;
+import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.MAX_PAST;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.MIN;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.ROWS_PER_SECOND_DEFAULT_VALUE;
import static org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil.START;
@@ -76,6 +79,14 @@ public class DataGenConnectorOptions {
.withDescription(
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");
+ /**
+  * Placeholder {@link ConfigOption}. Not used for retrieving values.
+  *
+  * <p>The {@code #} in the key stands in for a field name; it only describes the shape of the
+  * per-field {@code max-past} option so that the option can be listed and documented. This
+  * placeholder declares no default value.
+  */
+ public static final ConfigOption FIELD_MAX_PAST =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX_PAST))
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum past relative to the current timestamp of the local machine to generate for timestamp fields of kind 'random'.");
+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption FIELD_LENGTH =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptionsUtil.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptionsUtil.java
index c7a1ba1d19768..db5ba1f7b2e50 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptionsUtil.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptionsUtil.java
@@ -32,6 +32,7 @@ public class DataGenConnectorOptionsUtil {
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
+ public static final String MAX_PAST = "max-past";
public static final String LENGTH = "length";
public static final String SEQUENCE = "sequence";
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
index d6c773a1b42cc..7873fb5018500 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
@@ -65,6 +65,7 @@ public Set> optionalOptions() {
options.add(DataGenConnectorOptions.FIELD_KIND);
options.add(DataGenConnectorOptions.FIELD_MIN);
options.add(DataGenConnectorOptions.FIELD_MAX);
+ options.add(DataGenConnectorOptions.FIELD_MAX_PAST);
options.add(DataGenConnectorOptions.FIELD_LENGTH);
options.add(DataGenConnectorOptions.FIELD_START);
options.add(DataGenConnectorOptions.FIELD_END);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java
index ba8c281808410..8fbc894e9966c 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java
@@ -24,13 +24,9 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import java.io.Serializable;
@@ -66,24 +62,6 @@ public DataGeneratorContainer visit(TimeType timeType) {
return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_DAY)));
}
- @Override
- public DataGeneratorContainer visit(TimestampType timestampType) {
- return DataGeneratorContainer.of(
- TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
- }
-
- @Override
- public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
- return DataGeneratorContainer.of(
- TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
- }
-
- @Override
- public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
- return DataGeneratorContainer.of(
- TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
- }
-
@Override
protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
throw new ValidationException("Unsupported type: " + logicalType);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
index 0116e8bbd2dae..28f3782cd0e35 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
@@ -31,6 +31,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
@@ -40,15 +41,19 @@
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -69,6 +74,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
private final ConfigOptions.OptionBuilder maxKey;
+ private final ConfigOptions.OptionBuilder maxPastKey;
+
public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);
@@ -86,6 +93,13 @@ public RandomGeneratorVisitor(String name, ReadableConfig config) {
+ name
+ "."
+ DataGenConnectorOptionsUtil.MAX);
+ this.maxPastKey =
+ key(
+ DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.MAX_PAST);
}
@Override
@@ -203,6 +217,33 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
RandomGenerator.longGenerator(config.get(min), config.get(max)), min, max);
}
+    /**
+     * Creates the random generator for {@code TIMESTAMP} fields.
+     *
+     * @param timestampType the visited type; it does not influence the generated values
+     * @return a container holding the generator and the {@code max-past} option it consumes
+     */
+    @Override
+    public DataGeneratorContainer visit(TimestampType timestampType) {
+        return newRandomPastTimestampContainer();
+    }
+
+    /**
+     * Creates the random generator for {@code TIMESTAMP WITH TIME ZONE} fields.
+     *
+     * @param zonedTimestampType the visited type; it does not influence the generated values
+     * @return a container holding the generator and the {@code max-past} option it consumes
+     */
+    @Override
+    public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+        return newRandomPastTimestampContainer();
+    }
+
+    /**
+     * Creates the random generator for {@code TIMESTAMP_LTZ} fields.
+     *
+     * @param localZonedTimestampType the visited type; it does not influence the generated values
+     * @return a container holding the generator and the {@code max-past} option it consumes
+     */
+    @Override
+    public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+        return newRandomPastTimestampContainer();
+    }
+
+    /**
+     * Shared implementation for all timestamp-like types.
+     *
+     * <p>Reads this field's {@code max-past} option, falling back to {@link Duration#ZERO} when
+     * it is not set, and wraps a generator that emits timestamps at most that far before the
+     * current time of the local machine.
+     *
+     * @return a container holding the generator and the {@code max-past} option it consumes
+     */
+    private DataGeneratorContainer newRandomPastTimestampContainer() {
+        ConfigOption<Duration> maxPastOption =
+                maxPastKey.durationType().defaultValue(Duration.ZERO);
+
+        return DataGeneratorContainer.of(
+                getRandomPastTimestampGenerator(config.get(maxPastOption)), maxPastOption);
+    }
+
@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption lenOption =
@@ -325,4 +366,16 @@ public StringData next() {
}
};
}
+
+    /**
+     * Creates a generator that emits timestamps lying in the past relative to the current time of
+     * the local machine.
+     *
+     * <p>Each call to {@code next()} subtracts a randomly drawn number of milliseconds, bounded by
+     * {@code maxPast}, from {@link System#currentTimeMillis()}. A zero or negative {@code maxPast}
+     * disables the offset, so the generator returns the current time.
+     *
+     * @param maxPast maximum distance into the past of the generated timestamps
+     * @return a generator producing {@link TimestampData} values with millisecond precision
+     */
+    private static RandomGenerator<TimestampData> getRandomPastTimestampGenerator(
+            Duration maxPast) {
+        // The bound never changes, so convert it once instead of on every generated record.
+        final long maxPastMillis = maxPast.toMillis();
+        return new RandomGenerator<TimestampData>() {
+            @Override
+            public TimestampData next() {
+                // Only draw a random offset for a positive bound; otherwise use "now" as-is.
+                long past = maxPastMillis > 0 ? random.nextLong(0, maxPastMillis) : 0;
+                return TimestampData.fromEpochMillis(System.currentTimeMillis() - past);
+            }
+        };
+    }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index a8b3e4c8c5eb9..40cfa0dce6f49 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -35,6 +35,7 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.util.InstantiationUtil;
@@ -57,7 +58,8 @@ public class DataGenTableSourceFactoryTest {
ResolvedSchema.of(
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
- Column.physical("f2", DataTypes.BIGINT()));
+ Column.physical("f2", DataTypes.BIGINT()),
+ Column.physical("f3", DataTypes.TIMESTAMP()));
@Test
public void testDataTypeCoverage() throws Exception {
@@ -156,7 +158,16 @@ public void testSource() throws Exception {
descriptor.putLong(
DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.END, 60);
+ descriptor.putString(
+ DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.KIND,
+ DataGenConnectorOptionsUtil.RANDOM);
+ descriptor.putString(
+ DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.MAX_PAST,
+ "5s");
+
+ final long begin = System.currentTimeMillis();
List results = runGenerator(SCHEMA, descriptor);
+ final long end = System.currentTimeMillis();
Assert.assertEquals(11, results.size());
for (int i = 0; i < results.size(); i++) {
@@ -165,6 +176,8 @@ public void testSource() throws Exception {
long f1 = row.getLong(1);
Assert.assertTrue(f1 >= 10 && f1 <= 100);
Assert.assertEquals(i + 50, row.getLong(2));
+ final TimestampData f3 = row.getTimestamp(3, 3);
+ Assert.assertTrue(f3.getMillisecond() >= begin - 5000 && f3.getMillisecond() <= end);
}
}