#Spark Demo

Start the kernel

In [0]:
nitro.spark.create_kernel("http://169.53.153.8:9000/")

Load data from the file

In [1]:
val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")

Count elements

In [2]:
iris.count()

150

Create a random sample Array (no replacement, 10 elements)

In [3]:
iris.takeSample(withReplacement = false, num = 10)

Array(5.0,3.4,1.6,0.4,setosa, 5.1,3.3,1.7,0.5,setosa, 5.7,2.6,3.5,1.0,versicolor, 5.3,3.7,1.5,0.2,setosa, 6.0,2.9,4.5,1.5,versicolor, 5.5,2.5,4.0,1.3,versicolor, 6.5,3.0,5.5,1.8,virginica, 5.4,3.7,1.5,0.2,setosa, 6.3,3.3,4.7,1.6,versicolor, 6.4,3.1,5.5,1.8,virginica)

Read the first line of the RDD

In [5]:
iris.first()

5.1,3.5,1.4,0.2,setosa

Filter lines that contain a specified word.

In [6]:
val versicolorLines = iris.filter(line => line.contains("versicolor"))
versicolorLines.first()

7.0,3.2,4.7,1.4,versicolor

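Split each line into words and count the distinct words

In [7]:
// Split each line of iris on commas and flatten the result into one RDD of words
val words = iris.flatMap(line => line.split(","))
println(words.first()) // the first line is 5.1,3.5,1.4,0.2,setosa, so the first word is 5.1
// Pair each word with 1 and sum the 1s per word to get (word, occurrences) pairs
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
println(counts.count()) // number of distinct words, not the total number of words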

5.1
77
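
The same flatMap, pair, and sum-per-key steps can be tried on a local collection without a cluster. This is a minimal plain-Scala sketch, not Spark code: the names `sampleLines`, `sampleWords`, and `sampleCounts` are made up for illustration, and the three lines are copied from outputs shown earlier in this notebook.

```scala
// Three lines copied from earlier cell outputs, used as a tiny local stand-in for the RDD
val sampleLines = Seq(
  "5.1,3.5,1.4,0.2,setosa",
  "7.0,3.2,4.7,1.4,versicolor",
  "5.3,3.7,1.5,0.2,setosa"
)

// flatMap: one flat collection of words instead of one array per line
val sampleWords = sampleLines.flatMap(line => line.split(","))

// Pair each word with 1, group by word, and sum the 1s (a local analogue of reduceByKey)
val sampleCounts: Map[String, Int] =
  sampleWords.map(word => (word, 1))
    .groupBy { case (word, _) => word }
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
```

Here `sampleWords.size` is the total number of words, while `sampleCounts.size` is the number of distinct words, which is the quantity that `counts.count()` reports.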


Get only unique members of the RDD

In [11]:
val dist = iris.distinct()
dist.count()

147

#MLlib
Machine learning with Spark

Compute Summary Statistics

In [12]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

//load and parse data to vectors
val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")
val data_iris = iris.map(l => l.split(",",-1) )
val parsedData = data_iris.map(r => Vectors.dense(Array( r(0).toDouble,
r(1).toDouble, r(2).toDouble, r(3).toDouble))).cache()

// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(parsedData)

println("Mean: " + summary.mean) // a dense vector containing the mean value for each column
println("Variance: "+ summary.variance) // column-wise variance
println("Min: "+ summary.min) // minimum value of each column
println("Max: "+ summary.max) // maximum value of each column
println("L1norm: "+ summary.normL1) // L1 norm of each column
println("L2norm: "+ summary.normL2) // Euclidean magnitude of each column

Mean: [5.843333333333332,3.0540000000000003,3.7586666666666666,1.1986666666666668]
Variance: [0.6856935123042509,0.18800402684563744,3.113179418344516,0.5824143176733783]
Min: [4.3,2.0,1.0,0.1]
Max: [7.9,4.4,6.9,2.5]
L1norm: [876.4999999999998,458.1000000000001,563.8000000000002,179.79999999999995]
L2norm: [72.27620631992245,37.77631533117014,50.82322303829225,17.38677658451963]
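
For reference, the quantities printed above can be written out for a single column in plain Scala. This is a minimal sketch rather than MLlib's implementation: the helper names and the values in `columnValues` are made up, and the variance is assumed to be the unbiased form that divides by n - 1.

```scala
// Arithmetic mean of a column
def mean(xs: Seq[Double]): Double = xs.sum / xs.size

// Unbiased sample variance (divides by n - 1; an assumption of this sketch)
def sampleVariance(xs: Seq[Double]): Double = {
  val m = mean(xs)
  xs.map(x => (x - m) * (x - m)).sum / (xs.size - 1)
}

// L1 norm: sum of absolute values
def normL1(xs: Seq[Double]): Double = xs.map(x => math.abs(x)).sum

// L2 norm: square root of the sum of squares (Euclidean magnitude)
def normL2(xs: Seq[Double]): Double = math.sqrt(xs.map(x => x * x).sum)

// Made-up column, chosen so the arithmetic is easy to check by hand
val columnValues = Seq(1.0, 2.0, 3.0, 4.0)
val columnMean = mean(columnValues)
val columnVariance = sampleVariance(columnValues)
val columnL1 = normL1(columnValues)
val columnL2 = normL2(columnValues)
```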


##Train a NaiveBayes Classifier

MLlib supports multinomial naive Bayes, which is typically used for document classification. Within that context, each observation is a document and each feature represents a term whose value is the frequency of the term. Feature values must be nonnegative to represent term frequencies.  

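Parse the data into LabeledPoint objects. Note that the label is taken from the first column (sepal length), not from the species column, so the classifier below treats each distinct sepal length as a class, which helps explain the low accuracy it reports.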

In [13]:
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")
val parsedData = iris.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.dropRight(1).map(_.toDouble)))
}
parsedData.take(2).foreach(println)


(5.1,[5.1,3.5,1.4,0.2])
(4.9,[4.9,3.0,1.4,0.2])
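
The cell above takes the label from `parts(0)`, which is the sepal length. A minimal plain-Scala sketch of taking the label from the species column instead is given here. The mapping `speciesToLabel` and the helper `parseLabeled` are hypothetical names introduced for illustration, and the numeric codes 0.0, 1.0, and 2.0 are an arbitrary choice.

```scala
// Hypothetical species-to-label mapping; the species names are the ones that
// appear in the sample output earlier in this notebook
val speciesToLabel: Map[String, Double] =
  Map("setosa" -> 0.0, "versicolor" -> 1.0, "virginica" -> 2.0)

// Parse one CSV line into (label, features): the label comes from the last
// column, the features from every column before it
def parseLabeled(line: String): (Double, Array[Double]) = {
  val parts = line.split(',')
  (speciesToLabel(parts.last), parts.dropRight(1).map(_.toDouble))
}

// A line copied from an earlier cell output
val parsedExample = parseLabeled("7.0,3.2,4.7,1.4,versicolor")
```

Inside `iris.map`, the resulting pair could be wrapped as `LabeledPoint(label, Vectors.dense(features))` so that the rest of the notebook stays the same.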


NaiveBayes implements multinomial naive Bayes. It takes an RDD of LabeledPoint and an optional smoothing parameter lambda as input, and outputs a NaiveBayesModel, which can be used for evaluation and prediction.

In [14]:
// Split data into training (40%) and test (60%).
val splits = parsedData.randomSplit(Array(0.4, 0.6), seed = 11L)
val training = splits(0)
val test = splits(1)

val model = NaiveBayes.train(training, lambda = 1.0)

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
// Accuracy: the fraction of test points whose predicted label equals the actual label
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
println(accuracy)

0.06382978723404255


##K-means Clustering

In [42]:
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

//load and parse data to vectors
val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")
val data_iris = iris.map(l => l.split(",",-1) )
val parsedData = data_iris.map(r => Vectors.dense(Array( r(0).toDouble, 
r(1).toDouble, r(2).toDouble, r(3).toDouble))).cache()
// Cluster the data into three classes using KMeans.
val numClusters = 3
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Compute the sum of squared errors.
val cost = clusters.computeCost(parsedData)
println("Sum of squared errors = " + cost)

Sum of squared errors = 78.94084142614622
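
The cost reported above is the sum, over all points, of the squared distance from each point to its nearest cluster center. A minimal plain-Scala sketch of that definition follows. The helper names and the toy points and centers are made up for illustration, and this is not how MLlib computes the value internally.

```scala
// Squared Euclidean distance between two points of equal dimension
def squaredDistance(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

// Cost: for every point, the squared distance to its closest center, summed over all points
def kMeansCost(points: Seq[Array[Double]], centers: Seq[Array[Double]]): Double =
  points.map(p => centers.map(c => squaredDistance(p, c)).min).sum

// Made-up 2-D points and centers, chosen only so the arithmetic is easy to follow
val toyPoints = Seq(Array(0.0, 0.0), Array(0.0, 2.0), Array(10.0, 0.0), Array(10.0, 4.0))
val toyCenters = Seq(Array(0.0, 1.0), Array(10.0, 2.0))
val toyCost = kMeansCost(toyPoints, toyCenters)
```

In this toy example the two points near the first center each contribute 1 and the two points near the second center each contribute 4, so `toyCost` is 10.0.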


##PCA

In [48]:
import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")
//parse csv to vectors
val data_iris = iris.map(l => l.split(",",-1) )
val parsedData = data_iris.map(r => Vectors.dense(Array( r(0).toDouble, 
r(1).toDouble, r(2).toDouble, r(3).toDouble))).cache()
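// computePrincipalComponents is defined on RowMatrix, so wrap the RDD of vectors in one
val mat: RowMatrix = new RowMatrix(parsedData)
// Compute the top 4 principal components (all of them, since the data has 4 columns).
val pc: Matrix = mat.computePrincipalComponents(4)
// The principal components are returned as a local dense matrix, one component per column.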
println(pc)

-0.36158967738144887  -0.6565398832858419  0.5809972798275919    0.3172545471685654    
0.08226888989221673   -0.729712371326486   -0.5964180879380797   -0.324094352418032    
-0.8565721052905274   0.17576740342866543  -0.07252407548692685  -0.47971898732994167  
-0.35884392624821654  0.07470647013501308  -0.5490609107266611   0.7511205603807821    


In [76]:
// Compute the covariance matrix, treating each row as an observation.
val mat: RowMatrix = new RowMatrix(parsedData)
val cov: Matrix = mat.computeCovariance()
println(cov)

0.6856935123042476     -0.039268456375836536  1.2736823266219162   0.5169038031319939    
-0.039268456375836536  0.18800402684563267    -0.3217127516778646  -0.11798120805369283  
1.2736823266219197     -0.3217127516778646    3.113179418344508    1.2963874720357937    
0.5169038031319948     -0.11798120805369283   1.2963874720357937   0.5824143176733789    
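
Each entry of the matrix above is the covariance between two columns, and each diagonal entry is the variance of one column (the first diagonal entry agrees with the first variance printed in the summary statistics cell). A minimal plain-Scala sketch of a single entry follows, assuming the n - 1 denominator. The helper `sampleCovariance` and the two toy columns are made up for illustration.

```scala
// Sample covariance of two equally long columns (divides by n - 1; an assumption of this sketch)
def sampleCovariance(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.size == ys.size && xs.size > 1, "need two columns of equal length with at least 2 rows")
  val meanX = xs.sum / xs.size
  val meanY = ys.sum / ys.size
  xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum / (xs.size - 1)
}

// Made-up columns: colB is exactly twice colA
val colA = Seq(1.0, 2.0, 3.0)
val colB = Seq(2.0, 4.0, 6.0)
val covAB = sampleCovariance(colA, colB) // off-diagonal entry
val covAA = sampleCovariance(colA, colA) // diagonal entry, equal to the variance of colA
```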


##SVD

In [86]:
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")
//parse csv to vectors
val data_iris = iris.map(l => l.split(",",-1) )
val parsedData = data_iris.map(r => Vectors.dense(Array( r(0).toDouble, 
r(1).toDouble, r(2).toDouble, r(3).toDouble))).cache()
// computeSVD is defined on RowMatrix, so wrap the RDD of vectors in one
val mat: RowMatrix = new RowMatrix(parsedData)

// Compute the top 4 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(4, computeU = true)
val U: RowMatrix = svd.U  // The U factor is a RowMatrix.
val V: Matrix = svd.V   //   The V factor is a local dense matrix.
println(V)

-0.7511680505936613   -0.2858309594911222  0.49942378035010954  0.3234549582005334    
-0.3797883666928143   -0.5448897554611347  -0.6750249882865683  -0.3212432351148067   
-0.5131509372098667   0.7088987448097407   -0.0547198252265472  -0.48077481836613056  
-0.16787933742053857  0.3447584467378048   -0.5402988927775985  0.7490228620899887    
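
In a singular value decomposition the columns of V are orthonormal: each has unit length and distinct columns have a dot product of zero. A small plain-Scala check on the values printed above is sketched here. The names `v1`, `v2`, and `dot` are made up for illustration, and the numbers are copied from the first and second columns of the printed matrix.

```scala
// First and second columns of V, copied from the printed matrix
val v1 = Seq(-0.7511680505936613, -0.3797883666928143, -0.5131509372098667, -0.16787933742053857)
val v2 = Seq(-0.2858309594911222, -0.5448897554611347, 0.7088987448097407, 0.3447584467378048)

// Dot product of two equally long vectors
def dot(a: Seq[Double], b: Seq[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

val v1NormSquared = dot(v1, v1) // expected to be close to 1
val v1DotV2 = dot(v1, v2)       // expected to be close to 0
```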


##Linear Regression Least Squares with SGD

In [88]:
//build a simple linear model to predict label values
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

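// Load and parse the data. The label is the first column (sepal length), and the
// features are all four numeric columns, so the label also appears among the features.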
val iris = sc.textFile("hdfs://10.122.48.12:8020/data/iris.csv")
val parsedData = iris.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.dropRight(1).map(_.toDouble)))
}.cache()

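// Build the model. This overload of train uses the default step size of 1.0, which is
// likely too large for these unscaled features and would explain the huge error printed below.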
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
println("Training Mean Squared Error = " + MSE)

Training Mean Squared Error = 2.9717406458599726E256
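
A training error on the order of 10^256 suggests that the gradient updates diverged rather than converged. A minimal plain-Scala sketch of how the step size can cause this on a one-feature least-squares problem follows. It is not MLlib's SGD: the helper `fitWeight` and the data are made up, the full gradient is used instead of a sampled one, and the step size is held constant, which is a simplification.

```scala
// Gradient descent for fitting y ≈ w * x with a constant step size
def fitWeight(xs: Seq[Double], ys: Seq[Double], stepSize: Double, iterations: Int): Double = {
  var w = 0.0
  for (_ <- 1 to iterations) {
    // Gradient of the mean squared error (up to a factor of 2) with respect to w
    val gradient = xs.zip(ys).map { case (x, y) => (w * x - y) * x }.sum / xs.size
    w -= stepSize * gradient
  }
  w
}

// Made-up data with y = x exactly, so the best weight is 1.0
val gdXs = Seq(4.0, 5.0, 6.0)
val gdYs = Seq(4.0, 5.0, 6.0)

// With a step of 1.0 every update overshoots and the weight grows without bound
val wLargeStep = fitWeight(gdXs, gdYs, stepSize = 1.0, iterations = 20)
// With a step of 0.01 the weight settles near 1.0
val wSmallStep = fitWeight(gdXs, gdYs, stepSize = 0.01, iterations = 100)
```

In the notebook, `LinearRegressionWithSGD.train` also has an overload that accepts a step size as a third argument. A suitable value for this data would need to be found by experiment.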
