[FLINK-3612] remove PageRank Table example
add a type conversion mapper after aggregations if the expected type is not a Row

This closes #1793
vasia committed Mar 18, 2016
1 parent 63c6dad commit 074ea70
Showing 6 changed files with 114 additions and 338 deletions.
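
To make the change above concrete: after this commit, the result of a Table API aggregation can be handed back as a user-defined type instead of failing whenever the requested type is not Row. Below is a minimal usage sketch in the spirit of the JavaTableExample touched by this commit. It assumes the WC POJO shown in the JavaTableExample hunks below; the TableEnvironment construction, the fromDataSet/toDataSet calls, and the select expression are illustrative assumptions, not code taken from this diff:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.table.Table;
import org.apache.flink.examples.java.JavaTableExample.WC;

public class AggregationToPojoSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment tEnv = new TableEnvironment();

        // WC is the POJO from JavaTableExample: a String word and a long count
        DataSet<WC> input = env.fromElements(
            new WC("hello", 1),
            new WC("hello", 1),
            new WC("ciao", 1));

        Table table = tEnv.fromDataSet(input);

        // grouped aggregation; internally the aggregate result is one Row per group
        Table counts = table
            .groupBy("word")
            .select("word, word.count as count");

        // with this commit a generated conversion mapper turns each result Row
        // back into a WC instance, so requesting a POJO no longer throws
        DataSet<WC> result = tEnv.toDataSet(counts, WC.class);
        result.print();
    }
}

The switch of WC.count from int to long in the hunks below is consistent with this picture: a count aggregate produces Long values in the result Row, so a long POJO field is what the generated converter can copy into directly.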
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.examples.java;
 
-
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -30,14 +29,14 @@ public class JavaTableExample {
 
     public static class WC {
         public String word;
-        public int count;
+        public long count;
 
         // Public constructor to make it a Flink POJO
         public WC() {
 
         }
 
-        public WC(String word, int count) {
+        public WC(String word, long count) {
             this.word = word;
             this.count = count;
         }
@@ -22,9 +22,11 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.PlanGenException
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
@@ -69,12 +71,6 @@ class DataSetAggregate(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        throw new PlanGenException("Aggregate operations currently only support returning Rows.")
-      case _ => // ok
-    }
-
     val groupingKeys = grouping.indices.toArray
     // add grouping fields, position keys in the input, and input type
     val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
@@ -96,24 +92,34 @@ class DataSetAggregate(
     val mappedInput = inputDS.map(aggregateResult._1).name(s"prepare $aggString")
     val groupReduceFunction = aggregateResult._2
 
-    if (groupingKeys.length > 0) {
-
-      val inFields = inputType.getFieldNames.asScala.toList
-      val groupByString = s"groupBy: (${grouping.map( inFields(_) ).mkString(", ")})"
-
-      mappedInput.asInstanceOf[DataSet[Row]]
-        .groupBy(groupingKeys: _*)
-        .reduceGroup(groupReduceFunction)
-        .returns(rowTypeInfo)
-        .name(groupByString + ", " + aggString)
-        .asInstanceOf[DataSet[Any]]
-    }
-    else {
-      // global aggregation
-      mappedInput.asInstanceOf[DataSet[Row]]
-        .reduceGroup(groupReduceFunction)
-        .returns(rowTypeInfo)
-        .asInstanceOf[DataSet[Any]]
+    val result = {
+      if (groupingKeys.length > 0) {
+        val inFields = inputType.getFieldNames.asScala.toList
+        val groupByString = s"groupBy: (${grouping.map(inFields(_)).mkString(", ")})"
+
+        mappedInput.asInstanceOf[DataSet[Row]]
+          .groupBy(groupingKeys: _*)
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(groupByString + ", " + aggString)
+          .asInstanceOf[DataSet[Any]]
+      }
+      else {
+        // global aggregation
+        mappedInput.asInstanceOf[DataSet[Row]]
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .asInstanceOf[DataSet[Any]]
+      }
+    }
+
+    // if the expected type is not a Row, inject a mapper to convert to the expected type
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+        result.map(typeConversion(config, rowTypeInfo, expectedType.get))
+          .name(mapName)
+      case _ => result
     }
   }
 
@@ -137,4 +143,32 @@ class DataSetAggregate(
     s"select: (${outFieldsString.mkString(", ")})"
   }
 
+  private def typeConversion(
+      config: TableConfig,
+      rowTypeInfo: RowTypeInfo,
+      expectedType: TypeInformation[Any]): MapFunction[Any, Any] = {
+
+    val generator = new CodeGenerator(config, rowTypeInfo.asInstanceOf[TypeInformation[Any]])
+    val conversion = generator.generateConverterResultExpression(
+      expectedType, rowType.getFieldNames.asScala)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      "AggregateOutputConversion",
+      classOf[MapFunction[Any, Any]],
+      body,
+      expectedType)
+
+    new MapRunner[Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+  }
+
 }
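
For intuition about what the new typeConversion method produces: the MapFunction emitted by CodeGenerator and executed through MapRunner behaves, for a result Row with fields (word, count) and the WC target type from the Java example above, roughly like the hand-written converter below. This is an illustrative sketch only; the class name is invented, the real code is generated and compiled at runtime, and the Row accessor used here (productElement) is assumed from the Table API's Row type:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.table.Row;
import org.apache.flink.examples.java.JavaTableExample.WC;

// Hand-written stand-in for the generated "AggregateOutputConversion" function:
// copy each field of the aggregation result Row into the expected POJO type.
public class RowToWcSketch implements MapFunction<Row, WC> {
    @Override
    public WC map(Row row) {
        WC out = new WC();
        out.word = (String) row.productElement(0);   // field "word"
        out.count = (Long) row.productElement(1);    // field "count"
        return out;
    }
}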

This file was deleted.
