[SWPRIVATE-16] NA handling for Spark algorithms
mdymczyk committed Sep 7, 2016
1 parent 5cdd792 commit 2f252ba
Showing 14 changed files with 412 additions and 152 deletions.
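This change adds a missing_values_handling option (NotAllowed, Skip or MeanImputation) to the Spark-backed algorithms (see the SVM schema and FrameMLUtils below) and teaches the test helpers to build H2O frames containing NAs. A minimal, self-contained sketch of what the two non-trivial strategies do, written against plain Scala collections rather than the Sparkling Water API:

// Rows with one optional numeric feature and a label; None plays the role of an NA.
val rows: Seq[(Option[Double], Double)] =
  Seq((Some(1.0), 0.0), (None, 1.0), (Some(3.0), 0.0))

// Skip: drop every row that contains a missing value.
val skipped = rows.collect { case (Some(x), label) => (x, label) }

// MeanImputation: replace a missing value with the mean of the observed
// values in that column (here (1.0 + 3.0) / 2 = 2.0).
val observed = rows.flatMap(_._1)
val mean = observed.sum / observed.size
val imputed = rows.map { case (x, label) => (x.getOrElse(mean), label) }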
25 changes: 1 addition & 24 deletions core/build.gradle
@@ -1,6 +1,7 @@
description = "Sparkling Water Core"

apply from: "$rootDir/gradle/utils.gradle"
apply from: "$rootDir/gradle/sparkTest.gradle"

dependencies {
// Required for h2o-app (we need UI)
@@ -54,30 +55,6 @@ dependencies {
integTestRuntime fileTree(dir: new File((String) sparkHome, "lib/"), include: '*.jar' )
}

// Setup test environment for Spark
test {
// Test environment
systemProperty "spark.testing", "true"
systemProperty "spark.ext.h2o.node.log.dir", new File(project.getBuildDir(), "h2ologs-test/nodes")
systemProperty "spark.ext.h2o.client.log.dir", new File(project.getBuildDir(), "h2ologs-test/client")

// Run with assertions ON
enableAssertions = true

// For a new JVM for each test class
forkEvery = 1

// Increase heap size
maxHeapSize = "4g"

// Increase PermGen
jvmArgs '-XX:MaxPermSize=384m'

// Working dir will be root project
workingDir = rootDir
// testLogging.showStandardStreams = true
}

task createSparkVersionFile << {
File version_file = file("src/main/resources/spark.version")
// Create parent directories if not created yet
@@ -644,21 +644,6 @@ class H2OFrameToDataFrameTestSuite extends FunSuite with SharedSparkTestContext
h2oFrameEnum.delete()
}

def makeH2OFrame[T: ClassTag](fname: String, colNames: Array[String], chunkLayout: Array[Long],
data: Array[Array[T]], h2oType: Byte, colDomains: Array[Array[String]] = null): H2OFrame = {
var f: Frame = new Frame(Key.make(fname))
FrameUtils.preparePartialFrame(f,colNames)
f.update()

for( i <- chunkLayout.indices) { buildChunks(fname, data(i), i, Array(h2oType)) }

f = DKV.get(fname).get()

FrameUtils.finalizePartialFrame(f, chunkLayout, colDomains, Array(h2oType))

new H2OFrame(f)
}

def fp(it:Iterator[Row]):Unit = {
println(it.size)
}
@@ -496,21 +496,6 @@ println("--------1-------")
h2oFrameEnum.delete()
}

def makeH2OFrame[T: ClassTag](fname: String, colNames: Array[String], chunkLayout: Array[Long],
data: Array[Array[T]], h2oType: Byte, colDomains: Array[Array[String]] = null): H2OFrame = {
var f: Frame = new Frame(Key.make(fname))
FrameUtils.preparePartialFrame(f,colNames)
f.update()

for( i <- chunkLayout.indices) { buildChunks(fname, data(i), i, Array(h2oType)) }

f = DKV.get(fname).get()

FrameUtils.finalizePartialFrame(f, chunkLayout, colDomains, Array(h2oType))

new H2OFrame(f)
}

def fp(it:Iterator[Row]):Unit = {
println(it.size)
}
@@ -19,10 +19,11 @@ package org.apache.spark.h2o.utils
import java.util.UUID

import org.apache.spark.SparkContext
import org.apache.spark.h2o.{H2OConf, H2OContext, Holder}
import org.apache.spark.h2o._
import org.apache.spark.sql.SQLContext
import org.scalatest.Suite
import water.fvec.{Chunk, FrameUtils, NewChunk, Vec}
import water.fvec._
import water.{DKV, Key}
import water.parser.BufferedString

import scala.reflect.ClassTag
@@ -31,9 +32,10 @@ import scala.reflect.ClassTag
* Helper trait to simplify initialization and termination of Spark/H2O contexts.
*
*/
trait SharedSparkTestContext extends SparkTestContext { self: Suite =>
trait SharedSparkTestContext extends SparkTestContext {
self: Suite =>

def createSparkContext:SparkContext
def createSparkContext: SparkContext

def createH2OContext(sc: SparkContext, conf: H2OConf): H2OContext = {
H2OContext.getOrCreate(sc, conf)
@@ -51,26 +53,51 @@ trait SharedSparkTestContext extends SparkTestContext { self: Suite =>
super.afterAll()
}

def buildChunks[T: ClassTag](fname: String, data: Array[T], cidx: Integer, h2oType: Array[Byte]): Chunk = {
def makeH2OFrame[T: ClassTag](fname: String, colNames: Array[String], chunkLayout: Array[Long],
data: Array[Array[T]], h2oType: Byte, colDomains: Array[Array[String]] = null): H2OFrame = {
makeH2OFrame2(fname, colNames, chunkLayout, data.map(_.map(value => Array(value))), Array(h2oType), colDomains)
}

def makeH2OFrame2[T: ClassTag](fname: String, colNames: Array[String], chunkLayout: Array[Long],
data: Array[Array[Array[T]]], h2oTypes: Array[Byte], colDomains: Array[Array[String]] = null): H2OFrame = {
var f: Frame = new Frame(Key.make(fname))
FrameUtils.preparePartialFrame(f, colNames)
f.update()

for (i <- chunkLayout.indices) {
buildChunks(fname, data(i), i, h2oTypes)
}

f = DKV.get(fname).get()

FrameUtils.finalizePartialFrame(f, chunkLayout, colDomains, h2oTypes)

new H2OFrame(f)
}
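// Illustrative call (hypothetical values): one chunk of two rows over two numeric
// columns, where a null in the test data ends up as an NA in the resulting frame:
//   makeH2OFrame2("fr", Array("a", "b"), Array(2L),
//     Array(Array(Array[Any](1.0, null), Array[Any](2.0, 3.0))),
//     Array(Vec.T_NUM, Vec.T_NUM))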

def buildChunks[T: ClassTag](fname: String, data: Array[Array[T]], cidx: Integer, h2oType: Array[Byte]): Array[_ <: Chunk] = {
val nchunks: Array[NewChunk] = FrameUtils.createNewChunks(fname, h2oType, cidx)

val chunk: NewChunk = nchunks(0)
data.foreach {
case u: UUID => chunk.addUUID(
u.getLeastSignificantBits,
u.getMostSignificantBits)
case s: String => chunk.addStr(new BufferedString(s))
case b: Byte => chunk.addNum(b)
case s: Short => chunk.addNum(s)
case c: Integer if h2oType(0) == Vec.T_CAT => chunk.addCategorical(c)
case i: Integer if h2oType(0) != Vec.T_CAT => chunk.addNum(i.toDouble)
case l: Long => chunk.addNum(l)
case d: Double => chunk.addNum(d)
case x =>
throw new IllegalArgumentException(s"Failed to figure out what is it: $x")
data.foreach { values =>
values.indices.foreach { idx =>
val chunk: NewChunk = nchunks(idx)
values(idx) match {
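// A null test value is written out as a real NA; all other values keep the previous per-type handling.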
case null => chunk.addNA()
case u: UUID => chunk.addUUID(u.getLeastSignificantBits, u.getMostSignificantBits)
case s: String => chunk.addStr(new BufferedString(s))
case b: Byte => chunk.addNum(b)
case s: Short => chunk.addNum(s)
case c: Integer if h2oType(0) == Vec.T_CAT => chunk.addCategorical(c)
case i: Integer if h2oType(0) != Vec.T_CAT => chunk.addNum(i.toDouble)
case l: Long => chunk.addNum(l)
case d: Double => chunk.addNum(d)
case x =>
throw new IllegalArgumentException(s"Failed to figure out what is it: $x")
}
}
}
FrameUtils.closeNewChunks(nchunks)
chunk
nchunks
}
}

23 changes: 23 additions & 0 deletions gradle/sparkTest.gradle
@@ -0,0 +1,23 @@
// Setup test environment for Spark
test {
// Test environment
systemProperty "spark.testing", "true"
systemProperty "spark.ext.h2o.node.log.dir", new File(project.getBuildDir(), "h2ologs-test/nodes")
systemProperty "spark.ext.h2o.client.log.dir", new File(project.getBuildDir(), "h2ologs-test/client")

// Run with assertions ON
enableAssertions = true

// For a new JVM for each test class
forkEvery = 1

// Increase heap size
maxHeapSize = "4g"

// Increase PermGen
jvmArgs '-XX:MaxPermSize=384m'

// Working dir will be root project
workingDir = rootDir
// testLogging.showStandardStreams = true
}
4 changes: 3 additions & 1 deletion ml/build.gradle
@@ -1,3 +1,5 @@
apply from: "$rootDir/gradle/sparkTest.gradle"

description = "Sparkling Water ML Pipelines"

dependencies {
@@ -16,4 +18,4 @@ dependencies {
testCompile "junit:junit:4.11"
testCompile project(':sparkling-water-core').sourceSets.test.output

}
}
9 changes: 8 additions & 1 deletion ml/src/main/scala/hex/schemas/SVMV3.java
@@ -17,6 +17,7 @@

package hex.schemas;

import org.apache.spark.ml.spark.models.MissingValuesHandling;
import org.apache.spark.ml.spark.models.svm.*;
import water.DKV;
import water.Key;
@@ -51,7 +52,8 @@ public static final class SVMParametersV3 extends
"gradient",

"ignored_columns",
"ignore_const_cols"
"ignore_const_cols",
"missing_values_handling"
};

@API(help="Initial model weights.", direction=API.Direction.INOUT, gridable = true)
@@ -82,6 +84,11 @@ public static final class SVMParametersV3 extends
@API(help="Set the gradient computation type for SGD.", direction=API.Direction.INPUT, values = {"Hinge", "LeastSquares", "Logistic"}, required = true, gridable = true, level = API.Level.expert)
public Gradient gradient = Gradient.Hinge;

@API(level = API.Level.expert, direction = API.Direction.INOUT, gridable = true,
values = {"NotAllowed", "Skip", "MeanImputation"},
help = "Handling of missing values. Either NotAllowed, Skip or MeanImputation.")
public MissingValuesHandling missing_values_handling;

@Override
public SVMParametersV3 fillFromImpl(SVMParameters impl) {
super.fillFromImpl(impl);
132 changes: 132 additions & 0 deletions ml/src/main/scala/org/apache/spark/ml/FrameMLUtils.scala
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml

import org.apache.spark.h2o.H2OContext
import org.apache.spark.ml.spark.models.MissingValuesHandling
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField}
import water.fvec.{Frame, H2OFrame}

object FrameMLUtils {
/**
  * Converts an H2O Frame into an RDD[LabeledPoint]. Assumes that the last column is the
  * response column, which is used as the label. All other columns are mapped into a feature
  * vector; categorical columns are mapped to their numerical values.
  *
  * @param frame            Input frame to be converted
  * @param _response_column Column which contains the labels
  * @param nfeatures        Number of features we want to use
  * @param missingHandler   Missing values strategy
  * @param h2oContext       Current H2OContext
  * @param sqlContext       Current SQLContext
  * @return An equivalent RDD[LabeledPoint] and the mean of each column
  */
def toLabeledPoints(frame: Frame,
_response_column: String,
nfeatures: Int,
missingHandler: MissingValuesHandling,
h2oContext: H2OContext,
sqlContext: SQLContext): (RDD[LabeledPoint], Array[Double]) = {
var means: Array[Double] = new Array[Double](nfeatures)
val domains = frame.domains()

val trainingDF = h2oContext.asDataFrame(new H2OFrame(frame))(sqlContext)
val fields: Array[StructField] = trainingDF.schema.fields
var trainingRDD = trainingDF.rdd
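// Skip drops every row that contains an NA; MeanImputation keeps all rows and fills NAs with the per-column means computed below.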

if (MissingValuesHandling.Skip.eq(missingHandler)) {
trainingRDD = trainingRDD.filter(!_.anyNull)
} else if (MissingValuesHandling.MeanImputation.eq(missingHandler)) {
// Computing the means by hand and not using frame.means() as it does not compute the mean for enum columns
means = movingAverage(trainingRDD, fields, domains)
}

(trainingRDD.map(row => {
val features = new Array[Double](nfeatures)
(0 until nfeatures).foreach(i => features(i) = if (row.isNullAt(i)) means(i) else toDouble(row.get(i), fields(i), domains(i)))

new LabeledPoint(
toDouble(row.getAs[String](_response_column), fields(fields.length - 1), domains(domains.length - 1)),
Vectors.dense(features)
)
}), means)
}
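// Example usage (illustrative only; `trainFrame`, `hc` and `sqlCtx` are assumed to exist
// in the calling algorithm, and "label" stands for its response column):
//   val (points, means) = FrameMLUtils.toLabeledPoints(
//     trainFrame, "label", trainFrame.numCols() - 1,
//     MissingValuesHandling.MeanImputation, hc, sqlCtx)
//   // `means` can later be reused to impute NAs at scoring time.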

// Running average so we don't get overflows
private[ml] def movingAverage(trainingRDD: RDD[Row],
fields: Array[StructField],
domains: Array[Array[String]]): Array[Double] = {
val means = new Array[Double](fields.length)
val counts = new Array[Int](means.length)
trainingRDD.aggregate(means.zip(counts))(
// Compute the average within a RDD partition
(agg, row) => {
agg.indices.foreach(i => {
if (!row.isNullAt(i)) {
val value = toDouble(row.get(i), fields(i), domains(i))
val delta = value - agg(i)._1
val n = agg(i)._2 + 1
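// Incremental mean update: mean += (x - mean) / n, so no large running sum is kept.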
agg(i) = (agg(i)._1 + delta / n, n)
}
})
agg
},
// Merge all RDD partition stats
(agg1, agg2) => {
merge(agg1, agg2)
}
).map(_._1)
}

def merge(agg1: Array[(Double, Int)], agg2: Array[(Double, Int)]): Array[(Double, Int)] = {
agg1.indices.foreach { idx =>
if (agg1(idx)._2 == 0) {
agg1(idx) = agg2(idx)
} else {
val otherMu: Double = agg2(idx)._1
val mu: Double = agg1(idx)._1
val n = agg1(idx)._2
val otherN = agg2(idx)._2
val delta = otherMu - mu
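// Pairwise merge of two partial means. When one side has far fewer rows (10x), shift the
// larger-count mean by delta * smallCount / totalCount to limit round-off; otherwise take
// the plain count-weighted average. All three branches compute the same combined mean and
// only differ in numerical behaviour.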

if (otherN * 10 < n) {
agg1(idx) = (mu + (delta * otherN) / (n + otherN), n + otherN)
} else if (n * 10 < otherN) {
agg1(idx) = (otherMu - (delta * n) / (n + otherN), n + otherN)
} else {
agg1(idx) = ((mu * n + otherMu * otherN) / (n + otherN), n + otherN)
}
}
}
agg1
}

private[ml] def toDouble(value: Any, fieldStruct: StructField, domain: Array[String]): Double = {
fieldStruct.dataType match {
case DataTypes.ByteType => value.asInstanceOf[Byte].doubleValue
case DataTypes.ShortType => value.asInstanceOf[Short].doubleValue
case DataTypes.IntegerType => value.asInstanceOf[Integer].doubleValue
case DataTypes.DoubleType => value.asInstanceOf[Double]
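// Categorical (string) values are encoded as their index in the column's domain.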
case DataTypes.StringType => domain.indexOf(value)
case _ => throw new IllegalArgumentException("Target column has to be an enum or a number. " + fieldStruct.toString)
}
}
}
@@ -0,0 +1,5 @@
package org.apache.spark.ml.spark.models;

public enum MissingValuesHandling {
NotAllowed, Skip, MeanImputation
}
