Optimize Interaction and CountVectorizer #719

Merged
ml/combust/mleap/core/feature/CountVectorizerModel.scala
@@ -3,9 +3,9 @@ package ml.combust.mleap.core.feature
import ml.combust.mleap.core.Model
import ml.combust.mleap.core.annotation.SparkCode
import ml.combust.mleap.core.types.{BasicType, ListType, StructType, TensorType}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import ml.combust.mleap.tensor.{SparseTensor, Tensor}

import scala.collection.mutable
import scala.collection.{SortedMap, mutable}

/**
* Created by hollinwilkins on 12/28/16.
@@ -15,9 +15,18 @@ case class CountVectorizerModel(vocabulary: Array[String],
binary: Boolean,
minTf: Double) extends Model {
val dict: Map[String, Int] = vocabulary.zipWithIndex.toMap

def apply(document: Seq[String]): Vector = {
val termCounts = mutable.Map[Int, Double]()
val outputSize = dict.size
// The Seqs created below are required by the SparseTensor API during initialization.
// As a performance optimization, we build these sequences once here so they don't have to be created at runtime.
val seqCache: Array[Seq[Int]] = {
val arr = mutable.ArrayBuilder.make[Seq[Int]]
for (i <- 0 until outputSize) {
arr += Seq(i)
}
arr.result()
}
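// e.g. seqCache(2) == Seq(2); the SparseTensor API takes its indices as Seq[Seq[Int]], one Seq per non-zero element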
def _apply(document: Seq[String]): (Seq[Seq[Int]], Array[Double]) = {
var termCounts = SortedMap[Int, Double]()
var tokenCount = 0L
document.foreach {
term =>
@@ -27,15 +36,34 @@ case class CountVectorizerModel(vocabulary: Array[String],
}
tokenCount += 1
}

val valuesArray = mutable.ArrayBuilder.make[Double]
val indicesArray = mutable.ArrayBuilder.make[Seq[Int]]
val effectiveMinTF = if (minTf >= 1.0) minTf else tokenCount * minTf
val effectiveCounts = if(binary) {
termCounts.filter(_._2 >= effectiveMinTF).map(p => (p._1, 1.0)).toSeq
} else {
termCounts.filter(_._2 >= effectiveMinTF).toSeq
val iterator = termCounts.iterator
if (binary) {
while (iterator.hasNext) {
val (termIndex, count) = iterator.next()
if (count >= effectiveMinTF) {
valuesArray += 1.0
indicesArray += seqCache(termIndex)
}
}
} else {
while (iterator.hasNext) {
val (termIndex, count) = iterator.next()
if (count >= effectiveMinTF) {
valuesArray += count
indicesArray += seqCache(termIndex)
}
}
Comment on lines +43 to +58 (Contributor Author):
While loops are faster than filter + map here. This also eliminates the need to create a Seq of tuples and then unzip or iterate over it to get the indices and values.
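For illustration, a minimal sketch of the two styles over the same SortedMap, reusing the names from this diff:

// before: filter + map materializes an intermediate Seq of tuples that has to be walked a second time
val effectiveCounts = termCounts.filter(_._2 >= effectiveMinTF).map(p => (p._1, 1.0)).toSeq
// after: a single while loop appends straight into the index and value builders
val iterator = termCounts.iterator
while (iterator.hasNext) {
val (termIndex, count) = iterator.next()
if (count >= effectiveMinTF) {
valuesArray += 1.0
indicesArray += seqCache(termIndex)
}
}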

}
(indicesArray.result(), valuesArray.result())
}


Vectors.sparse(dict.size, effectiveCounts)
def apply(document: Seq[String]): Tensor[Double] = {
val (indices, values) = _apply(document)
SparseTensor(indices, values, Seq(outputSize))
}

override def inputSchema: StructType = StructType("input" -> ListType(BasicType.String)).get
ml/combust/mleap/core/feature/InteractionModel.scala
@@ -3,8 +3,7 @@ package ml.combust.mleap.core.feature
import ml.combust.mleap.core.Model
import ml.combust.mleap.core.annotation.SparkCode
import ml.combust.mleap.core.types._
import ml.combust.mleap.tensor.Tensor
import ml.combust.mleap.core.util.VectorConverters._
import ml.combust.mleap.tensor.{DenseTensor, SparseTensor, Tensor}
import org.apache.spark.ml.linalg.{Vector, Vectors}

import scala.collection.mutable
@@ -18,9 +17,20 @@ case class InteractionModel(featuresSpec: Array[Array[Int]],
assert(inputShapes.find(s => !s.isScalar && !s.isTensor) == None, "must provide scalar and tensor shapes as inputs")

val outputSize = featuresSpec.map(_.sum).product
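// e.g. featuresSpec = Array(Array(4), Array(1, 1)) gives outputSize = 4 * (1 + 1) = 8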
val seqCache: Array[Seq[Int]] = {
val arr = mutable.ArrayBuilder.make[Seq[Int]]
for (i <- 0 until outputSize) {
arr += Seq(i)
}
arr.result()
}
val encoders: Array[FeatureEncoder] = featuresSpec.map(FeatureEncoder.apply)
def apply(features: Seq[Any]): Tensor[Double] = {
val (size, indices, values) = _apply(features)
SparseTensor(indices.map(e => seqCache(e)), values, Seq(size))
}

def apply(features: Seq[Any]): Vector = {
def _apply(features: Seq[Any]): (Int, Array[Int], Array[Double]) = {
var indices = mutable.ArrayBuilder.make[Int]
var values = mutable.ArrayBuilder.make[Double]
var size = 1
@@ -45,7 +55,7 @@ case class InteractionModel(featuresSpec: Array[Array[Int]],
})
featureIndex -= 1
}
Vectors.sparse(size, indices.result(), values.result()).compressed
(size, indices.result(), values.result())
}

override def inputSchema: StructType = {
@@ -78,19 +88,26 @@ case class FeatureEncoder(numFeatures: Array[Int]) {
arr
}

private def checkAndApplyWithOffset(
numOutputCols: Int, dV: Double, idx: Int, f: (Int, Double) => Unit
): Unit = {
if (numOutputCols > 1) {
assert(
dV >= 0.0 && dV == dV.toInt && dV < numOutputCols,
s"Values from column must be indices, but got $dV.")
f(outputOffsets(idx) + dV.toInt, 1.0)
} else {
f(outputOffsets(idx), dV)
}
}
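// e.g. with numFeatures = Array(4), a value dV = 2.0 is treated as a category index and emits f(outputOffsets(idx) + 2, 1.0);
// with numFeatures = Array(1), the value passes through as f(outputOffsets(idx), dV)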
/**
* Given an input row of features, invokes the specific function for every non-zero output.
*
* @param v The row value to encode: a Double, Vector, or Tensor.
* @param f The callback to invoke on each non-zero (index, value) output pair.
*/
def foreachNonzeroOutput(v: Any, f: (Int, Double) => Unit): Unit = {
val value = v match {
case tensor: Tensor[_] => tensor.asInstanceOf[Tensor[Double]]: Vector
case _ => v
}

value match {
v match {
case d: Double =>
assert(numFeatures.length == 1, "DoubleType columns should only contain one feature.")
val numOutputCols = numFeatures.head
@@ -107,14 +124,33 @@ case class FeatureEncoder(numFeatures: Array[Int]) {
s"Vector column size was ${vec.size}, expected ${numFeatures.length}")
vec.foreachActive { (i, v) =>
val numOutputCols = numFeatures(i)
if (numOutputCols > 1) {
assert(
v >= 0.0 && v == v.toInt && v < numOutputCols,
s"Values from column must be indices, but got $v.")
f(outputOffsets(i) + v.toInt, 1.0)
} else {
f(outputOffsets(i), v)
}
checkAndApplyWithOffset(numOutputCols, v, i, f)
}
case dT: DenseTensor[_] =>
val values = dT.values
val dimensions = dT.dimensions
assert(numFeatures.length == dimensions.product,
s"Vector column size was ${dimensions.product}, expected ${numFeatures.length}")
val iterator = values.iterator
var idx = 0
while (iterator.hasNext) {
val dV = iterator.next().asInstanceOf[Number].doubleValue()
checkAndApplyWithOffset(numFeatures(idx), dV, idx, f)
idx += 1
}
case st: SparseTensor[_] =>
val indices = st.indices
val values = st.values
val dimensions = st.dimensions
assert(numFeatures.length == dimensions.product,
s"Vector column size was ${dimensions.product}, expected ${numFeatures.length}")
var idx = 0
val iterator = indices.iterator
while (iterator.hasNext) {
// a sparse entry's position in numFeatures/outputOffsets is its stored index, not the running count
val featureIndex = iterator.next().head
val numOutputCols = numFeatures(featureIndex)
val dV = values(idx).asInstanceOf[Number].doubleValue()
checkAndApplyWithOffset(numOutputCols, dV, featureIndex, f)
idx += 1
}
case null =>
throw new IllegalArgumentException("Values to interact cannot be null.")
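A quick usage sketch of the encoder contract above (illustrative only, not part of this diff; assumes outputOffsets starts at 0):

val enc = FeatureEncoder(Array(4)) // one categorical column that expands to 4 output columns
enc.foreachNonzeroOutput(2.0, (i, v) => println(s"$i -> $v")) // one-hot: prints "2 -> 1.0"
enc.foreachNonzeroOutput(DenseTensor(Array(2.0), Seq(1)), (i, v) => println(s"$i -> $v")) // same output via the new DenseTensor path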
ml/combust/mleap/core/feature/CountVectorizerModelSpec.scala
@@ -17,5 +17,9 @@ class CountVectorizerModelSpec extends FunSpec {
assert(model.outputSchema.fields ==
Seq(StructField("output", TensorType.Double(3))))
}

it("Produces the correct mleap Tensor"){
assert(model(Seq("1", "1", "2", "3")).toArray.toSeq == Seq(1, 0, 0))
}
}
}
ml/combust/mleap/core/feature/InteractionModelSpec.scala
@@ -1,6 +1,7 @@
package ml.combust.mleap.core.feature

import ml.combust.mleap.core.types._
import ml.combust.mleap.tensor.{DenseTensor, SparseTensor}
import org.apache.spark.ml.linalg.Vectors
import org.scalatest.FunSpec

@@ -13,11 +14,21 @@ class InteractionModelSpec extends FunSpec {
val encoderSpec: Array[Array[Int]] = Array(Array(1), Array(1, 1))
val model = InteractionModel(encoderSpec, Seq(ScalarShape(), TensorShape(2)))

it("produces the expected interaction vector") {
it("produces the expected interaction vector when using spark Vector") {
val features = Seq(2.toDouble, Vectors.dense(3, 4))
assert(model(features).toArray.toSeq == Seq(6, 8))
}

it("produces the expected interaction vector when using DenseTensor") {
val features = Seq(2.toDouble, DenseTensor(Seq(3, 4).toArray, Seq(2)))
assert(model(features).toArray.toSeq == Seq(6, 8))
}

it("produces the expected interaction vector when using SparseTensor") {
val features = Seq(2.toDouble, SparseTensor(Seq(Seq(0), Seq(1)), Seq(3, 4).toArray, Seq(2)))
assert(model(features).toArray.toSeq == Seq(6, 8))
}

it("has the right inputs") {
assert(model.inputSchema.fields == Seq(StructField("input0", ScalarType.Double),
StructField("input1", TensorType.Double(2))))
@@ -32,12 +43,34 @@ class InteractionModelSpec extends FunSpec {
val encoderSpec: Array[Array[Int]] = Array(Array(4), Array(1, 1))
val model = InteractionModel(encoderSpec, Seq(ScalarShape(), TensorShape(2)))

it("produce the expected interaction vector") {
it("produce the expected interaction vector when using spark Vector") {
val features = Seq(2.toDouble, Vectors.dense(3, 4))

assert(model(features).toArray.toSeq == Seq(0, 0, 0, 0, 3, 4, 0, 0))
}

describe("produce the expected interaction vector when using Dense Tensor") {
it("when the first feature is 2 and the second is (3,4)"){
val features = Seq(2.toDouble, DenseTensor(Seq(3, 4).toArray, Seq(2)))
assert(model(features).toArray.toSeq == Seq(0, 0, 0, 0, 3, 4, 0, 0))
}
it("when the first feature is 3 and the second is (3,4)"){
val features = Seq(3.toDouble, DenseTensor(Seq(3, 4).toArray, Seq(2)))
assert(model(features).toArray.toSeq == Seq(0, 0, 0, 0, 0, 0, 3, 4))
}
}

describe("produce the expected interaction vector when using Sparse Tensor") {
it("when the first feature is 2 and the second is (3, 4)"){
val features = Seq(2.toDouble, SparseTensor(Seq(Seq(0), Seq(1)), Seq(3, 4).toArray, Seq(2)))
assert(model(features).toArray.toSeq == Seq(0, 0, 0, 0, 3, 4, 0, 0))
}
it("when the first feature is 3 and the second is (3, 4)"){
val features = Seq(3.toDouble, SparseTensor(Seq(Seq(0), Seq(1)), Seq(3, 4).toArray, Seq(2)))
assert(model(features).toArray.toSeq == Seq(0, 0, 0, 0, 0, 0, 3, 4))
}
}

it("has the right inputs") {
assert(model.inputSchema.fields == Seq(StructField("input0", ScalarType.Double),
StructField("input1", TensorType.Double(2))))
@@ -46,5 +79,6 @@ class InteractionModelSpec extends FunSpec {
it("has the right outputs") {
assert(model.outputSchema.fields == Seq(StructField("output", TensorType.Double(8))))
}

}
}
ml/combust/mleap/runtime/transformer/feature/CountVectorizer.scala
@@ -3,8 +3,6 @@ package ml.combust.mleap.runtime.transformer.feature
import ml.combust.mleap.core.feature.CountVectorizerModel
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.tensor.Tensor
import ml.combust.mleap.core.util.VectorConverters._
import ml.combust.mleap.runtime.frame.{SimpleTransformer, Transformer}

/**
@@ -13,5 +11,5 @@ import ml.combust.mleap.runtime.frame.{SimpleTransformer, Transformer}
case class CountVectorizer(override val uid: String = Transformer.uniqueName("count_vectorizer"),
override val shape: NodeShape,
override val model: CountVectorizerModel) extends SimpleTransformer {
override val exec: UserDefinedFunction = (document: Seq[String]) => model(document): Tensor[Double]
override val exec: UserDefinedFunction = (document: Seq[String]) => model(document)
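// model(document) now returns Tensor[Double] directly, so the VectorConverters implicit conversion is no longer needed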
}
ml/combust/mleap/runtime/transformer/feature/Interaction.scala
@@ -3,8 +3,6 @@ package ml.combust.mleap.runtime.transformer.feature
import ml.combust.mleap.core.feature.InteractionModel
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.tensor.Tensor
import ml.combust.mleap.core.util.VectorConverters._
import ml.combust.mleap.runtime.frame.{BaseTransformer, FrameBuilder, Row, Transformer}

import scala.util.Try
@@ -16,7 +14,8 @@ case class Interaction(override val uid: String = Transformer.uniqueName("intera
override val shape: NodeShape,
override val model: InteractionModel) extends BaseTransformer {
val exec: UserDefinedFunction = {
UserDefinedFunction((row: Row) => model(row.toSeq): Tensor[Double],
UserDefinedFunction(
(row: Row) => model(row.toSeq),
TensorType(BasicType.Double, Seq(model.outputSize)),
Seq(SchemaSpec(inputSchema)))
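// model(row.toSeq) now returns Tensor[Double] directly, so the VectorConverters implicit conversion is no longer needed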
}