Skip to content

Commit

Permalink
[FLINK-3226] Translation from and to POJOs for CodeGenerator
Browse files Browse the repository at this point in the history
This closes apache#1624
  • Loading branch information
twalthr authored and vasia committed Mar 18, 2016
1 parent 48d9887 commit dd412ea
Show file tree
Hide file tree
Showing 11 changed files with 648 additions and 107 deletions.
18 changes: 15 additions & 3 deletions main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,27 @@ class TableEnvironment {

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same fields as the
* [[org.apache.flink.api.table.Table]]. That is, the names of the
* fields and the types must match.
* a DataSet. The given type must have exactly the same field types and field order as the
* [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
* POJO types require name equivalence to be mapped correctly as their fields do not have
* an order.
*/
@SuppressWarnings(Array("unchecked"))
def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
new JavaBatchTranslator(config).translate[T](table.relNode)(
TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
}

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same field types and field order as the
* [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
* POJO types require name equivalence to be mapped correctly as their fields do not have
* an order.
*/
def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ class TableEnvironment {

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same fields as the
* [[org.apache.flink.api.table.Table]]. That is, the names of the
* fields and the types must match.
* a DataSet. The given type must have exactly the same field types and field order as the
* [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
* POJO types require name equivalence to be mapped correctly as their fields do not have
* an order.
*/
def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
new ScalaBatchTranslator(config).translate[T](table.relNode)
Expand Down
59 changes: 54 additions & 5 deletions main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.flink.api.table.codegen

import java.lang.reflect.Field
import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.java.typeutils.{TypeExtractor, PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.typeinfo.RowTypeInfo

Expand Down Expand Up @@ -150,7 +151,11 @@ object CodeGenUtils {

sealed abstract class FieldAccessor

case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
case class ObjectFieldAccessor(field: Field) extends FieldAccessor

case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor

case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor

case class ObjectMethodAccessor(methodName: String) extends FieldAccessor

Expand All @@ -165,12 +170,56 @@ object CodeGenUtils {
ObjectMethodAccessor(cc.getFieldNames()(index))

case javaTup: TupleTypeInfo[_] =>
ObjectFieldAccessor("f" + index)
ObjectGenericFieldAccessor("f" + index)

case pj: PojoTypeInfo[_] =>
ObjectFieldAccessor(pj.getFieldNames()(index))
case pt: PojoTypeInfo[_] =>
val fieldName = pt.getFieldNames()(index)
getFieldAccessor(pt.getTypeClass, fieldName)

case _ => throw new CodeGenException("Unsupported composite type.")
}
}

def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
val field = TypeExtractor.getDeclaredField(clazz, fieldName)
if (field.isAccessible) {
ObjectFieldAccessor(field)
}
else {
ObjectPrivateFieldAccessor(field)
}
}

def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive

def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
field.getType match {
case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
}

def reflectiveFieldWriteAccess(
fieldTerm: String,
field: Field,
objectTerm: String,
valueTerm: String)
: String =
field.getType match {
case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
}
}

0 comments on commit dd412ea

Please sign in to comment.