Skip to content

Commit

Permalink
[FLINK-3226] implement getUniqueName method in TranslationContext
Browse files Browse the repository at this point in the history
This closes #1600 and #1567
  • Loading branch information
vasia committed Feb 11, 2016
1 parent 509c4b9 commit fff25df
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object RexNodeTranslator {

exp match {
case agg: Aggregation =>
val name = "TMP_" + agg.hashCode().toHexString.toUpperCase
val name = TranslationContext.getUniqueName
val aggCall = toAggCall(agg, name, relBuilder)
val fieldExp = new UnresolvedFieldReference(name)
(fieldExp, List(aggCall))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ object TranslationContext {

}

def getUniqueName: String = {
"TMP_" + nameCntr.getAndIncrement()
}

def getRelBuilder: RelBuilder = {
relBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class DataSetGroupReduce(
config: TableConfig,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)

// get the output types
val fieldsNames = rowType.getFieldNames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public AggregationsITCase(TestExecutionMode mode){
super(mode);
}

@Ignore //DataSetMap needs to be implemented
@Test
public void testAggregationTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

@Ignore //DataSetMap needs to be implemented
@Test
def testAggregationTypes(): Unit = {

Expand Down Expand Up @@ -71,6 +70,20 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testProjection(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val t = env.fromElements(
(1: Byte, 1: Short),
(2: Byte, 2: Short)).toTable
.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)

val expected = "1,3,2,1,3"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Ignore // it seems like the arithmetic expression is added to the field position
@Test(expected = classOf[NotImplementedError])
def testAggregationWithArithmetic(): Unit = {
Expand Down

0 comments on commit fff25df

Please sign in to comment.