[SYSTEMML-580] Add Scala LogisticRegression API For Spark Pipeline.
This adds a Scala version of the LogisticRegression Spark ML pipeline API, as well as Scala build support for the project, effectively turning it into a mixed Scala/Java project.
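For reference, a minimal usage sketch distilled from the LogisticRegressionExample bundled in this commit; the local SparkContext, the toy data, and the Pipeline wiring (which assumes the estimator satisfies Spark's PipelineStage contract) are illustrative placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.sysml.api.ml.scala.LogisticRegression

object LRPipelineSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "LRPipelineSketch", new SparkConf())
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Toy training data with the "label"/"features" columns that fit() expects
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
      LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)))).toDF

    val lr = new LogisticRegression("log", sc)

    // Direct use, as in the bundled example ...
    val lrmodel = lr.fit(training)
    lrmodel.transform(training).show()

    // ... or as a stage in a standard Spark ML Pipeline
    val pipeline = new Pipeline().setStages(Array[PipelineStage](lr))
    pipeline.fit(training).transform(training).show()
  }
}
```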

Closes apache#70.
Wenpei authored and dusenberrymw committed Mar 16, 2016
1 parent ef22676 commit c5693f5
Showing 7 changed files with 413 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -26,6 +26,7 @@ dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
.metadata
.cache*

# User configuration files
conf/SystemML-config.xml
28 changes: 28 additions & 0 deletions dev/project import.md
@@ -0,0 +1,28 @@
This document gives brief instructions for developing the SystemML project in an IDE.


# Import SystemML Project into Eclipse
Suitable Eclipse IDEs include:
* [Scala IDE](http://scala-ide.org/)
* Eclipse Juno with the Scala plug-in

File -> Import -> Maven -> Existing Maven Projects

Here are several tips for resolving the compiler errors below:
#### `invalid cross-compiled libraries` error
Since Scala IDE bundles the latest versions (2.10.5 and 2.11.6 at this point), you need to add one in Eclipse Preferences -> Scala -> Installations by pointing to the lib/ directory of your Scala 2.10.4 distribution. Once this is done, select all SystemML projects, right-click, choose Scala -> Set Scala Installation, and point to the 2.10.4 installation. This should clear all errors about invalid cross-compiled libraries, and a clean build should now succeed.

#### `incompatible Scala version` error
Change the IDE Scala version under `Project -> Properties -> Scala Compiler -> Scala Installation` to `Fixed Scala Installation: 2.10.5`.

#### `type * not found` error
Run `mvn package`, then refresh the project (`Project -> Refresh`).

#### `marketplace not found` error for Eclipse Luna
In addition to installing the Scala IDE plug-in, make sure to install the Maven connector for Scala from the update site "http://alchim31.free.fr/m2e-scala/update-site".

# Import SystemML Project into IntelliJ

1. Download IntelliJ and install the Scala plug-in for IntelliJ.
2. Go to "File -> Import Project", locate the spark source directory, and select "Maven Project".
3. In the Import wizard, it's fine to leave settings at their default. However it is usually useful to enable "Import Maven projects automatically", since changes to the project structure will automatically update the IntelliJ project.
81 changes: 77 additions & 4 deletions pom.xml
@@ -66,7 +66,8 @@
<hadoop.version>2.4.1</hadoop.version>
<antlr.version>4.3</antlr.version>
<spark.version>1.4.1</spark.version>

<scala.version>2.10.5</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<!-- OS-specific JVM arguments for running integration tests -->
<integrationTestExtraJVMArgs />
</properties>
@@ -186,7 +187,6 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -195,7 +195,33 @@
<target>1.6</target>
</configuration>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>eclipse-add-source</id>
<goals>
<goal>add-source</goal>
</goals>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
@@ -883,7 +909,54 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>2.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<version>1.11.3</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
169 changes: 169 additions & 0 deletions src/main/scala/org/apache/sysml/api/ml/scala/LogisticRegression.scala
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysml.api.ml.scala

import org.apache.sysml.api.{MLContext, MLOutput}
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils}

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.classification._
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, Params}
import org.apache.spark.ml.param.shared._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

trait HasIcpt extends Params {
final val icpt: Param[Int] = new Param[Int](this, "icpt", "Intercept presence, shifting and rescaling X columns")
setDefault(icpt, 0)
final def getIcpt: Int = $(icpt)
}
trait HasMaxOuterIter extends Params {
final val maxOuterIter: Param[Int] = new Param[Int](this, "maxOuterIter", "max. number of outer (Newton) iterations")
setDefault(maxOuterIter, 100)
final def getMaxOuterIter: Int = $(maxOuterIter)
}
trait HasMaxInnerIter extends Params {
final val maxInnerIter: Param[Int] = new Param[Int](this, "maxInnerIter", "max. number of inner (conjugate gradient) iterations, 0 = no max")
setDefault(maxInnerIter, 0)
final def getMaxInnerIter: Int = $(maxInnerIter)
}
trait HasTol extends Params {
final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms")
setDefault(tol, 0.000001)
final def getTol: Double = $(tol)
}
trait HasRegParam extends Params {
final val regParam: DoubleParam = new DoubleParam(this, "regParam", "the regularization parameter")
setDefault(regParam, 0.000001)
final def getRegParam: Double = $(regParam)
}
object LogisticRegression {
final val scriptPath = "MultiLogReg.dml"
}
class LogisticRegression(override val uid: String, val sc: SparkContext) extends Estimator[LogisticRegressionModel] with HasIcpt
with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter {

def setIcpt(value: Int) = set(icpt, value)
def setMaxOuterIter(value: Int) = set(maxOuterIter, value)
def setMaxInnerIter(value: Int) = set(maxInnerIter, value)
def setRegParam(value: Double) = set(regParam, value)
def setTol(value: Double) = set(tol, value)

override def copy(extra: ParamMap): LogisticRegression = defaultCopy(extra)
override def transformSchema(schema: StructType): StructType = schema
override def fit(df: DataFrame): LogisticRegressionModel = {
val ml = new MLContext(df.rdd.sparkContext)
val mcXin = new MatrixCharacteristics()
// Convert the "features" column to SystemML binary-block matrix format
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, df, mcXin, false, "features")
// Pass the labels through as CSV-format strings
val yin = df.select("label").rdd.map { _.apply(0).toString() }

val mloutput = {
// Map estimator params to MultiLogReg.dml script arguments
val paramsMap: Map[String, String] = Map(
"icpt" -> this.getIcpt.toString,
"reg" -> this.getRegParam.toString,
"tol" -> this.getTol.toString,
"moi" -> this.getMaxOuterIter.toString,
"mii" -> this.getMaxInnerIter.toString,

"X" -> " ",
"Y" -> " ",
"B" -> " "
)
ml.registerInput("X", Xin, mcXin);
ml.registerInput("Y_vec", yin, "csv");
ml.registerOutput("B_out");
ml.execute(ScriptsUtils.resolvePath(LogisticRegression.scriptPath),paramsMap)
}
new LogisticRegressionModel("logisticRegression")(mloutput)
}
}
object LogisticRegressionModel {
final val scriptPath = "GLM-predict.dml"
}
class LogisticRegressionModel(
override val uid: String)(
val mloutput: MLOutput) extends Model[LogisticRegressionModel] with HasIcpt
with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter {
override def copy(extra: ParamMap): LogisticRegressionModel = defaultCopy(extra)
override def transformSchema(schema: StructType): StructType = schema
override def transform(df: DataFrame): DataFrame = {
val ml = new MLContext(df.rdd.sparkContext)

val mcXin = new MatrixCharacteristics()
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(df.rdd.sparkContext, df, mcXin, false, "features")
val yin = df.select("label").rdd.map { _.apply(0).toString() }

val mlscoreoutput = {
val paramsMap: Map[String, String] = Map(
"dfam" -> "3", // dfam=3 selects multinomial logit in GLM-predict.dml
"X" -> " ",
"B" -> " "
)
ml.registerInput("X", Xin, mcXin);
ml.registerInput("B_full", mloutput.getBinaryBlockedRDD("B_out"),mloutput.getMatrixCharacteristics("B_out"));
ml.registerInput("Y", yin,"csv")
ml.registerOutput("means");
ml.execute(ScriptsUtils.resolvePath(LogisticRegressionModel.scriptPath),paramsMap)
}

mlscoreoutput.getDF(df.sqlContext, "means", true).withColumnRenamed("C1", "probability")
}
}

object LogisticRegressionExample {
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.types._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

def main(args: Array[String]) = {
val sparkConf: SparkConf = new SparkConf()
val sc: SparkContext = new SparkContext("local", "TestLocal", sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
val training = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)),
LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)),
LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3))))
val lr = new LogisticRegression("log", sc)
val lrmodel = lr.fit(training.toDF)
lrmodel.mloutput.getDF(sqlContext, "B_out").show()

val testing = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)),
LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)),
LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3))))

lrmodel.transform(testing.toDF).show
}
}
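As a supplement to the example above, a hedged sketch of tuning the estimator through the shared Param traits defined in this file; the values are arbitrary illustrations, and `sc` is assumed to be an existing SparkContext. Each setter feeds the corresponding MultiLogReg.dml argument in fit():

```scala
// Illustrative values only, not recommendations; sc is an existing SparkContext
val lr = new LogisticRegression("log", sc)
  .setIcpt(1)           // intercept / rescaling mode        -> "icpt"
  .setRegParam(0.01)    // regularization strength           -> "reg"
  .setTol(1e-6)         // convergence tolerance             -> "tol"
  .setMaxOuterIter(50)  // max Newton iterations             -> "moi"
  .setMaxInnerIter(0)   // max CG iterations (0 = no max)    -> "mii"
```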
33 changes: 33 additions & 0 deletions src/main/scala/org/apache/sysml/api/ml/scala/ScriptsUtils.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysml.api.ml.scala

import java.io.File

object ScriptsUtils {
var systemmlHome = System.getenv("SYSTEMML_HOME")
def resolvePath(filename: String): String = {
systemmlHome + File.separator + "algorithms" + File.separator + filename
}
def setSystemmlHome(path: String): Unit = {
systemmlHome = path
}
}
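Since resolvePath simply prepends systemmlHome, the DML scripts are expected under `$SYSTEMML_HOME/algorithms`. A minimal sketch of pointing the resolver at a SystemML distribution; the install path is a placeholder:

```scala
import org.apache.sysml.api.ml.scala.ScriptsUtils

// Placeholder install location; overrides the SYSTEMML_HOME environment variable
ScriptsUtils.setSystemmlHome("/opt/systemml")

// Prints: /opt/systemml/algorithms/MultiLogReg.dml
println(ScriptsUtils.resolvePath("MultiLogReg.dml"))
```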
