Typed Parquet Tuple twitter#1198

   * Macro support for nested group types
   * Add read/write support for nested case classes
   * Tests + refactoring
JiJiTang committed Feb 24, 2015
1 parent e602c86 commit 9abf725
Showing 12 changed files with 913 additions and 233 deletions.
@@ -0,0 +1,110 @@
package com.twitter.scalding.parquet.tuple

import _root_.parquet.filter2.predicate.FilterPredicate
import cascading.scheme.Scheme
import com.twitter.scalding._
import com.twitter.scalding.parquet.HasFilterPredicate
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetWriteSupport, ParquetReadSupport, TypedParquetTupleScheme }

import scala.reflect.ClassTag

/**
* Typed parquet tuple
* @author Jian Tang
*/
object TypedParquet {
/**
* Create a readable typed parquet source.
* Here is an example:
*
* case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
*
* class ReadSupport extends ParquetReadSupport[SampleClassB] {
* import com.twitter.scalding.parquet.tuple.macros.Macros._
* override val tupleConverter: ParquetTupleConverter = caseClassParquetTupleConverter[SampleClassB]
* override val rootSchema: String = caseClassParquetSchema[SampleClassB]
* }
*
* val parquetTuple = TypedParquet[SampleClassB, ReadSupport](Seq(outputPath))
*
* @param paths paths of parquet I/O
* @param t Read support type tag
* @tparam T Tuple type
* @tparam R Read support type
* @return a typed parquet source.
*/
def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String])(implicit t: ClassTag[R]) =
new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null)

/**
* Create a readable typed parquet source with a filter predicate.
*/
def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String], fp: Option[FilterPredicate])(implicit t: ClassTag[R]) =
new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null) {
override def withFilter = fp
}

/**
* Create a typed parquet source that supports both reading and writing.
* @param paths paths of parquet I/O
* @param r Read support type tag
* @param w Write support type tag
* @tparam T Tuple type
* @tparam R Read support type
* @return a typed parquet source.
*/
def apply[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit r: ClassTag[R],
w: ClassTag[W]) = {
val readSupport = r.runtimeClass.asInstanceOf[Class[R]]
val writeSupport = w.runtimeClass.asInstanceOf[Class[W]]
new TypedFixedPathParquetTuple[T, R, W](paths, readSupport, writeSupport)
}

}
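A hedged sketch of the filter-predicate variant (the column name "int" and the path are hypothetical; ReadSupport is the class from the Scaladoc example above), using parquet's FilterApi to push the predicate down to the reader:

import _root_.parquet.filter2.predicate.{ FilterApi, FilterPredicate }
import java.lang.{ Integer => JInt }

// Keep only records whose "int" column is strictly positive.
val fp: FilterPredicate = FilterApi.gt(FilterApi.intColumn("int"), JInt.valueOf(0))
val filtered = TypedParquet[SampleClassB, ReadSupport](Seq("/input/sample_b"), Some(fp))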

object TypedParquetSink {
/**
* Create a typed parquet sink.
* Here is an example:
*
* case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
*
* class WriteSupport extends ParquetWriteSupport[SampleClassB] {
* import com.twitter.scalding.parquet.tuple.macros.Macros._
* override val fieldValues: (SampleClassB) => Map[Int, Any] = caseClassFieldValues[SampleClassB]
* override val rootSchema: String = caseClassParquetSchema[SampleClassB]
* }
*
* val sink = TypedParquetSink[SampleClassB, WriteSupport](Seq(outputPath))
*
* @param paths paths of parquet I/O
* @param t Write support type tag
* @tparam T Tuple type
* @tparam W Write support type
* @return a typed parquet sink.
*/
def apply[T, W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit t: ClassTag[W]) =
new TypedFixedPathParquetTuple[T, ParquetReadSupport[T], W](paths, null, t.runtimeClass.asInstanceOf[Class[W]])
}

/**
* Typed Parquet tuple source/sink.
*/
trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] extends FileSource with Mappable[T]
with TypedSink[T] with HasFilterPredicate {

val readSupport: Class[R]
val writeSupport: Class[W]

override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])

override def setter[U <: T] = TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T])

override def hdfsScheme = {
val scheme = new TypedParquetTupleScheme[T](readSupport, writeSupport, withFilter)
HadoopSchemeInstance(scheme.asInstanceOf[Scheme[_, _, _, _, _]])
}
}

class TypedFixedPathParquetTuple[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](val paths: Seq[String],
val readSupport: Class[R], val writeSupport: Class[W]) extends FixedPathSource(paths: _*) with TypedParquet[T, R, W]
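Putting the pieces together, here is a minimal end-to-end sketch (assuming the SampleClassB, ReadSupport and WriteSupport definitions from the Scaladoc examples above; the paths and the job name are hypothetical). TypedParquet is a Mappable[T] and TypedParquetSink is a TypedSink[T], so both plug straight into the typed API:

import com.twitter.scalding._
import com.twitter.scalding.parquet.tuple.{ TypedParquet, TypedParquetSink }

class CopyParquetJob(args: Args) extends Job(args) {
  val source = TypedParquet[SampleClassB, ReadSupport](Seq("/input/sample_b"))
  val sink = TypedParquetSink[SampleClassB, WriteSupport](Seq("/output/sample_b"))

  TypedPipe.from(source)
    .filter(_.int > 0) // any typed transformation works here
    .write(sink)
}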

This file was deleted.

This file was deleted.

@@ -1,11 +1,61 @@
package com.twitter.scalding.parquet.tuple.macros

-import parquet.schema.MessageType
+import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetTupleConverterProvider, ParquetSchemaProvider, FieldValuesProvider }
+import com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter

import scala.language.experimental.macros

-import com.twitter.scalding.parquet.tuple.macros.impl.SchemaProviderImpl

object Macros {
-  def caseClassParquetSchema[T]: MessageType = macro SchemaProviderImpl.toParquetSchemaImp[T]
/**
* Macro used to generate a parquet schema for a given case class.
* Primitive fields, Option fields, and nested case classes (groups) are supported. For example, if we have:
*
* case class SampleClassA(x: Int, y: String)
* case class SampleClassB(a: SampleClassA, y: String)
*
* The macro will generate a parquet message type like this:
*
* """
* message SampleClassB {
* required group a {
* required int32 x;
* required binary y;
* }
* required binary y;
* }
* """
*
* @tparam T Case class type composed of primitive fields, Option fields, and/or nested case classes.
* @return Generated parquet message type string for the case class.
*/
def caseClassParquetSchema[T]: String = macro ParquetSchemaProvider.toParquetSchemaImpl[T]

/**
* Macro used to generate a function that flattens a record (at every level) into an index-to-value map.
* For example, if we have:
*
* case class SampleClassA(short: Short, int: Int)
* case class SampleClassB(bool: Boolean, a: SampleClassA, long: Long, float: Float)
*
* val b = SampleClassB(true, SampleClassA(1, 4), 6L, 5F)
*
* After flattening with the generated function, we get a map like this:
*
* Map(0 -> true, 1 -> 1, 2 -> 4, 3 -> 6L, 4 -> 5F)
* This macro can be used to define [[com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport]].
* See this class for more details.
*
* @tparam T Case class type composed of primitive fields, Option fields, and/or nested case classes.
* @return Function that flattens a record of the case class into an index-to-value map.
*/
def caseClassFieldValues[T]: T => Map[Int, Any] = macro FieldValuesProvider.toFieldValuesImpl[T]

/**
* Macro used to generate a parquet tuple converter for a given case class.
* Primitive fields, Option fields, and nested case classes (groups) are supported.
*
* @tparam T Case class type composed of primitive fields, Option fields, and/or nested case classes.
* @return Generated parquet converter
*/
def caseClassParquetTupleConverter[T]: ParquetTupleConverter = macro ParquetTupleConverterProvider.toParquetTupleConverterImpl[T]
}
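A quick sketch of the three macros used together (reusing the sample case classes from the Scaladoc above; the values shown in the comments follow the flattening rules documented there):

import com.twitter.scalding.parquet.tuple.macros.Macros._

case class SampleClassA(x: Int, y: String)
case class SampleClassB(a: SampleClassA, y: String)

val schema: String = caseClassParquetSchema[SampleClassB] // parquet message type string
val flatten: SampleClassB => Map[Int, Any] = caseClassFieldValues[SampleClassB]
val converter = caseClassParquetTupleConverter[SampleClassB]

flatten(SampleClassB(SampleClassA(1, "a"), "b")) // Map(0 -> 1, 1 -> "a", 2 -> "b")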
@@ -0,0 +1,76 @@
package com.twitter.scalding.parquet.tuple.macros.impl

import com.twitter.bijection.macros.impl.IsCaseClassImpl

import scala.reflect.macros.Context

object FieldValuesProvider {

def toFieldValuesImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[T => Map[Int, Any]] = {
import ctx.universe._

if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
ctx.abort(ctx.enclosingPosition,
s"""We cannot enforce ${T.tpe} is a case class,
either it is not a case class or this macro call is possibly enclosed in a class.
This will mean the macro is operating on a non-resolved type.""")

def matchField(idx: Int, fieldType: Type, pTree: Tree): (Int, Tree) = {
def appendFieldValue(idx: Int): (Int, Tree) =
(idx + 1, q"""if($pTree != null) fieldValueMap += $idx -> $pTree""")

fieldType match {
case tpe if tpe =:= typeOf[String] ||
tpe =:= typeOf[Boolean] ||
tpe =:= typeOf[Short] ||
tpe =:= typeOf[Int] ||
tpe =:= typeOf[Long] ||
tpe =:= typeOf[Float] ||
tpe =:= typeOf[Double] =>
appendFieldValue(idx)

case tpe if tpe.erasure =:= typeOf[Option[Any]] =>
val cacheName = newTermName(ctx.fresh("optionIndex"))
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
val (newIdx, subTree) = matchField(idx, innerType, q"$cacheName")
(newIdx, q"""
if($pTree.isDefined) {
val $cacheName = $pTree.get
$subTree
}
""")

case tpe if IsCaseClassImpl.isCaseClassType(ctx)(tpe) => expandMethod(idx, tpe, pTree)
case _ => ctx.abort(ctx.enclosingPosition, s"Case class $T is not pure primitives or nested case classes")
}
}

def expandMethod(parentIdx: Int, outerTpe: Type, pTree: Tree): (Int, Tree) = {
outerTpe
.declarations
.collect { case m: MethodSymbol if m.isCaseAccessor => m }
.foldLeft((parentIdx, q"")) {
case ((idx, existingTree), accessorMethod) =>
val (newIdx, subTree) = matchField(idx, accessorMethod.returnType, q"""$pTree.$accessorMethod""")
(newIdx, q"""
$existingTree
$subTree""")
}
}

val (finalIdx, allFieldValues) = expandMethod(0, T.tpe, q"t")

if (finalIdx == 0)
ctx.abort(ctx.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?")

val fieldValues = q"""
val values: $T => _root_.scala.collection.immutable.Map[Int, Any] = t => {
var fieldValueMap = _root_.scala.collection.immutable.Map[Int, Any]()
$allFieldValues
fieldValueMap
}
values
"""
ctx.Expr[T => Map[Int, Any]](fieldValues)
}
}
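For intuition, the tree this macro builds for the SampleClassB example from the Macros Scaladoc expands to roughly the following (a hand-written approximation, not the literal macro output; the null checks on primitives mirror the q"if($pTree != null) ..." trees above, even though they always pass):

// case class SampleClassA(short: Short, int: Int)
// case class SampleClassB(bool: Boolean, a: SampleClassA, long: Long, float: Float)
val values: SampleClassB => Map[Int, Any] = t => {
  var fieldValueMap = Map[Int, Any]()
  if (t.bool != null) fieldValueMap += 0 -> t.bool // primitive field
  if (t.a.short != null) fieldValueMap += 1 -> t.a.short // nested fields are inlined; indices keep counting
  if (t.a.int != null) fieldValueMap += 2 -> t.a.int
  if (t.long != null) fieldValueMap += 3 -> t.long
  if (t.float != null) fieldValueMap += 4 -> t.float
  fieldValueMap
}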
@@ -0,0 +1,69 @@
package com.twitter.scalding.parquet.tuple.macros.impl

import com.twitter.bijection.macros.impl.IsCaseClassImpl

import scala.reflect.macros.Context

object ParquetSchemaProvider {
def toParquetSchemaImpl[T](c: Context)(implicit T: c.WeakTypeTag[T]): c.Expr[String] = {
import c.universe._

if (!IsCaseClassImpl.isCaseClassType(c)(T.tpe))
c.abort(c.enclosingPosition, s"""We cannot enforce ${T.tpe} is a case class, either it is not a case class or this macro call is possibly enclosed in a class.
This will mean the macro is operating on a non-resolved type.""")

def matchField(fieldType: Type, fieldName: String, isOption: Boolean): List[Tree] = {
val REPETITION_REQUIRED = q"_root_.parquet.schema.Type.Repetition.REQUIRED"
val REPETITION_OPTIONAL = q"_root_.parquet.schema.Type.Repetition.OPTIONAL"

def repetition: Tree = if (isOption) REPETITION_OPTIONAL else REPETITION_REQUIRED

def createPrimitiveTypeField(primitiveType: Tree): List[Tree] =
List(q"""new _root_.parquet.schema.PrimitiveType($repetition, $primitiveType, $fieldName)""")

fieldType match {
case tpe if tpe =:= typeOf[String] =>
createPrimitiveTypeField(q"_root_.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY")
case tpe if tpe =:= typeOf[Boolean] =>
createPrimitiveTypeField(q"_root_.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN")
case tpe if tpe =:= typeOf[Short] || tpe =:= typeOf[Int] =>
createPrimitiveTypeField(q"_root_.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32")
case tpe if tpe =:= typeOf[Long] =>
createPrimitiveTypeField(q"_root_.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64")
case tpe if tpe =:= typeOf[Float] =>
createPrimitiveTypeField(q"_root_.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT")
case tpe if tpe =:= typeOf[Double] =>
createPrimitiveTypeField(q"_root_.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE")
case tpe if tpe.erasure =:= typeOf[Option[Any]] =>
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
matchField(innerType, fieldName, isOption = true)
case tpe if IsCaseClassImpl.isCaseClassType(c)(tpe) =>
List(q"""new _root_.parquet.schema.GroupType($repetition, $fieldName,
_root_.scala.Array.apply[_root_.parquet.schema.Type](..${expandMethod(tpe)}):_*)""")
case _ => c.abort(c.enclosingPosition, s"Case class $T is not pure primitives or nested case classes")
}
}

def expandMethod(outerTpe: Type): List[Tree] = {
outerTpe
.declarations
.collect { case m: MethodSymbol if m.isCaseAccessor => m }
.flatMap { accessorMethod =>
val fieldName = accessorMethod.name.toTermName.toString
val fieldType = accessorMethod.returnType
matchField(fieldType, fieldName, isOption = false)
}.toList
}

val expanded = expandMethod(T.tpe)

if (expanded.isEmpty)
c.abort(c.enclosingPosition, s"Case class ${T.tpe} has no primitive types we were able to extract")

val messageTypeName = s"${T.tpe}".split("\\.").last
val schema = q"""new _root_.parquet.schema.MessageType($messageTypeName,
_root_.scala.Array.apply[_root_.parquet.schema.Type](..$expanded):_*).toString"""

c.Expr[String](schema)
}
}
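As a concrete illustration of the repetition handling: wrapping a field in Option flips its repetition from REQUIRED to OPTIONAL, so a hypothetical

case class Sample(x: Int, y: Option[String])

yields, via caseClassParquetSchema[Sample], a string equivalent to:

message Sample {
  required int32 x;
  optional binary y;
}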
