Skip to content

Commit

Permalink
[FLINK-3226] Translation from and to POJOs for CodeGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Feb 11, 2016
1 parent fff25df commit 2322d9f
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,15 @@ class TableEnvironment {
TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
}

/**
* Converts the given [[org.apache.flink.api.table.Table]] to
* a DataSet. The given type must have exactly the same fields as the
* [[org.apache.flink.api.table.Table]]. That is, the names of the
* fields and the types must match.
*/
def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.flink.api.table.codegen

import java.lang.reflect.Field
import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.java.typeutils.{TypeExtractor, PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.typeinfo.RowTypeInfo

Expand Down Expand Up @@ -150,7 +151,11 @@ object CodeGenUtils {

sealed abstract class FieldAccessor

case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
case class ObjectFieldAccessor(field: Field) extends FieldAccessor

case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor

case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor

case class ObjectMethodAccessor(methodName: String) extends FieldAccessor

Expand All @@ -165,12 +170,56 @@ object CodeGenUtils {
ObjectMethodAccessor(cc.getFieldNames()(index))

case javaTup: TupleTypeInfo[_] =>
ObjectFieldAccessor("f" + index)
ObjectGenericFieldAccessor("f" + index)

case pj: PojoTypeInfo[_] =>
ObjectFieldAccessor(pj.getFieldNames()(index))
case pt: PojoTypeInfo[_] =>
val fieldName = pt.getFieldNames()(index)
getFieldAccessor(pt.getTypeClass, fieldName)

case _ => throw new CodeGenException("Unsupported composite type.")
}
}

def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
val field = TypeExtractor.getDeclaredField(clazz, fieldName)
if (field.isAccessible) {
ObjectFieldAccessor(field)
}
else {
ObjectPrivateFieldAccessor(field)
}
}

def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive

def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
field.getType match {
case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
}

def reflectiveFieldWriteAccess(
fieldTerm: String,
field: Field,
objectTerm: String,
valueTerm: String)
: String =
field.getType match {
case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
}
}

0 comments on commit 2322d9f

Please sign in to comment.