<a href="https://cocl.us/Data_Science_with_Scalla_top"><img src = "https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/SC0103EN/adds/Data_Science_with_Scalla_notebook_top.png" width = 750, align = "center"></a>
 <br/>
<a><img src="https://ibm.box.com/shared/static/ugcqz6ohbvff804xp84y4kqnvvk3bq1g.png" width="200" align="center"></a>

# Basic Statistics and Data Types - Sampling 
 

## Lesson Objectives

After completing this lesson, you should be able to:

- Perform standard sampling on any RDD
- Split any RDD randomly into subsets
- Perform stratified sampling on RDDs of key-value pairs


## Sampling 

- Can be performed on any RDD
- Returns a sampled subset of the RDD
- Can sample with or without replacement
- The meaning of `fraction` depends on the mode:
  - without replacement: the expected size of the sample as a fraction of the RDD's size
  - with replacement: the expected number of times each element is chosen
- Can be used in bootstrapping procedures
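
The two meanings of `fraction` can be illustrated with a minimal plain-Scala sketch that needs no Spark. It is not Spark's implementation, and the helper names `bernoulliSample`, `poissonDraw`, and `poissonSample` are hypothetical. It assumes the usual model: without replacement, each element is kept independently with probability `fraction`; with replacement, each element is repeated a Poisson-distributed number of times with mean `fraction`.

```scala
import scala.util.Random

// Without replacement: keep each element independently with probability `fraction`.
def bernoulliSample[T](items: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  items.filter(_ => rng.nextDouble() < fraction)
}

// Draw from a Poisson distribution (Knuth's method, adequate for small means).
def poissonDraw(mean: Double, rng: Random): Int = {
  val limit = math.exp(-mean)
  var k = 0
  var p = rng.nextDouble()
  while (p > limit) {
    k += 1
    p *= rng.nextDouble()
  }
  k
}

// With replacement: repeat each element a Poisson(fraction) number of times.
def poissonSample[T](items: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  items.flatMap(item => Seq.fill(poissonDraw(fraction, rng))(item))
}

val population = 1 to 100000
val withoutRepl = bernoulliSample(population, fraction = 0.5, seed = 10L)
val withRepl = poissonSample(population, fraction = 2.0, seed = 10L)
```

Both sizes are only expected values: `withoutRepl.size` lands near 50,000 and `withRepl.size` near 200,000 rather than exactly on them. This is also why a `fraction` of 0.5 on a very small RDD can return anything from zero elements to all of them.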

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.0` // Or use any other 2.x version here
import $ivy.`org.apache.spark::spark-mllib:2.4.0` // Or use any other 2.x version here
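import org.apache.spark.SparkContext
import org.apache.log4j.{Level, Logger}
// Silence Spark's verbose logging so the cell output stays readable
Logger.getLogger("org").setLevel(Level.OFF)
// Local context that uses all available cores
val sc = new SparkContext("local[*]", "Sampling")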


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

sc: SparkContext = org.apache.spark.SparkContext@48a88fda

In [17]:
// A Simple Sampling 

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val elements: RDD[Vector] = sc.parallelize(Array(
    Vectors.dense(4.0,7.0,13.0),
    Vectors.dense(-2.0,8.0,4.0),
    Vectors.dense(3.0,-11.0,19.0)))

elements.sample(withReplacement = false, fraction = 0.5, seed = 10L).collect()

elements: RDD[Vector] = ParallelCollectionRDD[22] at parallelize at cmd16.sc:4
res16_3: Array[Vector] = Array([4.0,7.0,13.0], [3.0,-11.0,19.0])

In [18]:
elements.sample(withReplacement = false, fraction = 0.5, seed = 7L).collect()

res17: Array[Vector] = Array([4.0,7.0,13.0], [3.0,-11.0,19.0])

In [19]:
elements.sample(withReplacement = false, fraction = 0.5, seed = 64L).collect()

res18: Array[Vector] = Array([4.0,7.0,13.0], [-2.0,8.0,4.0], [3.0,-11.0,19.0])

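Bootstrapping repeatedly resamples a data set with replacement and recomputes a statistic on each resample. The following is a minimal sketch, assuming Spark is on the classpath as in the first cell. The names `ctx`, `observations`, and `bootstrapMeans` are illustrative, and `SparkContext.getOrCreate` reuses the notebook's running context if there is one.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the active context if one exists, otherwise start a local one.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("BootstrapSketch"))

// Toy data set: the numbers 1.0 to 1000.0 (true mean 500.5).
val observations = ctx.parallelize(1 to 1000).map(_.toDouble)

// With replacement, fraction = 1.0 means each element is expected to be
// chosen once, so every resample has roughly as many rows as the original.
val bootstrapMeans = (1 to 20).map { i =>
  observations
    .sample(withReplacement = true, fraction = 1.0, seed = i.toLong)
    .mean()
}

// The spread of the resampled means gives a rough idea of the
// uncertainty of the mean estimate.
val spread = bootstrapMeans.max - bootstrapMeans.min
```

Each resample uses a different seed. Reusing one seed would reproduce the same resample every time and make the spread zero.
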
## Random Split

-	Can be performed on any RDD
-	Returns an array of RDDs
- Weights for the split will be normalized if they do not add up to 1
-	Useful for splitting a data set into training, test and validation sets
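
The normalization of weights can be sketched in plain Scala without Spark. The helpers `splitRanges` and `subsetFor` are hypothetical and only illustrate the idea: divide each weight by the total, then accumulate the shares into adjacent ranges on [0, 1), so that one uniform random number per element decides which subset it falls into.

```scala
// Turn arbitrary non-negative weights into adjacent [lower, upper) ranges on [0, 1).
def splitRanges(weights: Array[Double]): Array[(Double, Double)] = {
  require(weights.forall(_ >= 0.0) && weights.sum > 0.0,
    "weights must be non-negative and have a positive sum")
  val total = weights.sum
  // Cumulative normalized weights: 0.0, w0/total, (w0 + w1)/total, ...
  val bounds = weights.map(_ / total).scanLeft(0.0)(_ + _)
  bounds.zip(bounds.tail)
}

// Index of the subset whose range contains the uniform draw `u`.
def subsetFor(u: Double, ranges: Array[(Double, Double)]): Int = {
  val i = ranges.indexWhere { case (lower, upper) => u >= lower && u < upper }
  // Guard against rounding leaving the last upper bound slightly below 1.0.
  if (i >= 0) i else ranges.length - 1
}

// These weights sum to 5, not 1.
val ranges = splitRanges(Array(3.0, 1.0, 1.0))
```

Under this scheme `Array(3.0, 1.0, 1.0)` describes the same 60/20/20 split as `Array(0.6, 0.2, 0.2)`.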

In [20]:
val data = sc.parallelize(1 to 1000000)
val splits = data.randomSplit(Array(0.6, 0.2, 0.2), seed = 13L)

val training = splits(0)
val test = splits(1)
val validation = splits(2)

splits.map(_.count())

data: RDD[Int] = ParallelCollectionRDD[26] at parallelize at cmd19.sc:1
splits: Array[RDD[Int]] = Array(
  MapPartitionsRDD[27] at randomSplit at cmd19.sc:2,
  MapPartitionsRDD[28] at randomSplit at cmd19.sc:2,
  MapPartitionsRDD[29] at randomSplit at cmd19.sc:2
)
training: RDD[Int] = MapPartitionsRDD[27] at randomSplit at cmd19.sc:2
test: RDD[Int] = MapPartitionsRDD[28] at randomSplit at cmd19.sc:2
validation: RDD[Int] = MapPartitionsRDD[29] at randomSplit at cmd19.sc:2
res19_5: Array[Long] = Array(601116L, 199882L, 199002L)

## Stratified Sampling 

- Can be performed on RDDs of key-value pairs
- Think of keys as labels and values as a specific attribute
- Two supported methods are defined in `PairRDDFunctions`:
  - `sampleByKey` requires only one pass over the data and provides an expected sample size
  - `sampleByKeyExact` provides the exact sample size with 99.99% confidence but requires significantly more resources
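
The difference between the two methods is easiest to see on strata large enough for the counts to be meaningful. The following is a minimal sketch, assuming Spark is on the classpath and using made-up labels `"a"` and `"b"`. The names `ctx`, `labelled`, and `keyFractions` are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the active context if one exists, otherwise start a local one.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("StratifiedSketch"))

// 2,000 key-value pairs: 1,000 under label "a" and 1,000 under label "b".
val labelled = ctx.parallelize(1 to 2000).map(i => (if (i % 2 == 0) "a" else "b", i))

// Desired sampling fraction per stratum (key).
val keyFractions = Map("a" -> 0.1, "b" -> 0.5)

// Single pass: per-key sample sizes are only expected values.
val approxCounts = labelled
  .sampleByKey(withReplacement = false, fractions = keyFractions, seed = 9L)
  .countByKey()

// Additional passes: per-key sample sizes are pinned to the requested fraction.
val exactCounts = labelled
  .sampleByKeyExact(withReplacement = false, fractions = keyFractions, seed = 9L)
  .countByKey()
```

`approxCounts` should land near 100 and 500 without necessarily matching them, while `exactCounts` is expected to be `math.ceil(fraction * stratumSize)` per key, that is 100 for `"a"` and 500 for `"b"`.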

In [25]:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.distributed.IndexedRow

val rows: RDD[IndexedRow] = sc.parallelize(Array(
    IndexedRow(0, Vectors.dense(1.0,2.0)),
    IndexedRow(1, Vectors.dense(4.0,5.0)),
    IndexedRow(1, Vectors.dense(7.0,8.0))))

val fractions: Map[Long, Double] = Map(0L -> 1.0, 1L -> 0.5)

val approxSample = rows.map{
    case IndexedRow(index, vec) => (index, vec)
}.sampleByKey(withReplacement = false, fractions, 9L)

approxSample.collect()

val approxSample2 = rows.map{
    case IndexedRow(index, vec) => (index, vec)
}.sampleByKeyExact(withReplacement = false, fractions, 9L)

approxSample2.collect()

rows: RDD[IndexedRow] = ParallelCollectionRDD[45] at parallelize at cmd24.sc:4
fractions: Map[Long, Double] = Map(0L -> 1.0, 1L -> 0.5)
approxSample: RDD[(Long, Vector)] = MapPartitionsRDD[47] at sampleByKey at cmd24.sc:13
res24_5: Array[(Long, Vector)] = Array(
  (0L, [1.0,2.0]),
  (1L, [4.0,5.0]),
  (1L, [7.0,8.0])
)
approxSample2: RDD[(Long, Vector)] = MapPartitionsRDD[50] at sampleByKeyExact at cmd24.sc:19
res24_7: Array[(Long, Vector)] = Array((0L, [1.0,2.0]), (1L, [7.0,8.0]))

In [26]:
sc.stop()

## Lesson Summary

Having completed this lesson, you should now be able to:

- Perform standard sampling on any RDD
- Split any RDD randomly into subsets
- Perform stratified sampling on RDDs of key-value pairs

### About the Authors

[Petro Verkhogliad](https://www.linkedin.com/in/vpetro) is a Consulting Manager at Lightbend. He holds a Master's degree in Computer Science with a specialization in Intelligent Systems. He is passionate about functional programming and applications of AI.