Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12028][table] Add addColumns,renameColumns, dropColumns #8057

Closed
wants to merge 2 commits into from

Conversation

sunjincheng121
Copy link
Member

What is the purpose of the change

In this PR will add column operators as follows:

  • Add columns
  • Replace columns
  • Drop columns
  • Rename columns

See google doc

Brief change log

  • Add addColumns,renameColumns and dropColumns interfaces in Table.
  • Add the implementation of addColumns, renameColumns and dropColumns in TableImpl.
  • Add docs for addColumns,renameColumns and dropColumns.

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests addColumns, renameColumns and dropColumns
  • Added validation tests addColumns, renameColumns and dropColumns
  • Added plan check tests addColumns, renameColumns and dropColumns

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 27, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@sunjincheng121
Copy link
Member Author

@flinkbot attention @dawidwys FYI.

Copy link
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunjincheng121 Thanks a lot for the work. It overall LGTM. Just left a few comments.


/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions, but can not be an aggregations. Will report an expression if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can not be an aggregations -> can not contain?
will report an expression -> It will throw an exception

/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions, but can not be an aggregations. Will report an expression if
* the added field name already exists.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

field name already exists -> fields already exist

@@ -879,4 +879,117 @@
* @return An OverWindowedTable to specify the aggregations.
*/
OverWindowedTable window(OverWindow... overWindows);

/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statement. -> statement, ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thinkstatement. is fine, is there any other reason?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that there are causal relationship between these two statements. They are dependent.


/**
* Performs a field rename operation. Similar to an field alias statement. The field expressions
* should be an alias expressions, and only the existing fields can be renamed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"an" should be removed


/**
* Performs a field drop operation. The field expressions
* should be an field reference expressions, and only the existing fields can be drop.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"an" should be removed
only the existing fields can be drop -> only existing fields can be dropped

fields.map(expressionBridge.bridge).foreach {
case e@Alias(child: UnresolvedFieldReference, _, _) =>
val index = finalFields.indexWhere(p => p match {
case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(child.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equalsIgnoreCase -> equals

dropFields.foreach {
case UnresolvedFieldReference(name) =>
val index = finalFields.indexWhere(p => p match {
case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equalsIgnoreCase -> equals


// Test for fields that do not exist
try {
tab.renameColumns('e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a rename test such as the following to make sure that only existing fields can be renamed:
tab.renameColumns('a + 1 as 'e)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think line 97 has test the field which not exist case. but i should add the alias. what do you think?

case _ => false
})
if (index >= 0) {
finalFields(index) = e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we allow added fields have the same field name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think is better to overwrite the field which has the same name. what do you think?

if (index >= 0) {
finalFields.remove(index)
} else {
throw new TableException(s"Drop field [${name}] does not exist in source table.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow dropped fields contain the same field name multiple times? such as dropColumns("a, a")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a bad usage, this query should throw an exception? what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Currently it will report exception "Drop field [a] does not exist in source table." for the above case. May be we should throw a more meaningful error message?

Copy link
Contributor

@hequn8128 hequn8128 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunjincheng121 Thanks for the PR. It looks good from my side. I only left some suggestions about comments problems.

Best, Hequn


/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions, but can not be an aggregations. Will report an expression if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • can not be an aggregations => can not contain aggregations
  • report an expression => report an exception


/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions, but can not be an aggregations. Will report an expression if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • an aggregations. ditto.
  • expression => exception


/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions, but can not be an aggregations. Existing fields will be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • an aggregations. ditto.


/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions, but can not be an aggregations. Existing fields will be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • an aggregations. ditto.


/**
* Performs a field rename operation. Similar to an field alias statement. The field expressions
* should be an alias expressions, and only the existing fields can be renamed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be an alias expressions => Each of the field expressions should be an alias expression. ?


/**
* Performs a field drop operation. The field expressions
* should be an field reference expressions, and only the existing fields can be drop.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field expressions should be an field reference expressions => Each of the field expressions should be a filed reference expression.


/**
* Performs a field drop operation. The field expressions
* should be an field reference expressions, and only the existing fields can be drop.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

* </pre>
*/
Table dropColumns(Expression... fields);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the empty line?


