Skip to content

Commit

Permalink
Merge pull request #1 from audienceproject/feature/allowColumnsWithTy…
Browse files Browse the repository at this point in the history
…pesOnEmptyDataframes

Allow columns with types on empty dataframes
  • Loading branch information
jacobfi committed Feb 3, 2021
2 parents 57f037c + c518bd5 commit b62bcf7
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
28 changes: 14 additions & 14 deletions src/main/scala/com/audienceproject/crossbow/DataFrame.scala
Expand Up @@ -337,23 +337,23 @@ object DataFrame {
* @tparam T the type of a row, if this is a [[Product]] type each element will become a separate column
* @return new DataFrame
*/
def fromSeq[T: ru.TypeTag](data: Seq[T]): DataFrame = {
if (data.isEmpty) empty()
else {
val dataType = Types.toInternalType(ru.typeOf[T])
dataType match {
case ProductType(elementTypes@_*) =>
val tupleData = data.asInstanceOf[Seq[Product]]
val columnData = elementTypes.zipWithIndex.map({ case (t, i) => convert(tupleData.map(_.productElement(i)), t) })
val columnSchemas = elementTypes.zipWithIndex.map({ case (t, i) => Column(s"_$i", t) })
new DataFrame(columnData.toVector, Schema(columnSchemas.toList))
case _ =>
val col = convert(data, dataType)
new DataFrame(Vector(col), Schema(List(new Column("_0", dataType))))
}
def fromSeq[T: ru.TypeTag](data: Seq[T], columnNames: String*): DataFrame = {
  // The internal type is derived from the static type T, not from the elements of `data`,
  // so the column types are known even when `data` is empty.
  val rowType = Types.toInternalType(ru.typeOf[T])

  val frame = rowType match {
    case ProductType(fieldTypes@_*) =>
      // Product rows: one column per product element, default-named "_0", "_1", ...
      val rows = data.asInstanceOf[Seq[Product]]
      val indexedTypes = fieldTypes.zipWithIndex
      val columns = indexedTypes.map { case (fieldType, idx) =>
        convert(rows.map(_.productElement(idx)), fieldType)
      }
      val schemaColumns = indexedTypes.map { case (fieldType, idx) =>
        Column(s"_$idx", fieldType)
      }
      new DataFrame(columns.toVector, Schema(schemaColumns.toList))
    case _ =>
      // Any other row type becomes a single column named "_0".
      val single = convert(data, rowType)
      new DataFrame(Vector(single), Schema(List(new Column("_0", rowType))))
  }

  // Only rename when the caller actually supplied names; otherwise keep the defaults.
  if (columnNames.isEmpty) frame else frame.renameColumns(columnNames: _*)
}


/**
* Construct a new DataFrame from a list of columns and a schema.
*
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/com/audienceproject/crossbow/Implicits.scala
Expand Up @@ -53,5 +53,10 @@ object Implicits {
Reducer[T, U](spec, f, seed, Types.toInternalType(ru.typeOf[U]))
}
}
/**
 * Adds a `toDataFrame` method to any [[Seq]] whose element type has a `TypeTag` in scope.
 *
 * @param seq the wrapped sequence
 * @tparam T the element type of the wrapped sequence
 */
implicit class SeqWrapper[T: ru.TypeTag](val seq: Seq[T]) {

  /**
   * Build a DataFrame from the wrapped sequence by delegating to `DataFrame.fromSeq`.
   *
   * @param columnNames column names, forwarded unchanged to `DataFrame.fromSeq`
   * @return the DataFrame produced by `DataFrame.fromSeq`
   */
  def toDataFrame(columnNames: String*): DataFrame =
    DataFrame.fromSeq(seq, columnNames: _*)
}
}
@@ -0,0 +1,26 @@
package com.audienceproject.crossbow.core

import com.audienceproject.crossbow.{DataFrame, expr}
import com.audienceproject.crossbow.schema.{Column, Schema}
import org.scalatest.funsuite.AnyFunSuite

/**
 * Checks that a DataFrame built from an empty, statically typed sequence still has
 * one typed column per tuple element, with default or caller-supplied names.
 */
class ConstuctionTest extends AnyFunSuite {

  // Expected schema for an (Int, Long) frame whose two columns carry the given names.
  private def intLongSchema(first: String, second: String) =
    Schema(Seq(Column(first, expr.IntType), Column(second, expr.LongType)))

  test("construct from typed empty seq") {
    val frame = DataFrame.fromSeq(Seq.empty[(Int, Long)])
    assertResult(2)(frame.numColumns)
    assertResult(intLongSchema("_0", "_1"))(frame.schema)
  }

  test("construct using implicits") {
    import com.audienceproject.crossbow.Implicits._
    val frame = Seq.empty[(Int, Long)].toDataFrame()
    assertResult(2)(frame.numColumns)
    assertResult(intLongSchema("_0", "_1"))(frame.schema)
  }

  test("construct with names") {
    import com.audienceproject.crossbow.Implicits._
    val frame = Seq.empty[(Int, Long)].toDataFrame("a", "b")
    assertResult(2)(frame.numColumns)
    assertResult(intLongSchema("a", "b"))(frame.schema)
  }
}

0 comments on commit b62bcf7

Please sign in to comment.