Skip to content

Commit

Permalink
Rewrite project to Scala 3. Still WIP, tests and API have not been fin…
Browse files Browse the repository at this point in the history
…alized
  • Loading branch information
jacobfi committed Feb 15, 2024
1 parent df0fe83 commit 7d79bf3
Show file tree
Hide file tree
Showing 28 changed files with 455 additions and 888 deletions.
10 changes: 4 additions & 6 deletions build.sbt
Expand Up @@ -2,15 +2,13 @@ organization := "com.audienceproject"

name := "crossbow"

version := "0.1.6"
version := "0.2.0"

scalaVersion := "2.13.6"
crossScalaVersions := Seq(scalaVersion.value, "2.12.12", "2.11.12")
scalaVersion := "3.3.1"

scalacOptions ++= Seq("-deprecation", "-feature", "-language:existentials")
scalacOptions ++= Seq("-deprecation", "-feature")

libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value
libraryDependencies += "org.scalatest" %% "scalatest-funsuite" % "3.2.0" % "test"
libraryDependencies += "org.scalatest" %% "scalatest-funsuite" % "3.2.17" % "test"

/**
* Maven specific settings for publishing to Maven central.
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version = 1.6.2
sbt.version = 1.9.8
133 changes: 61 additions & 72 deletions src/main/scala/com/audienceproject/crossbow/DataFrame.scala
Expand Up @@ -2,13 +2,16 @@ package com.audienceproject.crossbow

import com.audienceproject.crossbow.algorithms.{GroupBy, SortMergeJoin}
import com.audienceproject.crossbow.exceptions.{IncorrectTypeException, JoinException}
import com.audienceproject.crossbow.expr._
import com.audienceproject.crossbow.expr.*
import com.audienceproject.crossbow.schema.{Column, Schema}

import scala.util.Sorting

class DataFrame private(private val columnData: Vector[Array[_]], val schema: Schema,
private val sortKey: Option[Expr] = None) {
class DataFrame private(
private val columnData: Vector[Array[_]], val schema: Schema,
private val sortKey: Option[Expr] = None) {

import expr.typeTag

val rowCount: Int = if (columnData.isEmpty) 0 else columnData.head.length
val numColumns: Int = columnData.size
Expand All @@ -17,30 +20,29 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* Retrieve a single row by index.
*
* @param index row index
* @return row as a sequence of values
* @return row as a [[Tuple]]
*/
def apply(index: Int): Seq[Any] =
if (isEmpty) Seq.empty
else columnData.map(_ (index))
def apply(index: Int): Tuple =
columnData.foldRight(EmptyTuple: Tuple):
(data, tup) => data(index) *: tup

/**
* Retrieve a subset of rows from this DataFrame based on range of indices.
*
* @param range range of row indices to retrieve
* @return new DataFrame
*/
def apply(range: Range): DataFrame = slice(range, sortKey)
def apply(range: Range): DataFrame =
slice(range, sortKey)

/**
* Select a subset of columns from this DataFrame.
*
* @param columnNames names of columns to select
* @return new DataFrame
*/
def apply(columnNames: String*): DataFrame = {
val colExprs = columnNames.map(Expr.Cell)
select(colExprs: _*)
}
def apply(columnNames: String*): DataFrame =
select(columnNames.map(Expr.Cell.apply) *)

/**
* Typecast this DataFrame to a TypedView of the type parameter 'T'. All columns in this DataFrame will have to be
Expand All @@ -50,20 +52,11 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @tparam T the type of a row in this DataFrame
* @return [[TypedView]] on the contents of this DataFrame
*/
def as[T: ru.TypeTag]: TypedView[T] = {
val dataType = ru.typeOf[T]
schema.columns match {
case Seq() => throw new IncorrectTypeException(dataType, AnyType(ru.typeOf[Nothing]))
case Seq(col) =>
if (Types.typecheck(col.columnType, dataType))
new TypedColumnView[T](getColumnData(col.name).asInstanceOf[Array[T]])
else throw new IncorrectTypeException(dataType, col.columnType)
case cols =>
val schemaType = ProductType(cols.map(_.columnType): _*)
if (Types.typecheck(schemaType, dataType)) new TypedView[T]
else throw new IncorrectTypeException(dataType, schemaType)
}
}
def as[T <: Tuple : TypeTag]: TypedView[T] =
val schemaType = RuntimeType.Product(schema.columns.map(_.columnType): _*)
val expectedType = summon[TypeTag[T]].runtimeType
if (expectedType == schemaType) new TypedView[T]
else throw new IncorrectTypeException(expectedType, schemaType)