if(aggNames.nonEmpty){
throw new TableException(
s"The added field expression cannot be an aggregations, find [${aggNames.head}].")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an aggregations => an aggregation


val finalFields = childFields.toBuffer

// Remove the fields which should be delete in the final list
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be deleted

@sunjincheng121
Copy link
Member Author

Thanks for your review @dianfu @hequn8128
And I find that both of you review the PR at the same time, some of the comments are the same! HAHA :-)
I have addressed your comments, and update the PR!
I appreciate if you can have look at again. :)

Best,
Jincheng

Copy link
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunjincheng121 Thanks a lot for the update. Just a few minor comments. +1 after updated.

<p>Performs a field rename operation.</p>
{% highlight java %}
Table orders = tableEnv.scan("Orders");
Table result = orders.dropColumns("b as b2, c as c2");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropColumns -> renameColumns


/**
* Performs a field drop operation. The field expressions
* should be an field reference expressions, and only existing fields can be dropped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove "an"


/**
* Performs a field drop operation. The field expressions
* should be an field reference expressions, and only existing fields can be dropped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove "an"

@dianfu
Copy link
Contributor

dianfu commented Mar 28, 2019

@flinkbot approve all

1 similar comment
@hequn8128
Copy link
Contributor

@flinkbot approve all

@yanghua
Copy link
Contributor

yanghua commented Mar 28, 2019

@sunjincheng121 There are some conflicts need to be resolved.

@dawidwys
Copy link
Contributor

@flinkbot approve-until architecture

@sunjincheng121
Copy link
Member Author

Thanks for the reminder to rebase the code. @yanghua
Thanks for pay attention to this PR @dawidwys I have aligned the refactoring design of FLINK_11884. I appreciate if you can have look at the changes.

@sunjincheng121
Copy link
Member Author

Thanks for all of you @dianfu @hequn8128 @dawidwys!
Merging...

sunjincheng121 added a commit to sunjincheng121/flink that referenced this pull request Mar 29, 2019
Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @sunjincheng121 thank you very much for the PR. I think it will be a valuable addition that users will appreciate.

I think though we should improve tests structure a bit. Also I am not sure if we need the renameColumns method.


<tr>
<td>
<strong>AddColumns</strong><br>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain a bit more what happens in corner cases?
What happens:

  • when adding column with a name that already exists
  • when adding columns with same name
  • when you add aggregation as a new column
    Can you think of some other special cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding those case:
When: replaceIfExist = false(default)

  1. It will throw an exception if the added fields already exist.
  2. It will throw an exception if adding columns with the same name
  3. It will throw an exception when you add aggregation as a new column
    When: replaceIfExist = true
  4. It will throw an exception if replace the existing column but do not with the alias.
  5. If the added fields have duplicate field name, then the last one is valid.

<p>Performs a field rename operation.</p>
{% highlight java %}
Table orders = tableEnv.scan("Orders");
Table result = orders.renameColumns("b as b2, c as c2");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it the same as add with replace = true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That good point, I had thought this case, I think we should throw the exception if the renamed name already exists. In common case, the user wants to rename a meaningful name for their business. but not want replace exist one, if user want replace exist one, the will drop the exist by dropColumns, what do you think?

<p>Performs a field drop operation.</p>
{% highlight java %}
Table orders = tableEnv.scan("Orders");
Table result = orders.dropColumns("b, c");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you extend with what sort of expressions are supported? What happens if column does not exist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think dropColumns only supports FieldRefrence or column operations, such as dropColunns(columns(...)) when FLINK-12029 is done. And It will throw an exception If the column does not exist. The columns that the user wants to delete are all known column names. If the user accidentally writes the wrong column name but does not report an error, this will be a bit confusing, so I chose strict verification if the column to be deleted does not exist, throw an exception directly. Is that make sense to you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant, that it would be nice to have that description in docs.

* }
* </pre>
*/
Table addColumns(boolean replaceIfExist, Expression... fields);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to addOrReplaceColumns(Expression... fields)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's really difficult to choose,addOrReplaceColumns == addColumns(true, fields) they are not equal. and I also think about this name, but I think in some platform want using addColumns(variable(true or false), fields). replaceIfExist is a variable. What do you think?

@@ -879,4 +879,118 @@
* @return An OverWindowedTable to specify the aggregations.
*/
OverWindowedTable window(OverWindow... overWindows);

