Skip to content

Commit

Permalink
Backwards compatibility and cross-compilation to Scala 2.11 and 2.12
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Fischer committed Aug 25, 2020
1 parent babc442 commit 8d7d842
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 21 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Expand Up @@ -2,9 +2,10 @@ organization := "com.audienceproject"

name := "crossbow"

version := "0.1.0"
version := "0.1.1"

scalaVersion := "2.13.3"
crossScalaVersions := Seq("2.12.12", "2.11.12")

scalacOptions += "-deprecation"

Expand Down
18 changes: 9 additions & 9 deletions src/main/scala/com/audienceproject/crossbow/DataFrame.scala
Expand Up @@ -5,7 +5,6 @@ import com.audienceproject.crossbow.exceptions.IncorrectTypeException
import com.audienceproject.crossbow.expr._
import com.audienceproject.crossbow.schema.{Column, Schema}

import scala.collection.immutable.ArraySeq
import scala.reflect.ClassTag
import scala.util.Sorting

Expand Down Expand Up @@ -86,10 +85,9 @@ class DataFrame private(private val columnData: Vector[Array[_]],
* @return new DataFrame
*/
def removeColumns(columnNames: String*): DataFrame = {
val remaining = for ((c, i) <- schema.columns.zipWithIndex if !columnNames.contains(c.name))
yield (columnData(i), c)
val (colData, colSchemas) = remaining.unzip
new DataFrame(colData.toVector, Schema(colSchemas))
val remainingIndices = for ((c, i) <- schema.columns.zipWithIndex if !columnNames.contains(c.name)) yield i
val colData = remainingIndices.map(columnData)
new DataFrame(colData.toVector, Schema(remainingIndices.map(schema.columns)))
}

/**
Expand All @@ -111,7 +109,7 @@ class DataFrame private(private val columnData: Vector[Array[_]],
case _ => new Column(s"_$i", eval.typeOf)
}
(sliceColumn(eval), newColSchema)
}).unzip
}).unzip[Array[_], Column](p => (p._1, p._2))
new DataFrame(colData.toVector, Schema(colSchemas))
}

Expand Down Expand Up @@ -149,8 +147,10 @@ class DataFrame private(private val columnData: Vector[Array[_]],
val eval = expr.compile(this)
val ord = Order.getOrdering(eval.typeOf, givenOrderings)
val indices = Array.tabulate(rowCount)(identity)
Sorting.quickSort[Int](indices)((x: Int, y: Int) => ord.compare(eval(x), eval(y)))
slice(ArraySeq.unsafeWrapArray(indices))
Sorting.quickSort[Int](indices)(new Ordering[Int] {
override def compare(x: Int, y: Int): Int = ord.compare(eval(x), eval(y))
})
slice(indices.toIndexedSeq)
}

/**
Expand Down Expand Up @@ -296,7 +296,7 @@ class DataFrame private(private val columnData: Vector[Array[_]],

override def iterator: Iterator[T] = this (0 until rowCount).iterator

override def knownSize: Int = rowCount
override def size: Int = rowCount
}

override def isEmpty: Boolean = rowCount == 0
Expand Down
Expand Up @@ -4,6 +4,7 @@ import com.audienceproject.crossbow.exceptions.JoinException
import com.audienceproject.crossbow.expr.{Expr, Order}
import com.audienceproject.crossbow.{DataFrame, JoinType}

import scala.annotation.tailrec
import scala.collection.mutable

private[crossbow] object SortMergeJoin {
Expand All @@ -28,13 +29,17 @@ private[crossbow] object SortMergeJoin {
if (start >= keyColumn.size) Seq.empty
else {
val key = keyColumn(start)
Seq.unfold(start)(i => {

@tailrec
def indexOfNextKey(i: Int): Int = {
if (i < keyColumn.size) {
val nextKey = keyColumn(i)
if (ordering.equiv(key, nextKey)) Some(i, i + 1)
else None
} else None
})
if (ordering.equiv(key, nextKey)) indexOfNextKey(i + 1)
else i
} else i
}

start until indexOfNextKey(start + 1)
}
}

Expand Down
Expand Up @@ -130,8 +130,8 @@ private[crossbow] object ArithmeticOps {

case class Negate(expr: Expr) extends UnaryExpr(expr) {
override def typeSpec(operand: Specialized[_]): Specialized[_] = operand.typeOf match {
case LongType => specialize[Long, Long](operand, math.negateExact)
case IntType => specialize[Int, Int](operand, math.negateExact)
case LongType => specialize[Long, Long](operand, -_)
case IntType => specialize[Int, Int](operand, -_)
case DoubleType => specialize[Double, Double](operand, -_)
case _ => throw new InvalidExpressionException("Negate", operand.typeOf)
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/com/audienceproject/crossbow/expr/Order.scala
Expand Up @@ -15,7 +15,7 @@ object Order {
val implicitOrdering = internalType match {
case IntType => Ordering.Int
case LongType => Ordering.Long
case DoubleType => Ordering.Double.TotalOrdering
case DoubleType => DoubleOrdering
case BooleanType => Ordering.Boolean
case ProductType(elementTypes@_*) =>
val tupleTypes = elementTypes.map(getOrdering(_, givens))
Expand All @@ -35,4 +35,8 @@ object Order {
}
}

private object DoubleOrdering extends Ordering[Double] {
override def compare(x: Double, y: Double): Int = java.lang.Double.compare(x, y)
}

}
Expand Up @@ -36,7 +36,10 @@ class SortByTest extends AnyFunSuite {

assertThrows[NoOrderingException](customDf.sortBy($"custom"))

val result = customDf.sortBy($"custom", Order.by[Custom]((c1, c2) => c1.x - c2.x)).select($"k").as[String].toSeq
val customOrdering = new Ordering[Custom] {
override def compare(c1: Custom, c2: Custom): Int = c1.x - c2.x
}
val result = customDf.sortBy($"custom", Order.by(customOrdering)).select($"k").as[String].toSeq
val expected = Seq("a", "b", "c", "d")
assert(result == expected)
}
Expand Down
4 changes: 2 additions & 2 deletions wercker.yml
Expand Up @@ -8,10 +8,10 @@ build:
steps:
- script:
name: Compile
code: sbt clean compile
code: sbt clean +compile
- script:
name: Test
code: sbt clean compile test
code: sbt +test
- script:
name: Clean again
code: sbt clean
Expand Down

0 comments on commit 8d7d842

Please sign in to comment.