/**
* Add a column to the DataFrame, evaluating to 'expr' at each individual row index.
Expand All @@ -73,11 +66,10 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @return new DataFrame
*/
def addColumn(expr: Expr): DataFrame = {
val eval = expr.compile(this)
val newCol = sliceColumn(eval, 0 until rowCount)
val newCol = sliceColumn(expr.eval, expr.typeOf, 0 until rowCount)
val newColSchema = expr match {
case Expr.Named(columnName, _) => Column(columnName, eval.typeOf)
case _ => Column(s"_$numColumns", eval.typeOf)
case Expr.Named(columnName, _) => Column(columnName, expr.typeOf)
case _ => Column(s"_$numColumns", expr.typeOf)
}
new DataFrame(columnData :+ newCol, schema.add(newColSchema))
}
Expand All @@ -102,17 +94,17 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @param exprs the list of [[Expr]] to evaluate as a new DataFrame
* @return new DataFrame
*/
def select(exprs: Expr*): DataFrame = {
def select(exprs: DataFrame ?=> Expr*): DataFrame = {
given DataFrame = this
val (colData, colSchemas) = exprs.zipWithIndex.map({
case (Expr.Named(newName, Expr.Cell(colName)), _) => (getColumnData(colName), schema.get(colName).renamed(newName))
case (Expr.Cell(colName), _) => (getColumnData(colName), schema.get(colName))
case (expr, i) =>
val eval = expr.compile(this)
val newColSchema = expr match {
case Expr.Named(columnName, _) => new Column(columnName, eval.typeOf)
case _ => new Column(s"_$i", eval.typeOf)
case Expr.Named(columnName, _) => new Column(columnName, expr.typeOf)
case _ => new Column(s"_$i", expr.typeOf)
}
(sliceColumn(eval, 0 until rowCount), newColSchema)
(sliceColumn(expr.eval, expr.typeOf, 0 until rowCount), newColSchema)
}).unzip[Array[_], Column](p => (p._1, p._2))
new DataFrame(colData.toVector, Schema(colSchemas))
}
Expand All @@ -124,7 +116,7 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @return new DataFrame
*/
def filter(expr: Expr): DataFrame = {
val eval = expr.compile(this).typecheckAs[Boolean]
val eval = expr.typecheckAs[Boolean]
val indices = for (i <- 0 until rowCount if eval(i)) yield i
slice(indices, sortKey)
}
Expand All @@ -138,8 +130,8 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @return new DataFrame
*/
def explode(expr: Expr): DataFrame = {
val eval = expr.compile(this).typecheckAs[Seq[_]]
val ListType(innerType) = eval.typeOf // This unapply is safe due to typecheck.
val eval = expr.typecheckAs[Seq[_]]
val RuntimeType.List(innerType) = expr.typeOf: @unchecked // This unapply is safe due to typecheck.
val nestedCol = fillArray[Seq[_]](0 until rowCount, eval.apply)
val reps = nestedCol.map(_.size)
val colData = for (i <- 0 until numColumns) yield repeatColumn(columnData(i), schema.columns(i).columnType, reps)
Expand All @@ -158,7 +150,8 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @param keyExprs the list of [[com.audienceproject.crossbow.expr.Expr]] that will evaluate to the keys of the groups
* @return [[GroupedView]] on this DataFrame
*/
def groupBy(keyExprs: Expr*): GroupedView = new GroupedView(keyExprs)
def groupBy(keyExprs: DataFrame ?=> Expr*): GroupedView =
new GroupedView(keyExprs)

/**
* Sort this DataFrame by the evaluation of 'expr'. If a natural ordering exists on this value, it will be used.
Expand All @@ -173,19 +166,17 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
def sortBy(expr: Expr, givenOrderings: Seq[Order] = Seq.empty, stable: Boolean = false): DataFrame = {
if (sortKey.contains(expr) && givenOrderings.isEmpty) this
else {
val eval = expr.compile(this)
val ord = Order.getOrdering(eval.typeOf, givenOrderings)
val ord = Order.getOrdering(expr.typeOf, givenOrderings)
val indices = Array.tabulate(rowCount)(identity)
implicit val comparator: Ordering[Int] = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = ord.compare(eval(x), eval(y))
}
given Ordering[Int] = (x: Int, y: Int) => ord.compare(expr.eval(x), expr.eval(y))
if (stable) Sorting.stableSort[Int](indices)
else Sorting.quickSort[Int](indices)
slice(indices.toIndexedSeq, if (givenOrderings.isEmpty) Some(expr) else None)
}
}

def sortBy(expr: Expr, givenOrderings: Order*): DataFrame = sortBy(expr, Seq(givenOrderings: _*))
def sortBy(expr: Expr, givenOrderings: Order*): DataFrame =
sortBy(expr, Seq(givenOrderings: _*))

/**
* Join this DataFrame on another DataFrame, with the key evaluated by 'joinExpr'.
Expand All @@ -198,11 +189,12 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @param joinType [[JoinType]] as one of Inner, FullOuter, LeftOuter or RightOuter
* @return new DataFrame
*/
def join(other: DataFrame, joinExpr: Expr, joinType: JoinType = JoinType.Inner): DataFrame = {
val internalType = joinExpr.compile(this).typeOf
if (internalType != joinExpr.compile(other).typeOf) throw new JoinException(joinExpr)
val ordering = Order.getOrdering(internalType)
SortMergeJoin(this.sortBy(joinExpr), other.sortBy(joinExpr), joinExpr, joinType, ordering)
def join(other: DataFrame, joinExpr: DataFrame ?=> Expr, joinType: JoinType = JoinType.Inner): DataFrame = {
val left = joinExpr(using this)
val right = joinExpr(using other)
if (left.typeOf != right.typeOf) throw new JoinException(left)
val ordering = Order.getOrdering(left.typeOf)
SortMergeJoin(this.sortBy(left), other.sortBy(right), joinExpr, joinType, ordering)
}

/**
Expand Down Expand Up @@ -263,18 +255,18 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc

private[crossbow] def slice(indices: IndexedSeq[Int], slicedSortKey: Option[Expr] = None): DataFrame = {
val newData = schema.columns.map(col => {
val eval = Expr.Cell(col.name).compile(this)
sliceColumn(eval, indices)
val oldData = getColumnData(col.name)
sliceColumn(oldData, col.columnType, indices)
})
new DataFrame(newData.toVector, schema, slicedSortKey)
}

private[crossbow] def getColumnData(columnName: String): Array[_] = {
private[crossbow] def getColumnData(columnName: String): Array[?] = {
val columnIndex = schema.indexOf(columnName)
columnData(columnIndex)
}

class GroupedView private[DataFrame](keyExprs: Seq[Expr]) {
class GroupedView private[DataFrame](keyExprs: Seq[DataFrame ?=> Expr]) {
/**
* Aggregate this GroupedView to a new DataFrame, evaluated by the list of aggregation expressions.
* An aggregation expression can be any expression, but it must contain [[Aggregator]] expressions instead
Expand All @@ -285,19 +277,19 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
* @param aggExprs the list of [[Expr]] used to aggregate the values of the groups
* @return new DataFrame
*/
def agg(aggExprs: Expr*): DataFrame = GroupBy(DataFrame.this, keyExprs, aggExprs)
def agg(aggExprs: DataFrame ?=> Expr*): DataFrame =
given DataFrame = DataFrame.this
GroupBy(DataFrame.this, keyExprs.map(x => x), aggExprs.map(x => x))
}

class TypedView[T] private[DataFrame]() extends Iterable[T] {
private implicit val t2Tuple: Seq[Any] => T = toTuple[T](numColumns)

class TypedView[T <: Tuple] private[DataFrame] extends Iterable[T] {
/**
* Retrieve a single row by index.
*
* @param index row index
* @return row as a value of type 'T'
*/
def apply(index: Int): T = DataFrame.this (index)
def apply(index: Int): T = DataFrame.this.apply(index).asInstanceOf[T]

/**
* Retrieve a subset of rows from this view based on range of indices.
Expand All @@ -312,18 +304,14 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
override def size: Int = rowCount
}

private class TypedColumnView[T] private[DataFrame](columnData: Array[T]) extends TypedView[T] {
override def apply(index: Int): T = columnData(index)
}

def isEmpty: Boolean = rowCount == 0

def iterator: Iterator[Seq[Any]] = new Iterator[Seq[Any]] {
def iterator: Iterator[Tuple] = new Iterator[Tuple] {
private var pos = -1

override def hasNext: Boolean = pos < rowCount - 1

override def next(): Seq[Any] = {
override def next(): Tuple = {
pos += 1
DataFrame.this (pos)
}
Expand All @@ -342,22 +330,23 @@ object DataFrame {
* @tparam T the type of a row, if this is a [[Product]] type each element will become a separate column
* @return new DataFrame
*/
def fromSeq[T: ru.TypeTag](data: Seq[T], columnNames: String*): DataFrame = {

val dataType = Types.toInternalType(ru.typeOf[T])
val df = dataType match {
case ProductType(elementTypes@_*) =>
def fromSeq[T: TypeTag](data: Seq[T], schema: Schema): DataFrame = {
val dataType = summon[TypeTag[T]].runtimeType
dataType match
case RuntimeType.Product(elementTypes*) =>
val tupleData = data.asInstanceOf[Seq[Product]]
val columnData = elementTypes.zipWithIndex.map({ case (t, i) => convert(tupleData.map(_.productElement(i)), t) })
val columnSchemas = elementTypes.zipWithIndex.map({ case (t, i) => Column(s"_$i", t) })
new DataFrame(columnData.toVector, Schema(columnSchemas.toList))
case _ =>
val col = convert(data, dataType)
new DataFrame(Vector(col), Schema(List(new Column("_0", dataType))))
}
if (columnNames.nonEmpty) df.renameColumns(columnNames: _*) else df
}

inline def fromSeq[T: TypeTag](data: Seq[T]): DataFrame =
val dataType = summon[TypeTag[T]].runtimeType
val schema = Schema(Seq.empty)
fromSeq(data, schema)

/**
* Construct a new DataFrame from a list of columns and a schema.
Expand Down
56 changes: 19 additions & 37 deletions src/main/scala/com/audienceproject/crossbow/Implicits.scala
@@ -1,62 +1,44 @@
package com.audienceproject.crossbow

import com.audienceproject.crossbow.expr.Aggregator.Reducer
import com.audienceproject.crossbow.expr._
import com.audienceproject.crossbow.exceptions.InvalidExpressionException
import com.audienceproject.crossbow.expr.*

import scala.language.implicitConversions

object Implicits {

import expr.typeTag

// Column expression.
implicit class ColumnByName(val sc: StringContext) extends AnyVal {
def $(args: Any*): Expr = Expr.Cell(sc.s(args: _*))
}
extension (sc: StringContext)
def $(args: Any*): DataFrame ?=> Expr = Expr.Cell(sc.s(args: _*))

// Literal value.
implicit def lit[T: ru.TypeTag](value: T): Expr = Expr.Literal(value)

// Tuples.
implicit def tuple2(t: (Expr, Expr)): Expr = Expr.Tuple(t._1, t._2)

implicit def tuple3(t: (Expr, Expr, Expr)): Expr = Expr.Tuple(t._1, t._2, t._3)

implicit def tuple4(t: (Expr, Expr, Expr, Expr)): Expr = Expr.Tuple(t._1, t._2, t._3, t._4)

implicit def tuple5(t: (Expr, Expr, Expr, Expr, Expr)): Expr = Expr.Tuple(t._1, t._2, t._3, t._4, t._5)

implicit def tuple6(t: (Expr, Expr, Expr, Expr, Expr, Expr)): Expr = Expr.Tuple(t._1, t._2, t._3, t._4, t._5, t._6)
implicit def lit[T](value: T): Expr = Expr.Literal(value)

// Index function.
def index(): Expr = Expr.Index()
def index(): DataFrame ?=> Expr = Expr.Index()

// Lambda function.
def lambda[T: ru.TypeTag, R: ru.TypeTag](f: T => R): Expr => Expr =
(expr: Expr) => Expr.Lambda(expr, f)
def lambda[T, R](f: T => R): Expr => Expr =
(expr: Expr) => Expr.Unary(expr, f)

// Sequence of values.
def seq(exprs: Expr*): Expr = Expr.List(exprs)

// Aggregators.
def sum(expr: Expr): Aggregator = Aggregator.Sum(expr)
def sum(expr: Expr): Expr = expr.typeOf match
case RuntimeType.Int => Expr.Aggregate[Int, Int](expr, _ + _, 0)
case RuntimeType.Long => Expr.Aggregate[Long, Long](expr, _ + _, 0L)
case RuntimeType.Double => Expr.Aggregate[Double, Double](expr, _ + _, 0d)
case _ => throw new InvalidExpressionException("sum", expr)

def count(expr: Expr): Aggregator = Aggregator.Count(expr)
def count(): Expr = Expr.Aggregate[Any, Int](lit(1), (_, x) => x + 1, 0)

def collect(expr: Expr): Aggregator = Aggregator.Collect(expr)
def collect(expr: Expr): Expr = Expr.Aggregate[Any, Seq[Any]](expr, (e, seq) => seq :+ e, Vector.empty)

def one(expr: Expr): Aggregator = Aggregator.OneOf(expr)
def one(expr: Expr): Expr = Expr.Aggregate[Any, Any](expr, (elem, _) => elem, null)

// Custom aggregator.
def reducer[T: ru.TypeTag, U: ru.TypeTag](seed: U)(f: (T, U) => U): Expr => Aggregator =
new Aggregator(_) {
override protected def typeSpec(op: Specialized[_]): Reducer[_, _] = {
val spec = op.typecheckAs[T]
Reducer[T, U](spec, f, seed, Types.toInternalType(ru.typeOf[U]))
}
}
implicit class SeqWrapper[T: ru.TypeTag](val seq:Seq[T]) {

def toDataFrame(columnNames:String*): DataFrame = {
DataFrame.fromSeq(seq,columnNames:_*)
}
}
def reducer[T, U](seed: U)(f: (T, U) => U): Expr => Expr = Expr.Aggregate[T, U](_, f, seed)
}

0 comments on commit 7d79bf3

Please sign in to comment.