From c707f5d84ec0074a9c768a29415870f9ee1b5de9 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Mon, 6 Jun 2016 16:30:41 +0800 Subject: [PATCH 1/4] Allow different field names for union in Table API. --- .../api/java/operators/UnionOperator.java | 6 ---- .../api/table/plan/logical/operators.scala | 6 ++-- .../api/scala/batch/table/UnionITCase.scala | 34 +++++++++++++++++-- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java index 70ad374cd4d56..94157e753ae06 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java @@ -19,7 +19,6 @@ package org.apache.flink.api.java.operators; import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Union; import org.apache.flink.api.java.DataSet; @@ -43,11 +42,6 @@ public class UnionOperator extends TwoInputOperator public UnionOperator(DataSet input1, DataSet input2, String unionLocationName) { super(input1, input2, input1.getType()); - if (!input1.getType().equals(input2.getType())) { - throw new InvalidProgramException("Cannot union inputs of different types. Input1=" - + input1.getType() + ", input2=" + input2.getType()); - } - this.unionLocationName = unionLocationName; } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index 9f57ac9f62f22..d8735e23ffc3e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -252,11 +252,11 @@ case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends Bi s" ${left.output.size} and ${right.output.size}") } val sameSchema = left.output.zip(right.output).forall { case (l, r) => - l.resultType == r.resultType && l.name == r.name } + l.resultType == r.resultType } if (!sameSchema) { failValidation(s"Union two table of different schema:" + - s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + - s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") + s" [${left.output.map(a => a.resultType).mkString(", ")}] and" + + s" [${right.output.map(a => a.resultType).mkString(", ")}]") } resolvedUnion } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala index f472341284bc9..e24c9c9f54b37 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala @@ -68,6 +68,36 @@ class UnionITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testUnionAllDifferentFieldNames(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f) + + val unionDs = ds1.unionAll(ds2).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testUnionDifferentFieldNames(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f) + + val unionDs = ds1.union(ds2).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + @Test def testTernaryUnionAll(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment @@ -103,14 +133,14 @@ class UnionITCase( } @Test(expected = classOf[ValidationException]) - def testUnionDifferentFieldNames(): Unit = { + def testUnionDifferentColumnSize(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) - // must fail. Union inputs have different field names. + // must fail. Union inputs have different column size. ds1.unionAll(ds2) } From 8a1b9efd2be92b337154418a0faca7f87ea6c493 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Fri, 1 Jul 2016 15:24:02 +0800 Subject: [PATCH 2/4] Remove fieldNames parameter from RowTypeInfo. --- .../api/java/operators/UnionOperator.java | 6 ++++ .../plan/nodes/dataset/DataSetAggregate.scala | 2 +- .../table/plan/schema/TableSourceTable.scala | 2 +- .../api/table/typeutils/RowTypeInfo.scala | 23 +++---------- .../api/table/typeutils/TypeConverter.scala | 6 ++-- .../api/scala/batch/sql/UnionITCase.scala | 8 ++--- .../api/scala/batch/table/UnionITCase.scala | 32 +------------------ .../api/scala/stream/table/UnionITCase.scala | 2 +- .../scala/typeutils/CaseClassTypeInfo.scala | 2 +- .../connectors/kafka/KafkaTableSource.java | 2 +- .../JsonRowDeserializationSchema.java | 2 +- 11 files changed, 25 insertions(+), 62 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java index 94157e753ae06..70ad374cd4d56 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Union; import org.apache.flink.api.java.DataSet; @@ -42,6 +43,11 @@ public class UnionOperator extends TwoInputOperator public UnionOperator(DataSet input1, DataSet input2, String unionLocationName) { super(input1, input2, input1.getType()); + if (!input1.getType().equals(input2.getType())) { + throw new InvalidProgramException("Cannot union inputs of different types. Input1=" + + input1.getType() + ", input2=" + input2.getType()); + } + this.unionLocationName = unionLocationName; } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 910f05c9c5549..f411bdbf634b2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -111,7 +111,7 @@ class DataSetAggregate( .name(prepareOpName) val groupReduceFunction = aggregateResult._2 - val rowTypeInfo = new RowTypeInfo(fieldTypes, rowType.getFieldNames.asScala) + val rowTypeInfo = new RowTypeInfo(fieldTypes) val result = { if (groupingKeys.length > 0) { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala index 03646f90aff37..a11e8c1c365e5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala @@ -25,6 +25,6 @@ import org.apache.flink.api.table.typeutils.RowTypeInfo /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable(val tableSource: TableSource[_]) extends FlinkTable[Row]( - typeInfo = new RowTypeInfo(tableSource.getFieldTypes, tableSource.getFieldsNames), + typeInfo = new RowTypeInfo(tableSource.getFieldTypes), fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray, fieldNames = tableSource.getFieldsNames) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala index d606a7624d76f..6b0a27c1a3315 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala @@ -25,38 +25,25 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import scala.collection.mutable.ArrayBuffer import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.table.{Row, TableException} +import org.apache.flink.api.table.Row /** * TypeInformation for [[Row]]. */ -class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]], fieldNames: Seq[String]) +class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]]) extends CaseClassTypeInfo[Row]( classOf[Row], Array(), fieldTypes, - fieldNames) + Nil) { - if (fieldTypes.length != fieldNames.length) { - throw new TableException("Number of field types and names is different.") - } - if (fieldNames.length != fieldNames.toSet.size) { - throw new TableException("Field names are not unique.") - } - - def this(fieldTypes: Seq[TypeInformation[_]]) = { - this(fieldTypes, for (i <- fieldTypes.indices) yield "f" + i) - } - - def this(fieldTypes: Array[TypeInformation[_]], fieldNames: Array[String]) = { - this(fieldTypes.toSeq, fieldNames.toSeq) - } - def this(fieldTypes: Array[TypeInformation[_]]) = { this(fieldTypes.toSeq) } + this.fieldNames = for (i <- fieldTypes.indices) yield "f" + i + /** * Temporary variable for directly passing orders to comparators. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala index 95e50d510018d..cd41574e5afd3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala @@ -36,7 +36,7 @@ import scala.collection.JavaConversions._ object TypeConverter { - val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq(), Seq()).asInstanceOf[TypeInformation[Any]] + val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { case BOOLEAN_TYPE_INFO => BOOLEAN @@ -172,7 +172,7 @@ object TypeConverter { // Row is expected, create the arity for it case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] => - new RowTypeInfo(logicalFieldTypes, logicalFieldNames) + new RowTypeInfo(logicalFieldTypes) // no physical type // determine type based on logical fields and configuration parameters @@ -180,7 +180,7 @@ object TypeConverter { // no need for efficient types -> use Row // we cannot use efficient types if row arity > tuple arity or nullable if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) { - new RowTypeInfo(logicalFieldTypes, logicalFieldNames) + new RowTypeInfo(logicalFieldTypes) } // use efficient type tuple or atomic type else { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala index 527eac75c592d..fbc1ba2dd880a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala @@ -44,12 +44,12 @@ class UnionITCase( val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)" + val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT f FROM t2)" val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) - tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f) val result = tEnv.sql(sqlQuery) @@ -64,12 +64,12 @@ class UnionITCase( val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)" + val sqlQuery = "SELECT c FROM t1 UNION (SELECT f FROM t2)" val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) - tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f) val result = tEnv.sql(sqlQuery) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala index e24c9c9f54b37..f1924dc298392 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala @@ -44,7 +44,7 @@ class UnionITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f) val unionDs = ds1.unionAll(ds2).select('c) @@ -58,36 +58,6 @@ class UnionITCase( val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val unionDs = ds1.union(ds2).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionAllDifferentFieldNames(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f) - - val unionDs = ds1.unionAll(ds2).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionDifferentFieldNames(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala index feda75fb15fd4..131974eca0221 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala @@ -38,7 +38,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f) val unionDs = ds1.unionAll(ds2).select('c) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index d658fdec3ce0d..2c47a07b850be 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -43,7 +43,7 @@ abstract class CaseClassTypeInfo[T <: Product]( clazz: Class[T], val typeParamTypeInfos: Array[TypeInformation[_]], fieldTypes: Seq[TypeInformation[_]], - val fieldNames: Seq[String]) + var fieldNames: Seq[String]) extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) { @PublicEvolving diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index e43760b27c81b..c6904fe8f6de0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -123,7 +123,7 @@ public TypeInformation[] getFieldTypes() { @Override public TypeInformation getReturnType() { - return new RowTypeInfo(fieldTypes, fieldNames); + return new RowTypeInfo(fieldTypes); } /** diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index 1bbef4d847c55..970c73ebf867e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -118,7 +118,7 @@ public boolean isEndOfStream(Row nextElement) { @Override public TypeInformation getProducedType() { - return new RowTypeInfo(fieldTypes, fieldNames); + return new RowTypeInfo(fieldTypes); } /** From 02c9130e936498993f3f60d55408c48f793c36b2 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Mon, 4 Jul 2016 14:18:08 +0800 Subject: [PATCH 3/4] Modify CodeGenerator to avoid coincident comparing. --- .../org/apache/flink/api/table/codegen/CodeGenerator.scala | 2 +- .../org/apache/flink/api/table/typeutils/RowTypeInfo.scala | 4 +--- .../apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index 486ba53ea4d1e..26c3928d26609 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -523,7 +523,7 @@ class CodeGenerator( (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term) } - val index = if (input._1 == input1) { + val index = if (input._2 == input1Term) { inputRef.getIndex } else { inputRef.getIndex - input1.getArity diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala index 6b0a27c1a3315..489edca841af1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala @@ -35,15 +35,13 @@ class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]]) classOf[Row], Array(), fieldTypes, - Nil) + for (i <- fieldTypes.indices) yield "f" + i) { def this(fieldTypes: Array[TypeInformation[_]]) = { this(fieldTypes.toSeq) } - this.fieldNames = for (i <- fieldTypes.indices) yield "f" + i - /** * Temporary variable for directly passing orders to comparators. */ diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index 2c47a07b850be..d658fdec3ce0d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -43,7 +43,7 @@ abstract class CaseClassTypeInfo[T <: Product]( clazz: Class[T], val typeParamTypeInfos: Array[TypeInformation[_]], fieldTypes: Seq[TypeInformation[_]], - var fieldNames: Seq[String]) + val fieldNames: Seq[String]) extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) { @PublicEvolving From 3d91a91648db98ba83649bb213510da2a873371b Mon Sep 17 00:00:00 2001 From: gallenvara Date: Sun, 10 Jul 2016 23:21:41 +0800 Subject: [PATCH 4/4] resolve conflict. --- .../org/apache/flink/api/table/plan/logical/operators.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index 028983b4a7004..8b45f80736a22 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -252,11 +252,11 @@ case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends Bi s" ${left.output.size} and ${right.output.size}") } val sameSchema = left.output.zip(right.output).forall { case (l, r) => - l.resultType == r.resultType && l.name == r.name } + l.resultType == r.resultType } if (!sameSchema) { failValidation(s"Union two tables of different schema:" + - s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + - s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") + s" [${left.output.map(a => a.resultType).mkString(", ")}] and" + + s" [${right.output.map(a => a.resultType).mkString(", ")}]") } resolvedUnion }