Skip to content

Commit

Permalink
Implemented explode() operation. Optimized single column typed access…
Browse files Browse the repository at this point in the history
…. Permitted unspecialized columns in schema.
  • Loading branch information
Jacob Fischer committed Oct 1, 2020
1 parent 27bf941 commit 57f037c
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -2,7 +2,7 @@ organization := "com.audienceproject"

name := "crossbow"

version := "0.1.3"
version := "0.1.4"

scalaVersion := "2.13.3"
crossScalaVersions := Seq(scalaVersion.value, "2.12.12", "2.11.12")
Expand Down
51 changes: 42 additions & 9 deletions src/main/scala/com/audienceproject/crossbow/DataFrame.scala
Expand Up @@ -52,12 +52,17 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
*/
def as[T: ru.TypeTag]: TypedView[T] = {
val dataType = ru.typeOf[T]
if (numColumns == 0) throw new IncorrectTypeException(dataType, AnyType(ru.typeOf[Nothing]))
val schemaType =
if (numColumns == 1) schema.columns.head.columnType
else ProductType(schema.columns.map(_.columnType): _*)
if (Types.typecheck(schemaType, dataType)) new TypedView[T]
else throw new IncorrectTypeException(dataType, schemaType)
schema.columns match {
case Seq() => throw new IncorrectTypeException(dataType, AnyType(ru.typeOf[Nothing]))
case Seq(col) =>
if (Types.typecheck(col.columnType, dataType))
new TypedColumnView[T](getColumnData(col.name).asInstanceOf[Array[T]])
else throw new IncorrectTypeException(dataType, col.columnType)
case cols =>
val schemaType = ProductType(cols.map(_.columnType): _*)
if (Types.typecheck(schemaType, dataType)) new TypedView[T]
else throw new IncorrectTypeException(dataType, schemaType)
}
}

/**
Expand Down Expand Up @@ -124,11 +129,33 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
slice(indices, sortKey)
}

/**
* Explode this DataFrame on the given expression, flattening its contents and repeating all other cells on the
* row for every element in the sequence. The given [[Expr]] must evaluate to a list type.
* Use the 'as' method on [[Expr]] to name the flattened column.
*
* @param expr the [[Expr]] to explode on
* @return new DataFrame
*/
def explode(expr: Expr): DataFrame = {
val eval = expr.compile(this).typecheckAs[Seq[_]]
val ListType(innerType) = eval.typeOf // This unapply is safe due to typecheck.
val nestedCol = fillArray[Seq[_]](0 until rowCount, eval.apply)
val reps = nestedCol.map(_.size)
val colData = for (i <- 0 until numColumns) yield repeatColumn(columnData(i), schema.columns(i).columnType, reps)
val explodedColSchema = expr match {
case Expr.Named(newName, _) => Column(newName, innerType)
case _ => Column(s"_$numColumns", innerType)
}
val explodedData = convert(nestedCol.toSeq.flatten, innerType)
new DataFrame(colData.toVector :+ explodedData, Schema(schema.columns :+ explodedColSchema))
}

/**
* Partition this DataFrame into groups, defined by the given set of expressions.
* The evaluation of each of the 'keyExprs' will appear as a column in the output.
*
* @param keyExprs the list of [[Expr]] that will evaluate to the keys of the groups
* @param keyExprs the list of [[com.audienceproject.crossbow.expr.Expr]] that will evaluate to the keys of the groups
* @return [[GroupedView]] on this DataFrame
*/
def groupBy(keyExprs: Expr*): GroupedView = new GroupedView(keyExprs)
Expand Down Expand Up @@ -280,6 +307,10 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
override def size: Int = rowCount
}

private class TypedColumnView[T] private[DataFrame](columnData: Array[T]) extends TypedView[T] {
override def apply(index: Int): T = columnData(index)
}

def isEmpty: Boolean = rowCount == 0

