@sjwiesman
Contributor

What is the purpose of the change

Updates the DataGen source to support most types. This makes DataGen usable together with Flink's LIKE clause, which currently does not support overwriting physical columns with computed columns.

Brief change log

I refactored the table factory and added support for all types except RAW and STRUCTURED. See the commit messages for a full list of changes. I also added support for creating bounded tables, which is useful when prototyping a query.

Verifying this change

New unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 60b2d39 (Tue Jul 28 17:12:39 UTC 2020)

Warnings:

  • Documentation files were touched, but no .zh.md files: Update Chinese documentation or file Jira ticket.
  • Invalid pull request title: No valid Jira ID provided

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@sjwiesman changed the title from "Datagen" to "[FLINK-18735][table] Add support for more types to DataGen source" on Jul 28, 2020
@flinkbot
Collaborator

flinkbot commented Jul 28, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@JingsongLi JingsongLi left a comment

Thanks @sjwiesman, sorry for the late response.
Now that the data generator source can be a bounded source, you can implement an ITCase for it, like BlackHoleConnectorITCase.

if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
synchronized (ctx.getCheckpointLock()) {
if (numberOfRows != null) {
outputSoFar++;
Contributor

I have an idea: print outputSoFar in close. It is good for debugging.

if (isRunning && generator.hasNext()) {
if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
synchronized (ctx.getCheckpointLock()) {
if (numberOfRows != null) {
Contributor

Remove this if?

Set<ConfigOption<?>> options = keyContainer.getOptions();
options.addAll(valContainer.getOptions());

return DataGeneratorContainer.of(
Contributor

Take a look at the comments of RowData. The data structures should be:

 * +--------------------------------+-----------------------------------------+
 * | Row                            | {@link RowData}                         |
 * +--------------------------------+-----------------------------------------+
 * | ARRAY                          | {@link ArrayData}                       |
 * +--------------------------------+-----------------------------------------+
 * | MAP / MULTISET                 | {@link MapData}                         |
 * +--------------------------------+-----------------------------------------+


- Random generator is the default generator; you can specify random max and min values. For char/varchar/string, the length can be specified. It is an unbounded generator.
- Sequence generator: you can specify sequence start and end values. It is a bounded generator; when the sequence number reaches the end value, the reading ends.
Time types are always the local machine's current system time.
Contributor

Maybe we can have a table to show all types.
Display the generation strategies they support, and the required parameters?

@sjwiesman
Contributor Author

@JingsongLi thanks for taking a look. I've updated my PR accordingly.

@sjwiesman
Contributor Author

@JingsongLi what do you think of the status of this PR?

Contributor

@JingsongLi JingsongLi left a comment

I am very sorry for the late response. Left some comments. Feel free to ping me...

throw new ValidationException("Unsupported type: " + logicalType);
}

private interface SerializableSupplier<T> extends Supplier<T>, Serializable { }
Contributor

I think lambda is already Serializable.

Contributor Author

It's not; the source gets caught in the closure cleaner without this.

Contributor

@JingsongLi JingsongLi Sep 10, 2020

I meant we could do something like return (Supplier<T> & Serializable) () -> {...}, but it is OK for now.
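
For readers following this exchange, here is a minimal standalone sketch of the two options being discussed: a named interface that extends both `Supplier` and `Serializable`, and an intersection cast on the lambda. The class and method names below are hypothetical and not taken from the PR, and plain Java serialization stands in for Flink's closure cleaner. A lambda is serializable only if its target type is, which is why the plain `Supplier` variant fails.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

final class SerializableLambdaSketch {

    // Named interface in the style of the PR's SerializableSupplier.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Target type is plain Supplier, so this lambda is not serializable.
    static Supplier<Long> plain() {
        return () -> 42L;
    }

    // Option 1: the target type is the named serializable interface.
    static Supplier<Long> viaInterface() {
        SerializableSupplier<Long> supplier = () -> 42L;
        return supplier;
    }

    // Option 2: an intersection cast makes the lambda's target type serializable.
    static Supplier<Long> viaCast() {
        return (Supplier<Long> & Serializable) () -> 42L;
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both options yield a serializable lambda. The named interface keeps call sites shorter when the type is used in several places, while the cast avoids declaring an extra type.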

public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond) {
public DataGeneratorSource(DataGenerator<T> generator, String name, long rowsPerSecond, Long numberOfRows) {
this.generator = generator;
this.name = name;
Contributor

Do we need to add name?

Contributor Author

that's a remnant from an old commit, will remove.

return mapper.apply(generator.next());
}

public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}
Contributor

Ditto.

Contributor Author

same, we need this.


private static final long serialVersionUID = 1L;

private final DataGenerator[] fieldGenerators;
Contributor

Can we add ? to DataGenerator, like DataGenerator<?>[]? At least the compiler will not emit a warning.
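
As a rough illustration of the suggestion, the sketch below uses `java.util.function.Supplier` as a stand-in for `DataGenerator` (an assumption, so that the example compiles without Flink on the classpath). The class name is hypothetical. An array of an unbounded wildcard type such as `Supplier<?>[]` is legal in Java and avoids the raw-type warning that `Supplier[]` would produce.

```java
import java.util.function.Supplier;

final class WildcardArraySketch {

    // Supplier<?>[] instead of the raw Supplier[]: no raw-type warning.
    private final Supplier<?>[] fieldGenerators;

    WildcardArraySketch(Supplier<?>... fieldGenerators) {
        this.fieldGenerators = fieldGenerators;
    }

    // Produces one value per field, in field order.
    Object[] nextRow() {
        Object[] row = new Object[fieldGenerators.length];
        for (int i = 0; i < fieldGenerators.length; i++) {
            row[i] = fieldGenerators[i].get();
        }
        return row;
    }
}
```

For example, `new WildcardArraySketch(() -> 1, () -> "a").nextRow()` yields a two-element row holding `1` and `"a"`.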


import org.junit.Test;

public class DataGeneratorConnectorITCase extends StreamingTestBase {
Contributor

You can use BatchTestBase.


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
boolean isBounded = numberOfRows == null;
Contributor

Looks like the opposite?

Contributor Author

nice catch
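
The inverted condition can be sketched in isolation. This assumes, as the earlier diff hunks suggest, that a null `numberOfRows` means "no row limit". The class name, the `demoCap` parameter, and the simplified loop are hypothetical and only serve to keep the example finite and runnable; the PR's per-subtask `toOutput` split is not modeled.

```java
final class BoundedFlagSketch {

    // A row limit makes the source bounded; null means unbounded.
    static boolean isBounded(Long numberOfRows) {
        return numberOfRows != null;
    }

    // Counts rows emitted by a loop guarded like the one in the diff.
    // demoCap is a hypothetical safety limit so the unbounded case terminates here.
    static long emit(Long numberOfRows, long demoCap) {
        long outputSoFar = 0;
        while (outputSoFar < demoCap
                && (numberOfRows == null || outputSoFar < numberOfRows)) {
            outputSoFar++;
        }
        return outputSoFar;
    }
}
```

With a limit of 3 the loop stops after three rows; with null it runs until the demo cap is reached.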

public void testTypes() {
tEnv().executeSql(TABLE);
tEnv().executeSql(SINK);
tEnv().from("datagen_t").executeInsert("sink");
Contributor

You should use: Lists.newArrayList(tEnv().executeSql("select * from datagen_t").collect()).
In this way, the types can be verified.

@Override
public DataGeneratorContainer visit(DecimalType decimalType) {
return DataGeneratorContainer.of(
SequenceGenerator.bigDecimalGenerator(
Contributor

It looks like the structures for Decimal and the Time types are wrong; take a look at the comments of RowData.

@sjwiesman
Contributor Author

@JingsongLi thank you for the thorough review. I've corrected the types and updated the test to validate them. If you don't have any other comments I'll merge on green.

Contributor

@JingsongLi JingsongLi left a comment

Thanks for the update. Looks good to me except for the time type generator; I left some minor comments.

private final long rowsPerSecond;

@Nullable
private Long numberOfRows;
Contributor

final

@Nullable
private Long numberOfRows;

private int outputSoFar;
Contributor

transient


private int outputSoFar;

private int toOutput;
Contributor

transient
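
A small sketch of what the `final` and `transient` suggestions amount to, using plain Java serialization as a stand-in for how a source function is shipped to workers (an assumption; the class below is hypothetical and is not the PR's `DataGeneratorSource`). Configuration such as the row limit survives serialization, while a transient runtime counter starts again from zero on the deserialized copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientCounterSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Configuration: set once, serialized with the object.
    private final Long numberOfRows;

    // Runtime state: not serialized, so a deserialized copy starts at 0.
    private transient int outputSoFar;

    TransientCounterSketch(Long numberOfRows) {
        this.numberOfRows = numberOfRows;
    }

    void emitOne() {
        outputSoFar++;
    }

    int outputSoFar() {
        return outputSoFar;
    }

    Long numberOfRows() {
        return numberOfRows;
    }

    // Serializes and deserializes the object in memory.
    static TransientCounterSketch roundTrip(TransientCounterSketch source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientCounterSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```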


@Override
public DataGeneratorContainer visit(TimeType timeType) {
return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_SECOND)));
Contributor

Shouldn't this be MILLI_OF_DAY?
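
The difference between the two fields is easy to check with `java.time` alone. `MILLI_OF_SECOND` is only the millisecond fraction within the current second (0 to 999), whereas `MILLI_OF_DAY` counts milliseconds since midnight, which matches how the RowData comments describe the internal TIME value. The class name below is hypothetical.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoField;

final class TimeFieldSketch {

    // Millisecond fraction within the current second: always 0..999.
    static int milliOfSecond(LocalTime time) {
        return time.get(ChronoField.MILLI_OF_SECOND);
    }

    // Milliseconds elapsed since midnight: 0..86_399_999.
    static int milliOfDay(LocalTime time) {
        return time.get(ChronoField.MILLI_OF_DAY);
    }
}
```

For 12:30:15.250, `milliOfSecond` returns 250 while `milliOfDay` returns 45015250, so a generator using `MILLI_OF_SECOND` would only ever produce times within the first second after midnight.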

};
}

private static class BigDecimalRandomGenerator implements DataGenerator<DecimalData> {
Contributor

I think it can be named DecimalDataRandomGenerator; BigDecimal in the name is confusing.

@sjwiesman
Contributor Author

Sorry for the long back and forth, just pushed a fix addressing your comments.
