fix: fix schema issue with databricks e2e tests
mhamilton723 committed Aug 20, 2019
1 parent 258cafb commit d213b29
Showing 10 changed files with 73 additions and 58 deletions.
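The recurring change across these files is to code against the Seq interface when reading Spark array columns or writing UDFs, rather than the concrete mutable.WrappedArray class, and to expose org.apache.spark.ml.linalg.Vector (built with Vectors.dense) rather than DenseVector. A minimal, self-contained sketch of the getAs side of that pattern; the "tokens" column and the SeqVsWrappedArraySketch object are illustrative only, not part of the commit:

import org.apache.spark.sql.SparkSession

object SeqVsWrappedArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical array<string> column; at runtime Spark hands back a
    // mutable.WrappedArray, which is a Seq, so getAs[Seq[String]] works
    // without pinning the code to that concrete collection class.
    val df = Seq(Seq("a", "b"), Seq("c")).toDF("tokens")
    val firstTokens: Seq[String] = df.collect().head.getAs[Seq[String]]("tokens")
    println(firstTokens.mkString(","))

    spark.stop()
  }
}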
@@ -46,7 +46,7 @@ object DatabricksUtilities {
val authValue: String = "Basic " + BaseEncoding.base64()
.encode(("token:" + token).getBytes("UTF-8"))
val baseURL = s"https://$region.azuredatabricks.net/api/2.0/"
val clusterName = "Test Cluster"
val clusterName = "Test Cluster 2"
lazy val clusterId: String = getClusterIdByName(clusterName)

val folder = s"/MMLSparkBuild/build_${BuildInfo.version}"
75 changes: 43 additions & 32 deletions src/main/scala/com/microsoft/ml/spark/cntk/CNTKModel.scala
@@ -14,7 +14,7 @@ import com.microsoft.ml.spark.stages.{FixedMiniBatchTransformer, FlattenBatch, H
import org.apache.spark.SparkContext
import org.apache.spark.broadcast._
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Model}
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.linalg.{SQLDataTypes, Vectors, Vector => SVector}
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
@@ -53,13 +53,13 @@ private object CNTKModelUtils extends java.io.Serializable {
ov.getDataType match {
case CNTKDataType.Float =>
val fvv = new FloatVectorVector() //TODO try re-using
val value = outputDataMap.getitem(ov)
value.copyVariableValueToFloat(ov, fvv)
value.delete()
Left(fvv)
case CNTKDataType.Double =>
val dvv = new DoubleVectorVector() //TODO try re-using
val value = outputDataMap.getitem(ov)
value.copyVariableValueToDouble(ov, dvv)
value.delete()
Right(dvv)
@@ -85,9 +85,7 @@ private object CNTKModelUtils extends java.io.Serializable {

def applyModel(inputMap: Map[String, Int],
broadcastedModel: Broadcast[CNTKFunction],
outputMap: Map[String, String],
convertToDenseVector: Boolean
)
outputMap: Map[String, String])
(inputRows: Iterator[Row]): Iterator[Row] = {

if (!inputRows.hasNext) {
@@ -102,7 +100,7 @@ private object CNTKModelUtils extends java.io.Serializable {
val inputExtractors = makeInputExtractors(inputMapVar)

val inputGVVs = inputMapVar.map {
case (colnum, variable) => variable -> {
case (_, variable) => variable -> {
variable.getDataType match {
case CNTKDataType.Float =>
Left(new FloatVectorVector())
@@ -114,20 +112,15 @@

// WARNING: DO NOT simplify this to mapValues,
// for some reason it calls the inner function more than it should
val preprocessFunction: (Row) => Map[Variable, GVV] = {
{ row: Row => inputExtractors.map { case (k,f) =>
k -> ConversionUtils.toGVV(f(row), inputGVVs(k)) }}
val preprocessFunction: Row => Map[Variable, GVV] = {
{ row: Row =>
inputExtractors.map { case (k, f) =>
k -> ConversionUtils.toGVV(f(row), inputGVVs(k))
}
}
}

val outputVars = outputMapVar.keys.toList
val converter = if (convertToDenseVector) {
{ gvv: GVV => ConversionUtils.toDV(gvv) }
} else {
{ gvv: GVV => ConversionUtils.toSSG(gvv) match {
case Left(ssf) => ssf
case Right(ssd) => ssd
}}
}

val outputVarVector = new VariableVector()
outputVars.foreach(outputVarVector.add)
@@ -136,7 +129,7 @@
inputRows.map { row =>
val feedDict = preprocessFunction(row)
val outputGVVs = applyCNTKFunction(of, feedDict, outputVars, device)
val resultRow = Row(outputGVVs.map(converter): _*)
val resultRow = Row(outputGVVs.map(ConversionUtils.convertGVV): _*)
val outputRow = Row.merge(row, resultRow)
outputGVVs.foreach(ConversionUtils.deleteGVV)
outputRow
@@ -429,9 +422,9 @@ class CNTKModel(override val uid: String) extends Model[CNTKModel] with ComplexP

val funcOpt = (colType, targetElementType) match {
case (VectorType, DoubleType) =>
Some({ av: Seq[DenseVector] => av.map(_.toArray) })
Some({ av: Seq[SVector] => av.map(_.toArray) })
case (VectorType, FloatType) =>
Some({ av: Seq[DenseVector] => av.map(_.toArray.map(_.toFloat)) })
Some({ av: Seq[SVector] => av.map(_.toArray.map(_.toFloat)) })
case (ArrayType(FloatType, _), DoubleType) =>
Some({ av: Seq[Seq[Float]] => av.map(_.map(_.toDouble)) })
case (ArrayType(DoubleType, _), FloatType) =>
@@ -466,6 +459,29 @@ class CNTKModel(override val uid: String) extends Model[CNTKModel] with ComplexP
}
}

private def coerceOutputDF(unbatchedDF: DataFrame): DataFrame = {
val floatToDV = udf({ v: Seq[Float] => Vectors.dense(v.map(_.toDouble).toArray) }, SQLDataTypes.VectorType)
val doubleToDV = udf({ v: Seq[Double] => Vectors.dense(v.toArray) }, SQLDataTypes.VectorType)

if (getConvertOutputToDenseVector) {
val outputSchema = getModel.getOutputSchema(getFetchDict)
val outputColumnNames = outputSchema.map(_.name).toSet
val colsToSelect = unbatchedDF.schema.map {
case sf if outputColumnNames(sf.name) =>
sf match {
case StructField(name, ArrayType(FloatType, _), _, _) =>
floatToDV(col(name)).alias(name)
case StructField(name, ArrayType(DoubleType, _), _, _) =>
doubleToDV(col(name)).alias(name)
}
case sf => col(sf.name)
}
unbatchedDF.select(colsToSelect: _*)
} else {
unbatchedDF
}
}

/** Evaluate the model
*
* @param dataset the dataset to featurize
@@ -487,24 +503,20 @@ class CNTKModel(override val uid: String) extends Model[CNTKModel] with ComplexP
coerceDFAndFeedDict(batchedDF, getFeedDict)

val columnIndexToVar = coercedFeedDict.map { case (k, v) =>
k -> preprocessedDF.schema.fieldIndex(v)
}

if (broadcastedModelOption.isEmpty) rebroadcastCNTKModel(spark)

val encoder = RowEncoder(getModel.getOutputSchema(getFetchDict)
.foldLeft(preprocessedDF.schema) { case (st, sf) =>
if (getConvertOutputToDenseVector)
st.add(sf.name, ArrayType(VectorType))
else
st.add(sf.name, ArrayType(sf.dataType))
})
.foldLeft(preprocessedDF.schema) { case (st, sf) => st.add(sf.name, ArrayType(sf.dataType)) }
)

val outputDF = preprocessedDF.mapPartitions { it =>
CNTKModelUtils.applyModel(
columnIndexToVar,
broadcastedModelOption.get,
getFetchDict,
getConvertOutputToDenseVector)(it)
getFetchDict)(it)
}(encoder)

val droppedDF = outputDF.drop(outputDF.columns.filter(_.startsWith(coercionPrefix)): _*)
@@ -514,8 +526,7 @@ class CNTKModel(override val uid: String) extends Model[CNTKModel] with ComplexP
} else {
droppedDF
}

unbatchedDF
coerceOutputDF(unbatchedDF)
}

}
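For context on the CNTKModel change above: applyModel now always returns plain sequences (via ConversionUtils.convertGVV), so the RowEncoder schema is uniformly ArrayType(sf.dataType), and the optional dense-vector conversion happens afterwards in coerceOutputDF as a column-level UDF. A standalone sketch of that post-processing step, using the same udf(f, dataType) overload as the code above; the "probabilities" column and ArrayToVectorSketch object are hypothetical stand-ins for a model output:

import org.apache.spark.ml.linalg.{SQLDataTypes, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object ArrayToVectorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical array<float> model output column.
    val outputDF = Seq(Seq(0.1f, 0.9f), Seq(0.7f, 0.3f)).toDF("probabilities")

    // Convert the array column to a Spark ML dense vector after the model
    // has run, mirroring the shape of coerceOutputDF.
    val floatToDV = udf({ v: Seq[Float] => Vectors.dense(v.map(_.toDouble).toArray) },
      SQLDataTypes.VectorType)

    outputDF.select(floatToDV(col("probabilities")).alias("probabilities")).show()

    spark.stop()
  }
}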
14 changes: 9 additions & 5 deletions src/main/scala/com/microsoft/ml/spark/cntk/Conversions.scala
@@ -4,9 +4,8 @@
package com.microsoft.ml.spark.cntk

import com.microsoft.CNTK.{DoubleVector, DoubleVectorVector, FloatVector, FloatVectorVector}
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.linalg.{Vector=>SVector, Vectors}

import scala.collection.mutable
import scala.language.implicitConversions

object ConversionUtils {
@@ -15,6 +14,11 @@ object ConversionUtils {

type SSG = Either[Seq[Seq[Float]], Seq[Seq[Double]]]

def convertGVV(gvv: GVV): Seq[Seq[_]] = {
val ssg = toSSG(gvv)
ssg.left.toOption.getOrElse(ssg.right.get)
}

def toSSG(gvv: GVV): SSG = {
gvv match {
case Left(vv) =>
@@ -41,19 +45,19 @@
}
}

def toDV(gvv: GVV): Seq[DenseVector] = {
def toDV(gvv: GVV): Seq[SVector] = {
gvv match {
case Left(vv) =>
(0 until vv.size.toInt).map { i =>
val v = vv.get(i)
new DenseVector((0 until v.size.toInt).map { j =>
Vectors.dense((0 until v.size.toInt).map { j =>
v.get(j).toDouble
}.toArray)
}
case Right(vv) =>
(0 until vv.size.toInt).map { i =>
val v = vv.get(i)
new DenseVector((0 until v.size.toInt).map { j =>
Vectors.dense((0 until v.size.toInt).map { j =>
v.get(j)
}.toArray)
}
@@ -51,7 +51,7 @@ class MultiNGram(override val uid: String)
val intermediateDF = NamespaceInjections.pipelineModel(models).transform(df)
intermediateDF.map { row =>
val mergedNGrams = intermediateOutputCols
.map(col => row.getAs[mutable.WrappedArray[String]](col))
.map(col => row.getAs[Seq[String]](col))
.reduce(_ ++ _)
Row.merge(row, Row(mergedNGrams))
}(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
14 changes: 7 additions & 7 deletions src/main/scala/com/microsoft/ml/spark/image/UnrollImage.scala
@@ -12,7 +12,7 @@ import com.microsoft.ml.spark.core.schema.ImageSchemaUtils
import com.microsoft.ml.spark.io.image.ImageUtils
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.linalg.{DenseVector, Vector}
import org.apache.spark.ml.linalg.{Vectors, Vector => SVector}
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.functions.udf
@@ -25,7 +25,7 @@ object UnrollImage extends DefaultParamsReadable[UnrollImage] {

import org.apache.spark.ml.image.ImageSchema._

private[ml] def unroll(row: Row): DenseVector = {
private[ml] def unroll(row: Row): SVector = {
val width = getWidth(row)
val height = getHeight(row)
val bytes = getData(row)
@@ -49,10 +49,10 @@ object UnrollImage extends DefaultParamsReadable[UnrollImage] {
}
}
}
new DenseVector(rearranged)
Vectors.dense(rearranged)
}

private[ml] def roll(values: Vector, originalImage: Row): Row = {
private[ml] def roll(values: SVector, originalImage: Row): Row = {
roll(
values.toArray.map(d => math.max(0, math.min(255, round(d))).toInt),
originalImage.getString(0),
@@ -85,7 +85,7 @@ object UnrollImage extends DefaultParamsReadable[UnrollImage] {
Row(path, height, width, nChannels, mode, rearranged.map(_.toByte))
}

private[ml] def unrollBI(image: BufferedImage): DenseVector = {
private[ml] def unrollBI(image: BufferedImage): SVector = {
val nChannels = image.getColorModel.getNumComponents
val isGray = image.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
val hasAlpha = image.getColorModel.hasAlpha
@@ -120,13 +120,13 @@ object UnrollImage extends DefaultParamsReadable[UnrollImage] {
}
}

new DenseVector(unrolled)
Vectors.dense(unrolled)
}

private[ml] def unrollBytes(bytes: Array[Byte],
width: Option[Int],
height: Option[Int],
nChannels: Option[Int]): Option[DenseVector] = {
nChannels: Option[Int]): Option[SVector] = {
val biOpt = ImageUtils.safeRead(bytes)
biOpt.map { bi =>
(height, width) match {
6 changes: 3 additions & 3 deletions src/main/scala/com/microsoft/ml/spark/lime/Superpixel.scala
@@ -29,7 +29,7 @@ object SuperpixelData {
val Schema: DataType = ScalaReflection.schemaFor[SuperpixelData].dataType

def fromRow(r: Row): SuperpixelData = {
val clusters = r.getAs[mutable.WrappedArray[mutable.WrappedArray[Row]]](0)
val clusters = r.getAs[Seq[Seq[Row]]](0)
SuperpixelData(clusters.map(cluster => cluster.map(r => (r.getInt(0), r.getInt(1)))))
}

@@ -64,14 +64,14 @@ object Superpixel {
}
}

def maskImageHelper(img: Row, sp: Row, states: mutable.WrappedArray[Boolean]): Row = {
def maskImageHelper(img: Row, sp: Row, states: Seq[Boolean]): Row = {
val bi = maskImage(img, SuperpixelData.fromRow(sp), states.toArray)
ImageUtils.toSparkImage(bi).getStruct(0)
}

val MaskImageUDF: UserDefinedFunction = udf(maskImageHelper _, ImageSchema.columnSchema)

def maskBinaryHelper(img: Array[Byte], sp: Row, states: mutable.WrappedArray[Boolean]): Row = {
def maskBinaryHelper(img: Array[Byte], sp: Row, states: Seq[Boolean]): Row = {
val biOpt = maskBinary(img, SuperpixelData.fromRow(sp), states.toArray)
biOpt.map(ImageUtils.toSparkImage(_).getStruct(0)).orNull
}
@@ -172,13 +172,13 @@ class RankingTrainValidationSplit(override val uid: String) extends Estimator[Ra
val wrapColumn = udf((itemId: Double, rating: Double) => Array(itemId, rating))

val sliceudf = udf(
(r: mutable.WrappedArray[Array[Double]]) => r.slice(0, math.round(r.length * $(trainRatio)).toInt))
(r: Seq[Array[Double]]) => r.slice(0, math.round(r.length * $(trainRatio)).toInt))

val shuffle = udf((r: mutable.WrappedArray[Array[Double]]) =>
val shuffle = udf((r: Seq[Array[Double]]) =>
if (shuffleBC.value) Random.shuffle(r.toSeq)
else r
)
val dropudf = udf((r: mutable.WrappedArray[Array[Double]]) => r.drop(math.round(r.length * $(trainRatio)).toInt))
val dropudf = udf((r: Seq[Array[Double]]) => r.drop(math.round(r.length * $(trainRatio)).toInt))

val testds = dataset
.withColumn("itemIDRating", wrapColumn(col(getItemCol), col(getRatingCol)))
@@ -209,13 +209,13 @@ class RankingTrainValidationSplit(override val uid: String) extends Estimator[Ra
Array(train, test)
}
else {
val shuffle = udf((r: mutable.WrappedArray[Double]) =>
val shuffle = udf((r: Seq[Double]) =>
if (shuffleBC.value) Random.shuffle(r.toSeq)
else r
)
val sliceudf = udf(
(r: mutable.WrappedArray[Double]) => r.slice(0, math.round(r.length * $(trainRatio)).toInt))
val dropudf = udf((r: mutable.WrappedArray[Double]) => r.drop(math.round(r.length * $(trainRatio)).toInt))
(r: Seq[Double]) => r.slice(0, math.round(r.length * $(trainRatio)).toInt))
val dropudf = udf((r: Seq[Double]) => r.drop(math.round(r.length * $(trainRatio)).toInt))

val testds = dataset
.groupBy(col(getUserCol))
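The per-user split logic shown above keeps the first round(n * trainRatio) entries for training and drops them for the test side. A tiny sketch of that arithmetic, with hypothetical values not taken from the commit:

object SplitArithmeticSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-user item list and train ratio.
    val items: Seq[Double] = Seq(1.0, 2.0, 3.0, 4.0, 5.0)
    val trainRatio = 0.6

    val cut = math.round(items.length * trainRatio).toInt // 3
    val train = items.slice(0, cut)                       // 1.0, 2.0, 3.0
    val test = items.drop(cut)                            // 4.0, 5.0

    println(s"train=$train test=$test")
  }
}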
2 changes: 1 addition & 1 deletion src/main/scala/com/microsoft/ml/spark/stages/udfs.scala
@@ -22,7 +22,7 @@ object udfs {
}

val to_vector: UserDefinedFunction = udf({
arr: mutable.WrappedArray[Double] => Vectors.dense(arr.toArray)
arr: Seq[Double] => Vectors.dense(arr.toArray)
}, VectorType)

def to_vector(colName: String): Column = to_vector(col(colName))
@@ -13,7 +13,7 @@ class NGramSpec extends TestBase {

def ngramDFToScalaList(dataFrame: DataFrame, outputCol: String = "ngrams"): Array[List[Any]] = {
dataFrame.select(dataFrame(outputCol)).collect()
.map(_.getAs[mutable.WrappedArray[Any]](0).toList)
.map(_.getAs[Seq[Any]](0).toList)
}

test("operation on tokenized strings") {
@@ -30,7 +30,7 @@ class MultiNGramSpec extends TransformerFuzzing[MultiNGram] {
lazy val dfNgram = t.transform(dfTok)

test("operate on tokens ") {
val grams = dfNgram.collect().last.getAs[mutable.WrappedArray[String]]("ngrams").toSet
val grams = dfNgram.collect().last.getAs[Seq[String]]("ngrams").toSet
assert(grams("1 2 3 4"))
assert(grams("4"))
assert(grams("2 3 4"))