Merge branch 'master' of github.com:apache/spark into spark-home-reprise
andrewor14 committed Aug 2, 2014
2 parents 188fc5d + e25ec06 commit 1c2532c
Showing 15 changed files with 706 additions and 131 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -55,3 +55,4 @@ dist/*
.*ipr
.*iws
logs
.*scalastyle-output.xml
@@ -64,10 +64,16 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

// Attempt to connect, restart and retry once if it fails
try {
new Socket(daemonHost, daemonPort)
val socket = new Socket(daemonHost, daemonPort)
val launchStatus = new DataInputStream(socket.getInputStream).readInt()
if (launchStatus != 0) {
throw new IllegalStateException("Python daemon failed to launch worker")
}
socket
} catch {
case exc: SocketException =>
logWarning("Python daemon unexpectedly quit, attempting to restart")
logWarning("Failed to open socket to Python daemon:", exc)
logWarning("Assuming that daemon unexpectedly quit, attempting to restart")
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Train a linear regression model on one stream of data and make predictions
* on another stream, where the data streams arrive as text files
* into two different directories.
*
* The rows of the text files must be labeled data points in the form
* `(y,[x1,x2,x3,...,xn])`
* Where n is the number of features. n must be the same for train and test.
*
* Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>
*
* To run on your local machine using the two directories `trainingDir` and `testDir`,
* with updates every 5 seconds, and 2 features per data point, call:
* $ bin/run-example \
* org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2
*
* As you add text files to `trainingDir` the model will continuously update.
* Anytime you add text files to `testDir`, you'll see predictions from the current model.
*
*/
object StreamingLinearRegression {

def main(args: Array[String]) {

if (args.length != 4) {
System.err.println(
"Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
System.exit(1)
}

val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))

val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))

model.trainOn(trainingData)
model.predictOn(testData).print()

ssc.start()
ssc.awaitTermination()

}

}
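
As a quick illustration of the input format described in the scaladoc above, with numFeatures set to 2 each row of a text file dropped into trainingDir or testDir is a labeled point like the following (the numeric values are made up):

(1.0,[0.5,2.0])
(-0.2,[3.1,0.7])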
5 changes: 5 additions & 0 deletions mllib/pom.xml
@@ -40,6 +40,11 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@@ -23,6 +23,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
import org.apache.spark.mllib.recommendation._
@@ -252,15 +254,27 @@ class PythonMLLibAPI extends Serializable {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
initialWeightsBA: Array[Byte],
regParam: Double,
regType: String,
intercept: Boolean): java.util.List[java.lang.Object] = {
val lrAlg = new LinearRegressionWithSGD()
lrAlg.setIntercept(intercept)
lrAlg.optimizer
.setNumIterations(numIterations)
.setRegParam(regParam)
.setStepSize(stepSize)
if (regType == "l2") {
lrAlg.optimizer.setUpdater(new SquaredL2Updater)
} else if (regType == "l1") {
lrAlg.optimizer.setUpdater(new L1Updater)
} else if (regType != "none") {
throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter."
+ " Can only be initialized using the following string values: [l1, l2, none].")
}
trainRegressionModel(
(data, initialWeights) =>
LinearRegressionWithSGD.train(
data,
numIterations,
stepSize,
miniBatchFraction,
initialWeights),
lrAlg.run(data, initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
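
For reference, here is a rough plain-Scala restatement of what the updated trainLinearRegressionModelWithSGD now does once the training data and initial weights are deserialized. It is a sketch only: the helper name, the concrete iteration count and step size, and the all-zero initial weights are illustrative, not part of the commit.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD

// Configure LinearRegressionWithSGD the way the Python wrapper now does,
// then train from an all-zero starting point.
def buildAndTrain(
    data: RDD[LabeledPoint],
    numFeatures: Int,
    regType: String,      // "l1", "l2", or "none"
    regParam: Double,
    intercept: Boolean) = {
  val lrAlg = new LinearRegressionWithSGD()
  lrAlg.setIntercept(intercept)
  lrAlg.optimizer
    .setNumIterations(100)   // illustrative value
    .setRegParam(regParam)
    .setStepSize(1.0)        // illustrative value
  regType match {
    case "l1"   => lrAlg.optimizer.setUpdater(new L1Updater)
    case "l2"   => lrAlg.optimizer.setUpdater(new SquaredL2Updater)
    case "none" => // keep the optimizer's default updater
    case other  => throw new IllegalArgumentException(
      s"Invalid value for 'regType' parameter: $other. Can only be l1, l2, or none.")
  }
  lrAlg.run(data, Vectors.dense(Array.fill(numFeatures)(0.0)))
}

The run overload that takes explicit initial weights is what lets the Python side pass the user-supplied starting point through unchanged.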
@@ -162,6 +162,14 @@ object GradientDescent extends Logging {
val numExamples = data.count()
val miniBatchSize = numExamples * miniBatchFraction

// if no data, return initial weights to avoid NaNs
if (numExamples == 0) {

logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
return (initialWeights, stochasticLossHistory.toArray)

}

// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)
val n = weights.size
@@ -202,5 +210,6 @@
stochasticLossHistory.takeRight(10).mkString(", ")))

(weights, stochasticLossHistory.toArray)

}
}
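
A minimal sketch of the failure mode the new early return guards against, assuming the aggregated gradient over an empty mini-batch is the zero vector:

// With an empty input RDD, numExamples is 0, so miniBatchSize is 0.0 as well.
val numExamples = 0L
val miniBatchSize = numExamples * 1.0
// Averaging the zero gradient sum over a zero-sized batch divides 0.0 by 0.0,
// which is Double.NaN, and NaN would then poison every subsequent weight update.
val averagedGradientComponent = 0.0 / miniBatchSize   // NaN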
126 changes: 122 additions & 4 deletions mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -17,15 +17,16 @@

package org.apache.spark.mllib.recommendation

import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.math.{abs, sqrt}
import scala.util.Random
import scala.util.Sorting
import scala.util.hashing.byteswap32

import org.jblas.{DoubleMatrix, SimpleBlas, Solve}

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{Logging, HashPartitioner, Partitioner}
import org.apache.spark.storage.StorageLevel
@@ -39,7 +40,8 @@ import org.apache.spark.mllib.optimization.NNLS
* of the elements within this block, and the list of destination blocks that each user or
* product will need to send its feature vector to.
*/
private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[BitSet])
private[recommendation]
case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[mutable.BitSet])


/**
@@ -382,7 +384,7 @@ class ALS private (
val userIds = ratings.map(_.user).distinct.sorted
val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
val shouldSend = Array.fill(numUsers)(new BitSet(numProductBlocks))
val shouldSend = Array.fill(numUsers)(new mutable.BitSet(numProductBlocks))
for (r <- ratings) {
shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true
}
@@ -797,4 +799,120 @@ object ALS {
: MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
}

/**
* :: DeveloperApi ::
* Statistics of a block in ALS computation.
*
* @param category type of this block, "user" or "product"
* @param index index of this block
* @param count number of users or products inside this block, the same as the number of
* least-squares problems to solve on this block in each iteration
* @param numRatings total number of ratings inside this block, the same as the number of outer
* products we need to make on this block in each iteration
* @param numInLinks total number of incoming links, the same as the number of vectors to retrieve
* before each iteration
* @param numOutLinks total number of outgoing links, the same as the number of vectors to send
* for the next iteration
*/
@DeveloperApi
case class BlockStats(
category: String,
index: Int,
count: Long,
numRatings: Long,
numInLinks: Long,
numOutLinks: Long)

/**
* :: DeveloperApi ::
* Given an RDD of ratings, number of user blocks, and number of product blocks, computes the
* statistics of each block in ALS computation. This is useful for estimating cost and diagnosing
* load balance.
*
* @param ratings an RDD of ratings
* @param numUserBlocks number of user blocks
* @param numProductBlocks number of product blocks
* @return statistics of user blocks and product blocks
*/
@DeveloperApi
def analyzeBlocks(
ratings: RDD[Rating],
numUserBlocks: Int,
numProductBlocks: Int): Array[BlockStats] = {

val userPartitioner = new ALSPartitioner(numUserBlocks)
val productPartitioner = new ALSPartitioner(numProductBlocks)

val ratingsByUserBlock = ratings.map { rating =>
(userPartitioner.getPartition(rating.user), rating)
}
val ratingsByProductBlock = ratings.map { rating =>
(productPartitioner.getPartition(rating.product),
Rating(rating.product, rating.user, rating.rating))
}

val als = new ALS()
val (userIn, userOut) =
als.makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, userPartitioner)
val (prodIn, prodOut) =
als.makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, productPartitioner)

def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Long] = {
outLinks.map { x =>
val grid = new mutable.HashMap[(Int, Int), Long]()
val uPartition = x._1
x._2.shouldSend.foreach { ss =>
ss.foreach { pPartition =>
val pair = (uPartition, pPartition)
grid.put(pair, grid.getOrElse(pair, 0L) + 1L)
}
}
grid
}.reduce { (grid1, grid2) =>
grid2.foreach { x =>
grid1.put(x._1, grid1.getOrElse(x._1, 0L) + x._2)
}
grid1
}.toMap
}

val userSendGrid = sendGrid(userOut)
val prodSendGrid = sendGrid(prodOut)

val userInbound = new Array[Long](numUserBlocks)
val prodInbound = new Array[Long](numProductBlocks)
val userOutbound = new Array[Long](numUserBlocks)
val prodOutbound = new Array[Long](numProductBlocks)

for (u <- 0 until numUserBlocks; p <- 0 until numProductBlocks) {
userOutbound(u) += userSendGrid.getOrElse((u, p), 0L)
prodInbound(p) += userSendGrid.getOrElse((u, p), 0L)
userInbound(u) += prodSendGrid.getOrElse((p, u), 0L)
prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0L)
}

val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap()
val prodCounts = prodOut.mapValues(x => x.elementIds.length).collectAsMap()

val userRatings = countRatings(userIn)
val prodRatings = countRatings(prodIn)

val userStats = Array.tabulate(numUserBlocks)(
u => BlockStats("user", u, userCounts(u), userRatings(u), userInbound(u), userOutbound(u)))
val productStatus = Array.tabulate(numProductBlocks)(
p => BlockStats("product", p, prodCounts(p), prodRatings(p), prodInbound(p), prodOutbound(p)))

(userStats ++ productStatus).toArray
}

private def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Long] = {
inLinks.mapValues { ilb =>
var numRatings = 0L
ilb.ratingsForBlock.foreach { ar =>
ar.foreach { p => numRatings += p._1.length }
}
numRatings
}.collectAsMap().toMap
}
}
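
A hedged usage sketch for the new analyzeBlocks developer API. The SparkContext setup, the ratings file path, its comma-separated format, and the 4 x 4 blocking are all assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val sc = new SparkContext(new SparkConf().setAppName("ALSBlockStats").setMaster("local"))
// Parse "user,product,rating" lines into Rating objects.
val ratings = sc.textFile("data/ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(',')
  Rating(user.toInt, product.toInt, rating.toDouble)
}
// Print per-block counts, rating totals, and in/out link totals for a 4 x 4 blocking,
// which helps spot load imbalance before running the full factorization.
ALS.analyzeBlocks(ratings, numUserBlocks = 4, numProductBlocks = 4).foreach(println)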
@@ -49,7 +49,7 @@ class LinearRegressionModel (
* its corresponding right hand side label y.
* See also the documentation for the precise formulation.
*/
class LinearRegressionWithSGD private (
class LinearRegressionWithSGD private[mllib] (
private var stepSize: Double,
private var numIterations: Int,
private var miniBatchFraction: Double)
@@ -68,7 +68,7 @@ class LinearRegressionWithSGD private (
*/
def this() = this(1.0, 100, 1.0)

override protected def createModel(weights: Vector, intercept: Double) = {
override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
new LinearRegressionModel(weights, intercept)
}
}