Skip to content

Commit

Permalink
[FLINK-3226] Translation from and to POJOs for CodeGenerator
Browse files Browse the repository at this point in the history
This closes #1624
  • Loading branch information
twalthr authored and vasia committed Mar 18, 2016
1 parent f89d303 commit 7a46bfa
Show file tree
Hide file tree
Showing 11 changed files with 648 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,27 @@ class TableEnvironment {

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same fields as the
* [[org.apache.flink.api.table.Table]]. That is, the names of the
* fields and the types must match.
* a DataSet. The given type must have exactly the same field types and field order as the
* [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
* POJO types require name equivalence to be mapped correctly as their fields do not have
* an order.
*/
@SuppressWarnings(Array("unchecked"))
def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
new JavaBatchTranslator(config).translate[T](table.relNode)(
TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
}

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same field types and field order as the
* [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
* POJO types require name equivalence to be mapped correctly as their fields do not have
* an order.
*/
def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ class TableEnvironment {

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same fields as the
* [[org.apache.flink.api.table.Table]]. That is, the names of the
* fields and the types must match.
* a DataSet. The given type must have exactly the same field types and field order as the
* [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
* POJO types require name equivalence to be mapped correctly as their fields do not have
* an order.
*/
def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
new ScalaBatchTranslator(config).translate[T](table.relNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.flink.api.table.codegen

import java.lang.reflect.Field
import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.java.typeutils.{TypeExtractor, PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.typeinfo.RowTypeInfo

Expand Down Expand Up @@ -150,7 +151,11 @@ object CodeGenUtils {

sealed abstract class FieldAccessor

case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
case class ObjectFieldAccessor(field: Field) extends FieldAccessor

case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor

case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor

case class ObjectMethodAccessor(methodName: String) extends FieldAccessor

Expand All @@ -165,12 +170,56 @@ object CodeGenUtils {
ObjectMethodAccessor(cc.getFieldNames()(index))

case javaTup: TupleTypeInfo[_] =>
ObjectFieldAccessor("f" + index)
ObjectGenericFieldAccessor("f" + index)

case pj: PojoTypeInfo[_] =>
ObjectFieldAccessor(pj.getFieldNames()(index))
case pt: PojoTypeInfo[_] =>
val fieldName = pt.getFieldNames()(index)
getFieldAccessor(pt.getTypeClass, fieldName)

case _ => throw new CodeGenException("Unsupported composite type.")
}
}

def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
val field = TypeExtractor.getDeclaredField(clazz, fieldName)
if (field.isAccessible) {
ObjectFieldAccessor(field)
}
else {
ObjectPrivateFieldAccessor(field)
}
}

def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive

def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
field.getType match {
case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
}

def reflectiveFieldWriteAccess(
fieldTerm: String,
field: Field,
objectTerm: String,
valueTerm: String)
: String =
field.getType match {
case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
}
}

0 comments on commit 7a46bfa

Please sign in to comment.