/**
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Performs a field add operation. Similar to an SQL SELECT statement. The field expressions
* Performs a field add operation. Similar to a SQL SELECT statement. The field expressions

val util = batchTestUtil()
val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val resultTable = sourceTable
.addColumns("concat(c, 'sunny') as kid")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we split that complex test into multiple tests that describe what is tested? Those tests do not spawn cluster, so they are cheap, we can have them as many as we like.

What I mean by splitting is having tests like testSimpleAdd, testReplaceIfExist etc. Right now it costs the reader a lot of effort to understand what is being checked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right. +1

val util = batchTestUtil()
val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)

var t1 = t.addColumns(true, concat('c, "Sunny") as 'kid).addColumns('b + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have the same tests for both string and expression approach? Best if we could unify them. Maybe sth similar to: org.apache.flink.table.expressions.utils.ExpressionTestBase#testAllApis?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ExpressionTestBase#testAllApis is better for a function test, and operator tests, such as select, filter etc. using CalcStringExpressionTest is fine, from my side. What do you think?

val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)

// Test aggregates
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate those test cases. Use expected=Exception annotation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find that some of the test case using expected=Exception and some of the test case using try/cach, so if test multi-case I like using try/catch, but I am fine if you want to change it into expected=Exception. Do you also prefer using expected=Exception?

@@ -138,6 +138,57 @@ class CalcTest extends TableTestBase {

util.verifyTable(resultTable, expected)
}

@Test
def testAddColumns(): Unit = {
Copy link
Contributor

@dawidwys dawidwys Mar 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between all those cases? Do we just parametrize them with (isBatch, String/Expression)? We should think of unifying it. It does not make sense to copy it to all places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, the only difference is batch or stream. I think this is a common issue, not only the column test case. can we unify this in the other PR? if you want we can add all test in the stream, and remove all of batch test case. what do you think?

@@ -593,6 +593,77 @@ class CalcITCase(
val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testAddColumns(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to have those IT cases? After all it just produces a Projection, which is already covered by tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, it's better for each operator to add an ITCase. and I know it will cost more time, do you want to remove all of the ITCase, or reduce the column ITCase in one?.

@rmetzger rmetzger requested a review from dawidwys March 29, 2019 09:49
@sunjincheng121 sunjincheng121 force-pushed the FLINK-12028-PR branch 2 times, most recently from 92b2a25 to 7dcb9ed Compare April 1, 2019 04:41
@sunjincheng121
Copy link
Member Author

Hi, @dawidwys thanks for your valuable comments. I have updated the PR with the following changes:

  • remove all batch test case
  • AddColumns(replaceIfExist, ..) -> AddOrReplaceColumns(...)
  • reduce ITcase in one method
  • Some doc improvement.

Thanks again for your review and offline call. I appreciate if you can have a look at this PR again, and welcome any feedback!

Best,
Jincheng

Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those changes look good to me. The only thing I would improve is the docs. I think we can have similar descriptions there as in the javadocs of the corresponding methods.

<p>Performs a field drop operation.</p>
{% highlight java %}
Table orders = tableEnv.scan("Orders");
Table result = orders.dropColumns("b, c");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant, that it would be nice to have that description in docs.

/**
* Adds additional columns. Similar to a SQL SELECT statement. The field expressions
* can contain complex expressions, but can not contain aggregations. Existing fields will be
* replaced. If the added fields have duplicate field name, then the last one is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* replaced. If the added fields have duplicate field name, then the last one is used.
* replaced. If the added fields have duplicated field name, then the last one is used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we should do this change? duplicate -> duplicated? I using translate.google.cn, It suggests using duplicate. Can you double check this?

Table addOrReplaceColumns(Expression... fields);

/**
* Renames existing columns. Similar to an field alias statement. The field expressions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Renames existing columns. Similar to an field alias statement. The field expressions
* Renames existing columns. Similar to a field alias statement. The field expressions

@sunjincheng121
Copy link
Member Author

sunjincheng121 commented Apr 1, 2019

Great thanks for your quick review @dawidwys!
Will fixed your comments before merging the PR.

Best,
Jincheng

@asfgit asfgit closed this in b32cd41 Apr 1, 2019
HuangZhenQiu pushed a commit to HuangZhenQiu/flink that referenced this pull request Apr 22, 2019
sunhaibotb pushed a commit to sunhaibotb/flink that referenced this pull request May 8, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants