Skip to content

Commit

Permalink
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation.
Browse files Browse the repository at this point in the history
  • Loading branch information
beyond1920 committed Apr 3, 2019
1 parent 096c63a commit 3c720b9
Show file tree
Hide file tree
Showing 62 changed files with 6,281 additions and 126 deletions.
7 changes: 7 additions & 0 deletions flink-table/flink-table-planner-blink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.util;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.codegen.CodeGeneratorContext;
import org.apache.flink.table.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.generated.GeneratedProjection;
import org.apache.flink.table.runtime.keySelector.BaseRowKeySelector;
import org.apache.flink.table.runtime.keySelector.BinaryRowKeySelector;
import org.apache.flink.table.runtime.keySelector.NullBinaryRowKeySelector;
import org.apache.flink.table.type.InternalType;
import org.apache.flink.table.type.RowType;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.typeutils.TypeCheckUtils;

/**
* Utility for KeySelector.
*/
public class KeySelectorUtil {

	/**
	 * Creates a {@link BaseRowKeySelector} that extracts the given key fields from a
	 * DataStream whose type is {@link BaseRowTypeInfo}.
	 *
	 * <p>When {@code keyFields} is non-empty, a projection from the input row to the key
	 * row is code-generated and wrapped in a {@link BinaryRowKeySelector}; otherwise a
	 * {@link NullBinaryRowKeySelector} is returned (all records share the same key).
	 *
	 * @param keyFields indices of the key fields in the input row
	 * @param rowType   type information of the input DataStream rows
	 * @return a key selector for the given key fields
	 */
	public static BaseRowKeySelector getBaseRowSelector(int[] keyFields, BaseRowTypeInfo rowType) {
		if (keyFields.length == 0) {
			// No key columns: every record maps to the same (null) key.
			return new NullBinaryRowKeySelector();
		}
		InternalType[] allFieldTypes = rowType.getInternalTypes();
		String[] allFieldNames = rowType.getFieldNames();
		InternalType[] selectedTypes = new InternalType[keyFields.length];
		String[] selectedNames = new String[keyFields.length];
		for (int i = 0; i < keyFields.length; i++) {
			selectedTypes[i] = allFieldTypes[keyFields[i]];
			selectedNames[i] = allFieldNames[keyFields[i]];
		}
		RowType keyType = new RowType(selectedTypes, selectedNames);
		RowType inputType = new RowType(allFieldTypes, allFieldNames);
		// Generate a projection that copies the key columns out of the input row.
		GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection(
			CodeGeneratorContext.apply(new TableConfig()),
			BaseRowSerializer.class.getSimpleName(), inputType, keyType, keyFields);
		BaseRowTypeInfo keyRowType = keyType.toTypeInfo();
		// check if type implements proper equals/hashCode
		TypeCheckUtils.validateEqualsHashCode("grouping", keyRowType);
		return new BinaryRowKeySelector(keyRowType, generatedProjection);
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ class TableConfig {
!disableOperators.contains(operator.toString)
}
}

/**
 * Returns the minimum idle state retention time in milliseconds, as configured
 * via `TableConfigOptions.SQL_EXEC_STATE_TTL_MS`.
 */
def getMinIdleStateRetentionTime: Long =
  conf.getLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)

/**
 * Returns the maximum idle state retention time in milliseconds.
 *
 * NOTE(review): this getter has a side effect — if only the minimum TTL is
 * configured, it writes a derived maximum (2 * min) back into the config
 * before reading it, so the derived value becomes visible to later readers
 * of `SQL_EXEC_STATE_TTL_MAX_MS`.
 */
def getMaxIdleStateRetentionTime: Long = {
// only min idle ttl provided.
if (this.conf.contains(TableConfigOptions.SQL_EXEC_STATE_TTL_MS)
&& !this.conf.contains(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS)) {
// Default the max TTL to twice the configured min TTL.
this.conf.setLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS,
getMinIdleStateRetentionTime * 2)
}
this.conf.getLong(TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS)
}
}

object TableConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ object CodeGenUtils {

case InternalTypes.DATE => "int"
case InternalTypes.TIME => "int"
case InternalTypes.TIMESTAMP => "long"
case _: TimestampType => "long"

case InternalTypes.INTERVAL_MONTHS => "int"
case InternalTypes.INTERVAL_MILLIS => "long"
Expand All @@ -131,7 +131,7 @@ object CodeGenUtils {

case InternalTypes.DATE => boxedTypeTermForType(InternalTypes.INT)
case InternalTypes.TIME => boxedTypeTermForType(InternalTypes.INT)
case InternalTypes.TIMESTAMP => boxedTypeTermForType(InternalTypes.LONG)
case _: TimestampType => boxedTypeTermForType(InternalTypes.LONG)

case InternalTypes.STRING => BINARY_STRING
case InternalTypes.BINARY => "byte[]"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.codegen

import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.Indenter.toISC
import org.apache.flink.table.dataformat.{BaseRow, BinaryRow}
import org.apache.flink.table.generated.{GeneratedRecordEqualiser, RecordEqualiser}
import org.apache.flink.table.`type`.{DateType, InternalType, PrimitiveType, RowType, TimeType, TimestampType}

/**
 * Code generator that produces a [[RecordEqualiser]] implementation comparing two
 * [[BaseRow]]s field by field for the given field types.
 *
 * The generated class offers `equals` (which first compares row headers) and
 * `equalsWithoutHeader` (which ignores headers); both short-circuit to
 * `BinaryRow` equality when both inputs are binary rows.
 *
 * @param fieldTypes internal types of the row fields to compare, in field order
 */
class EqualiserCodeGenerator(fieldTypes: Seq[InternalType]) {

// Class-name terms referenced inside the generated Java source.
private val BASE_ROW = className[BaseRow]
private val BINARY_ROW = className[BinaryRow]
private val RECORD_EQUALISER = className[RecordEqualiser]
// Parameter names of the generated equals methods.
private val LEFT_INPUT = "left"
private val RIGHT_INPUT = "right"

/**
 * Generates a [[GeneratedRecordEqualiser]] whose class name is derived from `name`.
 *
 * @param name base name for the generated class (made unique via `newName`)
 * @return the generated equaliser (class name, source code, and references)
 */
def generateRecordEqualiser(name: String): GeneratedRecordEqualiser = {
// ignore time zone
val ctx = CodeGeneratorContext(new TableConfig)
val className = newName(name)
// Header check used only by `equals`: rows with different headers are unequal.
val header =
s"""
|if ($LEFT_INPUT.getHeader() != $RIGHT_INPUT.getHeader()) {
| return false;
|}
""".stripMargin

// One code snippet per field: null-aware comparison that returns false early
// as soon as a field differs.
val codes = for (i <- fieldTypes.indices) yield {
val fieldType = fieldTypes(i)
val fieldTypeTerm = primitiveTypeTermForType(fieldType)
val result = s"cmp$i"
val leftNullTerm = "leftIsNull$" + i
val rightNullTerm = "rightIsNull$" + i
val leftFieldTerm = "leftField$" + i
val rightFieldTerm = "rightField$" + i
// Choose the comparison expression by field type:
// primitives use ==, nested rows recurse into a nested equaliser,
// everything else falls back to Object.equals.
val equalsCode = if (isInternalPrimitive(fieldType)) {
s"$leftFieldTerm == $rightFieldTerm"
} else if (isBaseRow(fieldType)) {
// Recursively generate an equaliser for the nested row type and wire it
// up as a reusable member initialized in the generated constructor.
val equaliserGenerator =
new EqualiserCodeGenerator(fieldType.asInstanceOf[RowType].getFieldTypes)
val generatedEqualiser = equaliserGenerator
.generateRecordEqualiser("field$" + i + "GeneratedEqualiser")
val generatedEqualiserTerm = ctx.addReusableObject(
generatedEqualiser, "field$" + i + "GeneratedEqualiser")
val equaliserTypeTerm = classOf[RecordEqualiser].getCanonicalName
val equaliserTerm = newName("equaliser")
ctx.addReusableMember(s"private $equaliserTypeTerm $equaliserTerm = null;")
ctx.addReusableInitStatement(
s"""
|$equaliserTerm = ($equaliserTypeTerm)
| $generatedEqualiserTerm.newInstance(Thread.currentThread().getContextClassLoader());
|""".stripMargin)
s"$equaliserTerm.equalsWithoutHeader($leftFieldTerm, $rightFieldTerm)"
} else {
s"$leftFieldTerm.equals($rightFieldTerm)"
}
val leftReadCode = baseRowFieldReadAccess(ctx, i, LEFT_INPUT, fieldType)
val rightReadCode = baseRowFieldReadAccess(ctx, i, RIGHT_INPUT, fieldType)
// Two nulls are equal; one null is unequal; otherwise compare the values.
s"""
|boolean $leftNullTerm = $LEFT_INPUT.isNullAt($i);
|boolean $rightNullTerm = $RIGHT_INPUT.isNullAt($i);
|boolean $result;
|if ($leftNullTerm && $rightNullTerm) {
| $result = true;
|} else if ($leftNullTerm || $rightNullTerm) {
| $result = false;
|} else {
| $fieldTypeTerm $leftFieldTerm = $leftReadCode;
| $fieldTypeTerm $rightFieldTerm = $rightReadCode;
| $result = $equalsCode;
|}
|if (!$result) {
| return false;
|}
""".stripMargin
}

// Assemble the full generated Java class. Both methods take the fast path of
// delegating to BinaryRow equality when both inputs are binary rows.
val functionCode =
j"""
public final class $className implements $RECORD_EQUALISER {

${ctx.reuseMemberCode()}

public $className(Object[] references) throws Exception {
${ctx.reuseInitCode()}
}

@Override
public boolean equals($BASE_ROW $LEFT_INPUT, $BASE_ROW $RIGHT_INPUT) {
if ($LEFT_INPUT instanceof $BINARY_ROW && $RIGHT_INPUT instanceof $BINARY_ROW) {
return $LEFT_INPUT.equals($RIGHT_INPUT);
} else {
$header
${ctx.reuseLocalVariableCode()}
${codes.mkString("\n")}
return true;
}
}

@Override
public boolean equalsWithoutHeader($BASE_ROW $LEFT_INPUT, $BASE_ROW $RIGHT_INPUT) {
if ($LEFT_INPUT instanceof $BINARY_ROW && $RIGHT_INPUT instanceof $BINARY_ROW) {
return (($BINARY_ROW)$LEFT_INPUT).equalsWithoutHeader((($BINARY_ROW)$RIGHT_INPUT));
} else {
${ctx.reuseLocalVariableCode()}
${codes.mkString("\n")}
return true;
}
}
}
""".stripMargin

new GeneratedRecordEqualiser(className, functionCode, ctx.references.toArray)
}

// True for types whose values are read as Java primitives (compared with ==):
// numeric primitives plus date/time/timestamp, which are int/long internally.
private def isInternalPrimitive(t: InternalType): Boolean = t match {
case _: PrimitiveType => true

case _: DateType => true
case TimeType.INSTANCE => true
case _: TimestampType => true

case _ => false
}

// True for nested row types, which need a recursively generated equaliser.
private def isBaseRow(t: InternalType): Boolean = t match {
case _: RowType => true
case _ => false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeIn
import org.apache.flink.table.api.{Table, TableConfig, TableException, Types}
import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, genToExternal}
import org.apache.flink.table.codegen.OperatorCodeGenerator.generateCollect
import org.apache.flink.table.dataformat.util.BaseRowUtil
import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
import org.apache.flink.table.runtime.OneInputOperatorWrapper
import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
Expand Down Expand Up @@ -93,6 +94,10 @@ object SinkCodeGenerator {
new RowTypeInfo(
inputTypeInfo.getFieldTypes,
inputTypeInfo.getFieldNames)
case gt: GenericTypeInfo[BaseRow] if gt.getTypeClass == classOf[BaseRow] =>
new BaseRowTypeInfo(
inputTypeInfo.getInternalTypes,
inputTypeInfo.getFieldNames)
case _ => requestedTypeInfo
}

Expand Down Expand Up @@ -154,13 +159,14 @@ object SinkCodeGenerator {
val retractProcessCode = if (!withChangeFlag) {
generateCollect(genToExternal(ctx, outputTypeInfo, afterIndexModify))
} else {
val flagResultTerm = s"$afterIndexModify.getHeader() == $BASE_ROW.ACCUMULATE_MSG"
val flagResultTerm =
s"${classOf[BaseRowUtil].getCanonicalName}.isAccumulateMsg($afterIndexModify)"
val resultTerm = CodeGenUtils.newName("result")
val genericRowField = classOf[GenericRow].getCanonicalName
s"""
|$genericRowField $resultTerm = new $genericRowField(2);
|$resultTerm.update(0, $flagResultTerm);
|$resultTerm.update(1, $afterIndexModify);
|$resultTerm.setField(0, $flagResultTerm);
|$resultTerm.setField(1, $afterIndexModify);
|${generateCollect(genToExternal(ctx, outputTypeInfo, resultTerm))}
""".stripMargin
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ class SortCodeGenerator(
case InternalTypes.FLOAT => 4
case InternalTypes.DOUBLE => 8
case InternalTypes.LONG => 8
case _: TimestampType => 8
case dt: DecimalType if Decimal.isCompact(dt.precision()) => 8
case InternalTypes.STRING | InternalTypes.BINARY => Int.MaxValue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
import org.apache.flink.table.runtime.values.ValuesInputFormat
import org.apache.flink.table.typeutils.BaseRowTypeInfo

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexLiteral
Expand Down Expand Up @@ -56,8 +55,7 @@ object ValuesCodeGenerator {
generatedRecords.map(_.code),
outputType)

val baseRowTypeInfo = new BaseRowTypeInfo(outputType.getFieldTypes, outputType.getFieldNames)
new ValuesInputFormat(generatedFunction, baseRowTypeInfo)
new ValuesInputFormat(generatedFunction, outputType.toTypeInfo)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.table.plan.nodes.calcite

import org.apache.flink.table.plan.nodes.calcite.RankType.RankType
import org.apache.flink.table.runtime.rank.{RankRange, RankType}

import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataTypeField
Expand Down

0 comments on commit 3c720b9

Please sign in to comment.