def iterator: Iterator[Seq[Any]] = new Iterator[Seq[Any]] {
Expand All @@ -297,6 +328,8 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc

object DataFrame {

def empty(): DataFrame = new DataFrame(Vector.empty, Schema())

/**
* Construct a new DataFrame from a sequence of rows.
*
Expand All @@ -305,7 +338,7 @@ object DataFrame {
* @return new DataFrame
*/
def fromSeq[T: ru.TypeTag](data: Seq[T]): DataFrame = {
if (data.isEmpty) new DataFrame(Vector.empty, Schema())
if (data.isEmpty) empty()
else {
val dataType = Types.toInternalType(ru.typeOf[T])
dataType match {
Expand Down Expand Up @@ -344,7 +377,7 @@ object DataFrame {
* @return new DataFrame
*/
def unionAll(dataFrames: Seq[DataFrame]): DataFrame = {
if (dataFrames.isEmpty) new DataFrame(Vector.empty, Schema())
if (dataFrames.isEmpty) empty()
else {
val schema = dataFrames.head.schema
if (!dataFrames.tail.forall(_.schema == schema))
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/com/audienceproject/crossbow/Implicits.scala
Expand Up @@ -26,6 +26,9 @@ object Implicits {

implicit def tuple6(t: (Expr, Expr, Expr, Expr, Expr, Expr)): Expr = Expr.Tuple(t._1, t._2, t._3, t._4, t._5, t._6)

// Index function.
def index(): Expr = Expr.Index()

// Lambda function.
def lambda[T: ru.TypeTag, R: ru.TypeTag](f: T => R): Expr => Expr =
(expr: Expr) => Expr.Lambda(expr, f)
Expand Down
Expand Up @@ -25,7 +25,7 @@ private[crossbow] object Traversal {
case BinaryExpr(lhs, rhs) => dfs(lhs :: rhs :: tail, result)
case UnaryExpr(expr) => dfs(expr :: tail, result)
case Aggregator(expr) => dfs(expr :: tail, result)
case Expr.Named(name, expr) => dfs(expr :: tail, result)
case Expr.Named(_, expr) => dfs(expr :: tail, result)
case Expr.Tuple(exprs@_*) => dfs(exprs.toList ++ tail, result)
case Expr.List(exprs) => dfs(exprs.toList ++ tail, result)
case lambda: Expr.Lambda[_, _] => dfs(lambda.expr :: tail, result)
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/com/audienceproject/crossbow/expr/Expr.scala
Expand Up @@ -38,6 +38,12 @@ private[crossbow] object Expr {
override def toString: String = columnName
}

case class Index() extends Expr {
override private[crossbow] def compile(context: DataFrame) = {
specialize[Int](identity)
}
}

case class Literal[T: ru.TypeTag](value: T) extends Expr {
override private[crossbow] def compile(context: DataFrame) = specialize[T](_ => value)
}
Expand Down
21 changes: 21 additions & 0 deletions src/main/scala/com/audienceproject/crossbow/package.scala
Expand Up @@ -86,6 +86,27 @@ package object crossbow {
arr
}

private[crossbow] def repeatColumn(data: Array[_], ofType: Type, reps: Array[Int]): Array[_] = {
ofType match {
case IntType => fillRepeatArray[Int](data.asInstanceOf[Array[Int]], reps)
case LongType => fillRepeatArray[Long](data.asInstanceOf[Array[Long]], reps)
case DoubleType => fillRepeatArray[Double](data.asInstanceOf[Array[Double]], reps)
case BooleanType => fillRepeatArray[Boolean](data.asInstanceOf[Array[Boolean]], reps)
case _ => fillRepeatArray[Any](data.asInstanceOf[Array[Any]], reps)
}
}

private[crossbow] def fillRepeatArray[T: ClassTag](data: Array[T], reps: Array[Int]): Array[T] = {
val arr = new Array[T](reps.sum)
reps.indices.foldLeft(0) {
case (offset, i) =>
val next = offset + reps(i)
for (j <- offset until next) arr(j) = data(i)
next
}
arr
}

private[crossbow] def convert(data: Seq[Any], dataType: Type): Array[_] = {
dataType match {
case IntType => data.asInstanceOf[Seq[Int]].toArray
Expand Down
@@ -1,6 +1,6 @@
package com.audienceproject.crossbow.schema

import com.audienceproject.crossbow.expr.{Type, Types, ru}
import com.audienceproject.crossbow.expr.{AnyType, Type, Types, ru}

case class Column private[crossbow](name: String, columnType: Type) {

Expand All @@ -15,4 +15,7 @@ object Column {
def apply[T: ru.TypeTag](columnName: String): Column =
Column(columnName, Types.toInternalType(ru.typeOf[T]))

def unspecialized[T: ru.TypeTag](columnName: String): Column =
Column(columnName, AnyType(ru.typeOf[T]))

}
31 changes: 31 additions & 0 deletions src/test/scala/com/audienceproject/crossbow/core/ExplodeTest.scala
@@ -0,0 +1,31 @@
package com.audienceproject.crossbow.core

import com.audienceproject.crossbow.DataFrame
import com.audienceproject.crossbow.Implicits._
import org.scalatest.funsuite.AnyFunSuite

class ExplodeTest extends AnyFunSuite {

test("Explode int seq column") {
val df = DataFrame.fromSeq(Seq(("a", Seq(1, 2)), ("b", Seq(1, 2, 3)), ("c", Seq(5, 10))))
val result = df.explode($"_1")
val expected = Seq(("a", 1), ("a", 2), ("b", 1), ("b", 2), ("b", 3), ("c", 5), ("c", 10))
assert(result("_0", "_2").as[(String, Int)].toSeq == expected)
}

test("Explode on evaluated expression") {
val df = DataFrame.fromSeq(Seq(("a", 2), ("b", 3), ("c", 4)))
val range = lambda[(Int, Int), Seq[Int]] { case (from, to) => from until to }
val result = df.explode(range((index(), $"_1")))
val expected = Seq(("a", 0), ("a", 1), ("b", 1), ("b", 2), ("c", 2), ("c", 3))
assert(result("_0", "_2").as[(String, Int)].toSeq == expected)
}

test("Explode on object column") {
val df = DataFrame.fromSeq(Seq(("a", Seq(Left(1), Right(2))), ("b", Seq(Left(2), Right(3)))))
val result = df.explode($"_1" as "either")
val expected = Seq(("a", Left(1)), ("a", Right(2)), ("b", Left(2)), ("b", Right(3)))
assert(result("_0", "either").as[(String, Any)].toSeq == expected)
}

}

0 comments on commit 57f037c

Please sign in to comment.