Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package org.apache.flink.api.table
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataTypeField
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex.RexNode
import org.apache.calcite.rel.logical.LogicalProject
import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexCall, RexNode}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
import org.apache.flink.api.table.plan.RexNodeTranslator
import org.apache.calcite.util.NlsString
import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator}
import RexNodeTranslator.{toRexNode, extractAggCalls}
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.parser.ExpressionParser
Expand Down Expand Up @@ -98,7 +101,29 @@ class Table(
.map(toRexNode(_, relBuilder))

relBuilder.project(exprs.toIterable.asJava)
new Table(relBuilder.build(), relBuilder)
var projected = relBuilder.build()

if(relNode == projected) {
// Calcite's RelBuilder does not translate identity projects even if they rename fields.
// Add a projection ourselves (will be automatically removed by translation rules).
val names = exprs.map{ e =>
e.getKind match {
case SqlKind.AS =>
e.asInstanceOf[RexCall].getOperands.get(1)
.asInstanceOf[RexLiteral].getValue
.asInstanceOf[NlsString].getValue
case SqlKind.INPUT_REF =>
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
case _ =>
throw new PlanGenException("Unexpected expression type encountered.")
}

}

projected = LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava)
}

new Table(projected, relBuilder)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ public void testSimpleSelectWithNaming() throws Exception {
compareResultAsText(results, expected);
}

@Test
public void testSimpleSelectRenameAll() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

Table in = tableEnv.fromDataSet(ds);

Table result = in
.select("f0 as a, f1 as b, f2 as c")
.select("a, b");

DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
compareResultAsText(results, expected);
}

@Test(expected = IllegalArgumentException.class)
public void testAsWithToFewFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ class SelectITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testSimpleSelectRenameAll(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val t = CollectionDataSets.get3TupleDataSet(env).toTable
.select('_1 as 'a, '_2 as 'b, '_3 as 'c)
.select('a, 'b)

val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
val results = t.toDataSet[Row](getConfig).collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test(expected = classOf[IllegalArgumentException])
def testAsWithToFewFields(): Unit = {

Expand Down