[FLINK-12029][table] Add column operations for TableApi #8087
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
docs/dev/table/tableApi.md
Outdated
</td>
<td>
{% highlight scala %}
select(columns(2 ~ 4)) = select('b, 'c, 'd)
Please see the comments in the Google doc: Timo suggests using a real word instead of `~`, such as `to` or `until`. +1 for `to` from my side. What do you think?
Thanks for the PR, @hequn8128.
Before starting the detailed review, I suggest looking at the comments in the Google doc, since some details of this JIRA are discussed there (such as the comment I left above). :)
Best, Jincheng
@@ -593,6 +593,27 @@ class CalcITCase(
val results = filterDs.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testColumnOperations(): Unit = {
I think we can remove the ITCase, since column operations are purely a Table API level feature and do not touch runtime logic. What do you think?
}

@Test
def testOrderBy(): Unit = {
We can keep only the OrderBy test, since the other operators can be tested by stream/table/ColumnsOperationsTest. What do you think?
docs/dev/table/tableApi.md
Outdated
{% highlight scala %}
table
  .groupBy(columns(1 ~ 3))
  .select(columns('a ~ 'b), myUDAgg(myUDF(columns(5 ~ 20))))
Could we use the same example as the Java one?
@flinkbot approve-until
@sunjincheng121 Thanks a lot for your review. I have updated the PR according to your suggestions. Best, Hequn
Thanks for the quick update, @hequn8128!
To keep the interaction quick, I am sending my current comments now. I will have a look at `ColumnsOperationExpander` later and get back to you.
Best,
Jincheng
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.JavaConversions._
useless change?
new FunctionDefinition("columnsRange", OTHER_FUNCTION);
public static final FunctionDefinition COLUMNS_INVERSE_SELECTION =
    new FunctionDefinition("columnsInverseSelection", OTHER_FUNCTION);
Suggest:
- `COLUMNS` -> `COLUMNS_COLUMNS_SELECTION` (`columns` -> `columnsSelection`)
- `COLUMNS_INVERSE_SELECTION` -> `COLUMNS_INVERSE` (`columnsInverseSelection` -> `columnsInverse`)

What do you think?
And I think we can try to remove `COLUMNS_INVERSE_SELECTION` and use `MINUS_PREFIX` instead. What do you think?
Using `MINUS_PREFIX` directly is better. Thanks.
*/
def unary_- : Expression = call(MINUS_PREFIX, expr)
def unary_- : Expression = {
I think we should use `MINUS_PREFIX` directly. It is worth the effort. Does that make sense to you?
/**
 * Column operation which indicates the range from left to right inclusive.
 */
def to (other: Expression): Expression = call(COLUMNS_RANGE, expr, other)
Indicates the range from left to right, i.e. [left, right], which can be used in column selection, e.g. `columns(1 to 3)`. In the future, `to` could be used in other cases as well. What do you think?
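For illustration only (this is not code from the PR): a minimal Java sketch of the inclusive `[left, right]` semantics described above, using 1-based column positions as in `columns(2 ~ 4) = 'b, 'c, 'd`. The class `InclusiveRangeSketch` and its `expand` method are hypothetical names, and treating equal bounds as a valid single-column range is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: expands a 1-based inclusive index range.
final class InclusiveRangeSketch {

    // Returns the column names at positions [left, right], both ends included.
    static List<String> expand(List<String> columns, int left, int right) {
        // Assumption of this sketch: left == right is allowed and selects one column.
        if (left < 1 || right > columns.size() || left > right) {
            throw new IllegalArgumentException(
                "Invalid range [" + left + ", " + right + "] for " + columns.size() + " columns");
        }
        // subList is 0-based and end-exclusive, so only the start index is shifted.
        return new ArrayList<>(columns.subList(left - 1, right));
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(expand(columns, 2, 4)); // prints [b, c, d]
    }
}
```

Because both ends are included, a range over `n` positions always yields `right - left + 1` columns.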
// columns
public static final FunctionDefinition COLUMNS =
    new FunctionDefinition("columns", OTHER_FUNCTION);
public static final FunctionDefinition COLUMNS_RANGE =
Can `COLUMNS_RANGE` be used in other cases, such as lambdas? If so, we could name it `COMMON_RANGE`, but I am fine with `COLUMNS_RANGE` for the current PR. What do you think?
Good idea! Maybe we can name it `RANGE_TO`. We may add `RANGE_UNTIL` later. What do you think?
+1 for `RANGE_TO`.
@@ -1126,6 +1137,12 @@ trait ImplicitExpressionConversions {
implicit def symbol2FieldExpression(sym: Symbol): Expression =
  unresolvedFieldRef(sym.name)

implicit def scalaRange2Expression(range: Range.Inclusive): Expression = {
`scalaRange2Expression` -> `scalaRange2RangeExpression`?
 * Expands column expressions to it's real parent's input references.
 */
@Internal
public class ColumnsOperationExpander extends ApiExpressionDefaultVisitor<List<Expression>> {
How about moving `ColumnsOperationExpander` into `ApiExpressionUtils`, similar to `FieldReferenceExtractor`? I think all Expression processing should be placed there, but I feel that this division is not very obvious. Do you have any suggestions on this, @dawidwys?
This should be another `ResolverRule` once #8062 is merged.
@sunjincheng121 @dawidwys Thanks a lot for your advice. Maybe we can keep `ColumnsOperationExpander` here for now and replace it with a `ResolverRule` once #8062 is merged?
Makes sense from my side. It depends on which PR is merged first; the other one should then be rebased.
I think this whole class should be internal to the rule.
It would be good to make it internal to the rule. However, this class is similar to `LookupCallResolver`, which is used not only in the rule but also in TableImpl to make sure `extractFieldReferences` gets the right results.
@sunjincheng121 Thanks for your valuable suggestions. They make the PR much better. I have addressed all your comments and updated the PR. Would be great if you can take another look. Thank you.
Thanks for your update, @hequn8128!
I left my last comments.
Against the current master, the changes look good to me once you fix those comments.
Since @dawidwys suggests merging this PR after #8062, I think we can rebase the code once that is in and then do a final review of the new changes. I would appreciate it if @dawidwys could have a look at the changes after the rebase.
What do you think?
Best, Jincheng
private[flink] def resolveCalls(fields: Seq[Expression]): Seq[Expression] = {
  val outputFieldReferences = operationTree.asInstanceOf[LogicalNode]
    .output.map(a => new UnresolvedFieldReferenceExpression(a.name))
  val expander = new ColumnsOperationExpander(outputFieldReferences)
`outputFieldReferences` -> `getOutputFieldReferences`
Then remove:
`val outputFieldReferences = operationTree.asInstanceOf[LogicalNode] .output.map(a => new UnresolvedFieldReferenceExpression(a.name))`
Due to the refactoring in #8062, the `getOutputFieldReferences()` method will not be used anywhere else, so I merged `getOutputFieldReferences()` and `resolveCalls()` into one.
Why do we need to expand columns here? It seems unnecessary to me. The only reason for `resolveCalls` in `tableImpl` is that we need to split expressions into `projections`, `aggregates` and `windowProperties`. We don't need to expand columns for that.
We need to expand columns before `extractFieldReferences` in order to get the correct field references.
private def expandColumnsInGroupWindow(window: GroupWindow): GroupWindow = {

  val resolvedTimeField = tableImpl.resolveCalls(Seq(window.getTimeField))
  assert(resolvedTimeField.size() == 1, "TimeField of window only support one column.")
`TimeField of window only support one column.` -> `Group Window only supports a single time field column.`?
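As a rough illustration of the check being discussed (not the PR's code): after a column function in the time field has been expanded, the result must contain exactly one column. The class and method names below are hypothetical, and plain strings stand in for expressions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: validates that an expanded time field is a single column.
final class SingleTimeFieldSketch {

    // Returns the only column of the expansion, or fails with the suggested message.
    static String requireSingleTimeField(List<String> expandedTimeField) {
        if (expandedTimeField.size() != 1) {
            throw new IllegalArgumentException(
                "Group Window only supports a single time field column, but got: "
                    + expandedTimeField);
        }
        return expandedTimeField.get(0);
    }

    public static void main(String[] args) {
        System.out.println(requireSingleTimeField(Arrays.asList("b"))); // prints b
    }
}
```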
@@ -630,18 +672,25 @@ class OverWindowedTableImpl(
overWindows)
}

private def expandColumnsInOverWindow(overWindows: Seq[OverWindow]): Seq[OverWindow] = {
  overWindows.map { e =>
    val expanedPartitioning = tableImpl.resolveCalls(e.getPartitioning)
`expaned` -> `expanded`
Remove this method due to #8062
private def expandColumnsInOverWindow(overWindows: Seq[OverWindow]): Seq[OverWindow] = {
  overWindows.map { e =>
    val expanedPartitioning = tableImpl.resolveCalls(e.getPartitioning)
    new OverWindow(e.getAlias, expanedPartitioning, e.getOrder, e.getPreceding, e.getFollowing)
`e.getOrder` -> `tableImpl.resolveCalls(e.getOrder)`
int start = (int) ((ValueLiteralExpression) range.getChildren().get(0)).getValue();
int end = (int) ((ValueLiteralExpression) range.getChildren().get(1)).getValue();
Preconditions.checkArgument(
    start < end,
`start < end` -> `start <= end`
@sunjincheng121 Sounds good. I will rebase the PR once FLINK-11884 is merged. Best, Hequn
Thank you for understanding! I will definitely have a look once it is rebased on the merged changes.
Hi @hequn8128, Also thank you again for waiting for that change!
@dawidwys Cool. Great job for the
db68aff to e654702
@sunjincheng121 @dawidwys I have updated the PR. Would be great if you can take another look. :-)
Thanks for the quick update, @hequn8128!
LGTM. Once the comments I left are fixed, this will be merged after @dawidwys says okay.
Best,
Jincheng
docs/dev/table/functions.md
Outdated
@@ -5800,3 +5800,284 @@ For Table API, please use `_` for spaces (e.g., `DAY_TO_HOUR`).
| | `SQL_TSI_SECOND ` _(SQL-only)_ |

{% top %}

Column Operations
Column Functions ?
Column Expressions? Those are not really functions. They are more similar to fieldReferences rather than functions.
I am also hesitating, because all of our functions can also be called expressions. For example, `=`, `<` and `>` are expressions, but our documentation still calls them Comparison Functions. So I feel that "expression" is the broad term and "function" is the more specific one. Furthermore, we have defined it in `BuiltInFunctionDefinitions`.
What's your opinion, @hequn8128?
Both Functions and Expressions make sense here. The columns() method is an Expression and also a Function. However, if we decide to put this method in `Built-In Functions`, naming it Column Functions may be more consistent with the title.
Another reason becomes visible when we look at the syntax of the column method:

    columnFunction:
        [-]columns(columnExprs)
    columnExprs:
        columnExpr [, columnExpr]*
    columnExpr:
        columnRef | columnIndex to columnIndex | columnName to columnName
    columnRef:
        columnName(The field name that exists in the table) | columnIndex(a positive integer starting from 1)

Here the term "column expression" is already used for the parameters of the column method, i.e. `columnExpr`.
@dawidwys What do you think?
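To make the quoted grammar concrete, here is a small Java sketch that resolves `columnExpr` strings against a list of input column names. It is illustration only, not Flink's implementation: the class `ColumnFunctionSketch`, its methods, and the use of plain strings for expressions are all assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the quoted grammar; not Flink's implementation.
final class ColumnFunctionSketch {

    // Resolves one columnRef: a 1-based columnIndex or an existing columnName.
    static int indexOf(List<String> input, String ref) {
        int idx = ref.matches("\\d+") ? Integer.parseInt(ref) - 1 : input.indexOf(ref);
        if (idx < 0 || idx >= input.size()) {
            throw new IllegalArgumentException("Unknown column: " + ref);
        }
        return idx;
    }

    // columns(columnExprs): each expr is "ref" or "ref to ref" (both ends inclusive).
    static List<String> select(List<String> input, String... columnExprs) {
        List<String> selected = new ArrayList<>();
        for (String expr : columnExprs) {
            String[] bounds = expr.trim().split("\\s+to\\s+");
            if (bounds.length > 2) {
                throw new IllegalArgumentException("Invalid column expression: " + expr);
            }
            int from = indexOf(input, bounds[0]);
            int to = bounds.length == 2 ? indexOf(input, bounds[1]) : from;
            if (from > to) {
                throw new IllegalArgumentException("Invalid range: " + expr);
            }
            for (int i = from; i <= to; i++) {
                selected.add(input.get(i));
            }
        }
        return selected;
    }

    // -columns(columnExprs): every input column except the selected ones.
    static List<String> deselect(List<String> input, String... columnExprs) {
        List<String> remaining = new ArrayList<>(input);
        remaining.removeAll(select(input, columnExprs));
        return remaining;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(select(input, "2 to 4"));        // prints [b, c, d]
        System.out.println(deselect(input, "a", "c to d")); // prints [b, e]
    }
}
```

The sketch keeps the two forms of the grammar as two methods, which mirrors the later `withColumns` / `withoutColumns` naming proposed in this thread.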
> Could we convert this method to another Visitor?

@dawidwys Regarding this comment of yours: I think we can rename `ColumnsOperationExpander` to `ColumnsFunctionExpander` and add a `ColumnsExpressionExpander` to expand the expressions in the column function?
docs/dev/table/functions.md
Outdated
</div>
</div>

The column operations can be used in all places where column fields are expected, such as `select, groupBy, orderBy, UDX etc.` e.g.:
UDX -> UDFs ?
docs/dev/table/functions.md
Outdated
</div>
</div>

The column operations can be used in all places where column fields are expected, such as `select, groupBy, orderBy, UDX etc.` e.g.:
`column operations` -> `column functions`
Hi @hequn8128, thank you for the work here. I had a first look at where the expression resolution happens and posted a few comments. Once this is resolved I will have another look.
/**
 * Expand the columns expression in the input Expression.
 */
private List<UnresolvedReferenceExpression> resolveSingleArg(Expression arg) {
Could we convert this method to another Visitor?
Good idea. Let me try this.
@@ -564,26 +575,47 @@ class WindowGroupedTableImpl(
fields)
}

private def expandColumnsInGroupWindow(window: GroupWindow): GroupWindow = {
Why do we need it?
The time field in GroupWindow can also contain a column operation. For example:

    .window(Slide.over("3.milli").every("10.milli").on("columns(b)").as("w"))

The time field is passed to `extractFieldReferences()`:

    val projectFields = extractFieldReferences(
      (expressionsWithResolvedCalls ++ resolvedGroupKeys :+ resolvedWindow.getTimeField))

If we don't expand column operations for the time field, `extractFieldReferences` would fail.
I think we should rather modify `extractFieldReference` to also extract column expressions (they are actually field references). That way we will have a more unified approach, and we can use the rule to expand them only in the `ExpressionResolver`.
This would bring some duplicate code. However, it would also make sense. I will try this.
But the code in `extractFieldReference` would be super simple. I haven't checked it, but this should work:

    @Override
    public List<Expression> visitCall(CallExpression call) {
        FunctionDefinition functionDefinition = call.getFunctionDefinition();
        if (WINDOW_PROPERTIES.contains(functionDefinition)) {
            return Collections.emptyList();
        } else if (COLUMN_OPERATIONS.contains(functionDefinition)) {
            return Collections.singletonList(call);
        } else {
            return call.getChildren()
                .stream()
                .flatMap(c -> c.accept(this).stream())
                .distinct()
                .collect(Collectors.toList());
        }
    }

We do the same for `UnresolvedFieldReference('*')`, which in fact can be expressed with a column operation as well.
We can't return the column expression directly. We need to expand it, because all references need to be distinct:

    public static List<Expression> extractFieldReferences(List<Expression> expressions) {
        return expressions.stream()
            .flatMap(expr -> expr.accept(referenceExtractor).stream())
            .distinct()
            .collect(Collectors.toList());
    }
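A simplified Java sketch of this argument, for illustration only: if a column function were kept as an opaque item, `distinct()` could not see that it overlaps with a plain reference to the same field. Expanding first lets the duplicate collapse. Plain strings stand in for expressions, and the class name, the `expand` helper and the `columns(x to y)` string format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: strings stand in for Flink expressions.
final class DistinctReferencesSketch {

    private static final Pattern RANGE = Pattern.compile("columns\\((\\w+) to (\\w+)\\)");

    // Expands "columns(x to y)" to the fields between x and y (inclusive);
    // anything else is treated as a plain field reference.
    static List<String> expand(List<String> input, String expr) {
        Matcher m = RANGE.matcher(expr);
        if (!m.matches()) {
            return Collections.singletonList(expr);
        }
        int from = input.indexOf(m.group(1));
        int to = input.indexOf(m.group(2));
        if (from < 0 || to < from) {
            throw new IllegalArgumentException("Invalid range: " + expr);
        }
        return new ArrayList<>(input.subList(from, to + 1));
    }

    // Expands every expression first, then removes duplicate references.
    static List<String> extractFieldReferences(List<String> input, List<String> exprs) {
        return exprs.stream()
            .flatMap(e -> expand(input, e).stream())
            .distinct()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        // 'a' appears both directly and inside the range; it is kept only once.
        System.out.println(
            extractFieldReferences(input, Arrays.asList("a", "columns(a to b)", "c")));
        // prints [a, b, c]
    }
}
```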
@@ -94,6 +94,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp

case AS =>
  assert(args.size >= 2)
  args.drop(1).foreach {
Why do we validate it here? The whole `PlannerExpressionConverter` is already past the validation phase.
OK, I validate it to catch invalid expressions such as `columns(1 to 2) as 'a`. I think you make a good point here; it's not good to validate in `PlannerExpressionConverter`. Maybe we should validate it in the column expander rule?
Thanks for the contribution @hequn8128. I haven't had a look at the code but at the API again. I still find the
What do you think?
At the beginning of the design, we referred to the column operations of the R language, using the
@twalthr @sunjincheng121 OK. Let's do it with Best, Hequn
…or the exand columns Rule
@sunjincheng121 @twalthr @dawidwys Thanks a lot for your great suggestions. I have updated the PR and rebased it onto master. Would be great if you can take another look. Thank you very much! Best, Hequn
Thanks for the update! @hequn8128
I left a few minor comments.
Best,
Jincheng
docs/dev/table/functions.md
Outdated
| SYNTAX | DESC |
| :--------------------- | :-------------------------- |
| columns(...) | select the specified columns |
| -columns(...) | deselect the columns specified |
`columns` -> `withColumns` and `-columns` -> `withoutColumns`
docs/dev/table/functions.md
Outdated

{% highlight text %}
columnFunction:
    [-]columns(columnExprs)
`[-]columns` -> `withColumns(..) | withoutColumns(..)`?
docs/dev/table/functions.md
Outdated
<tbody>
<tr>
<td>
columns(*)|*
We should correct all occurrences of the names `columns` and `-columns` in this doc.
/**
 * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression}
 * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
 *
 * <p>The default set of rules ({@link ExpressionResolver#getResolverRules()}) will resolve following references:
 * <ul>
 * <li>flatten '*' to all fields of underlying inputs</li>
 * <li>flatten '*' and columns operations to all fields of underlying inputs</li>
`columns operations` -> `columns function`?
/**
 * Resolves columns functions to corresponding fields of inputs. See {@link ExpandColumnsFunctionRule} for details.
 */
public static final ResolverRule EXPAND_COLUMN_OPERATIONS = new ExpandColumnsFunctionRule();
`EXPAND_COLUMN_OPERATIONS` -> `EXPAND_COLUMNS_FUNCTION`
@@ -184,6 +184,12 @@ trait ImplicitExpressionOperations {
 */
def % (other: Expression): Expression = mod(other)

/**
 * Indicates the range from left to right, i.e. [left, right], which can be used in columns
 * selection, e.g.: columns(1 to 3).
`columns(1 to 3)` -> `withColumns(1 to 3)`
@sunjincheng121 Thank you for the review. I have renamed "column operation" to "column function". So in the Table API, column operations means Table.addColumns()/Table.dropColumns(), and column functions means withColumns/withoutColumns. I think it's clearer now. Best, Hequn
+1 to merge.
@flinkbot approve all |
What is the purpose of the change
This pull request adds column operations for the Table API. Column operations can be used to select or deselect columns. More details can be found in the Google design doc.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
- `testColumnOperations` in `CalcITCase`.
- `ColumnsOperationTest` to test column operations in different table operations.
- `ColumnsOperationValidationTest` to test exceptions.

Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: (yes)

Documentation