Skip to content

Commit

Permalink
add some sugar and also make 2.11 compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
Johs Kristoffersen committed Feb 2, 2021
1 parent 86bfe4e commit 4cc3453
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
33 changes: 18 additions & 15 deletions src/main/scala/com/audienceproject/crossbow/DataFrame.scala
Expand Up @@ -175,7 +175,9 @@ class DataFrame private(private val columnData: Vector[Array[_]], val schema: Sc
val eval = expr.compile(this)
val ord = Order.getOrdering(eval.typeOf, givenOrderings)
val indices = Array.tabulate(rowCount)(identity)
Sorting.quickSort[Int](indices)((x: Int, y: Int) => ord.compare(eval(x), eval(y)))
Sorting.quickSort[Int](indices)(new Ordering[Int] {
override def compare(x: Int, y: Int): Int = ord.compare(eval(x), eval(y))
})
slice(indices.toIndexedSeq, if (givenOrderings.isEmpty) Some(expr) else None)
}
}
Expand Down Expand Up @@ -335,22 +337,23 @@ object DataFrame {
* @tparam T the type of a row, if this is a [[Product]] type each element will become a separate column
* @return new DataFrame
*/
def fromSeq[T: ru.TypeTag](data: Seq[T]): DataFrame = {

val dataType = Types.toInternalType(ru.typeOf[T])
dataType match {
case ProductType(elementTypes@_*) =>
val tupleData = data.asInstanceOf[Seq[Product]]
val columnData = elementTypes.zipWithIndex.map({ case (t, i) => convert(tupleData.map(_.productElement(i)), t) })
val columnSchemas = elementTypes.zipWithIndex.map({ case (t, i) => Column(s"_$i", t) })
new DataFrame(columnData.toVector, Schema(columnSchemas.toList))
case _ =>
val col = convert(data, dataType)
new DataFrame(Vector(col), Schema(List(new Column("_0", dataType))))
}

def fromSeq[T: ru.TypeTag](data: Seq[T], columnNames: String*): DataFrame = {

val dataType = Types.toInternalType(ru.typeOf[T])
val df = dataType match {
case ProductType(elementTypes@_*) =>
val tupleData = data.asInstanceOf[Seq[Product]]
val columnData = elementTypes.zipWithIndex.map({ case (t, i) => convert(tupleData.map(_.productElement(i)), t) })
val columnSchemas = elementTypes.zipWithIndex.map({ case (t, i) => Column(s"_$i", t) })
new DataFrame(columnData.toVector, Schema(columnSchemas.toList))
case _ =>
val col = convert(data, dataType)
new DataFrame(Vector(col), Schema(List(new Column("_0", dataType))))
}
if (columnNames.nonEmpty) df.renameColumns(columnNames:_*) else df
}


/**
* Construct a new DataFrame from a list of columns and a schema.
*
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/com/audienceproject/crossbow/Implicits.scala
Expand Up @@ -53,5 +53,9 @@ object Implicits {
Reducer[T, U](spec, f, seed, Types.toInternalType(ru.typeOf[U]))
}
}

implicit class SeqWrapper[T: ru.TypeTag](val seq:Seq[T]) {
def toDataframe(columnNames:String*): DataFrame = {
DataFrame.fromSeq(seq,columnNames:_*)
}
}
}
Expand Up @@ -10,4 +10,8 @@ class ConstuctionTest extends AnyFunSuite {
assertResult(2)(df.numColumns)
assertResult(Schema(Seq(Column("_0",expr.IntType),Column("_1",expr.LongType))))(df.schema)
}
test("construct using implicits") {
import com.audienceproject.crossbow.Implicits._
val df = Seq.empty[(Int,Long)].toDataframe()
}
}

0 comments on commit 4cc3453

Please sign in to comment.