# Text document processing workflow
### Slide 1, page 40-44

In [None]:
%scala 
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

/** One newsgroup post: numeric id, newsgroup topic and raw body text. */
case class Article(id: Long, topic: String, text: String)

// Small in-memory corpus; topics starting with "sci" are turned into the positive class later on.
val articles = {
  val rows = Seq(
    Article(0, "sci.math", "Hello, Math!"),
    Article(1, "alt.religion", "Hello, Religion!"),
    Article(2, "sci.physics", "Hello, Physics!"),
    Article(3, "sci.math", "Hello, Math Revised!"),
    Article(4, "sci.math", "Better Math"),
    Article(5, "alt.religion", "TGIF")
  )
  spark.createDataFrame(rows).toDF()
}

articles.show()

In [None]:
%scala 
/** Maps the "is this a sci topic?" flag to a numeric label: true -> 1.0, false -> 0.0. */
val topic2Label: Boolean => Double = (isSci: Boolean) => if (isSci) 1.0 else 0.0

// Register the Boolean => Double function as a SQL UDF named "topic2Label";
// the returned handle is applied to a Column below.
val toLabel = spark.udf.register("topic2Label", topic2Label)

// label = 1.0 for topics matching "sci%", 0.0 otherwise. Cached because later cells
// (tokenization and the pipeline split) read this DataFrame again.
val labelled = articles
  .withColumn("label", toLabel($"topic".like("sci%")))
  .cache()

labelled.show()

In [None]:
%scala 
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.RegexTokenizer

// Split each article's "text" into a "words" array column (RegexTokenizer with default settings).
val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val tokenized = tokenizer.transform(labelled)

tokenized.show(truncate = false)

In [None]:
%scala 
import org.apache.spark.ml.feature.HashingTF

// Hash each token list into a fixed-size (5000-slot) term-frequency vector named "features".
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(5000)
val hashed = hashingTF.transform(tokenized)

hashed.show(truncate = false)

In [None]:
%scala 
// Hold out roughly 20% of the hashed rows for testing. The fixed seed makes the split,
// and every result computed from it, reproducible between runs. With only six rows,
// either side can still come out very small, so inspect the rows shown below.
val Array(trainDF, testDF) = hashed.randomSplit(Array(0.8, 0.2), 42L)

trainDF.show()
testDF.show()

In [None]:
%scala 
import org.apache.spark.ml.classification.LogisticRegression

// Logistic regression: at most 20 iterations, regularization strength 0.01.
val lr = new LogisticRegression()
  .setMaxIter(20)
  .setRegParam(0.01)
val model = lr.fit(trainDF)

// Score the held-out rows and keep only the columns worth comparing by eye.
val pred = model
  .transform(testDF)
  .select("topic", "label", "prediction")

pred.show()

# Text document processing workflow - Pipeline
### Slide 1, page 45

In [None]:
%scala 
// Split the raw labelled articles (not yet tokenized) so the pipeline below does all feature work.
// The fixed seed keeps the split reproducible between runs; with six rows, either side may
// still be tiny, so check the rows shown below.
val Array(trainDF2, testDF2) = labelled.randomSplit(Array(0.8, 0.2), 42L)

trainDF2.show()
testDF2.show()

In [None]:
%scala 
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Chain the tokenizer, hashing and logistic-regression stages defined above,
// so raw text goes straight to predictions.
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
val model2 = pipeline.fit(trainDF2)

// The fitted PipelineModel re-applies every stage to the raw test rows.
val pred = model2
  .transform(testDF2)
  .select("topic", "label", "prediction")

pred.show()

# Feature engineering
### Slide 1, page 53

In [None]:
%scala 
import org.apache.spark.ml.feature.VectorAssembler

/** A row of three numeric columns to be packed into a single vector. */
case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(
  Nums(1, 2, 3),
  Nums(4, 5, 6),
  Nums(7, 8, 9)
)).toDF()

// Combine val1..val3 into one "features" vector column.
val va = new VectorAssembler()
  .setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

va.transform(numsDF).show()

# Feature engineering - continuous features
### Slide 1, page 56-58

In [None]:
%scala 
import org.apache.spark.ml.feature.Bucketizer

// Twenty rows (0.0 to 19.0), cast to double and read below through the "id" column.
val contDF = spark.range(20).selectExpr("cast(id as double)")

// Five split points define four buckets.
val bucketBorders = Array(-1.0, 5.0, 10.0, 15.0, 20.0)

// No output column is set here, so Spark chooses a default name for it.
val bucketer = new Bucketizer()
  .setSplits(bucketBorders)
  .setInputCol("id")

bucketer.transform(contDF).show()

In [None]:
%scala 
import org.apache.spark.ml.feature.VectorAssembler

/** A row of three numeric columns to be packed into a single vector. */
case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(
  Nums(1, 2, 3),
  Nums(4, 5, 6),
  Nums(7, 8, 9)
)).toDF()

val va = new VectorAssembler()
  .setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

// Assembled input for the StandardScaler cell.
val nums = va.transform(numsDF)

In [None]:
%scala 
import org.apache.spark.ml.feature.StandardScaler

// Fit scaling statistics on "features" and write the rescaled vector to "scaled".
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaled")

scaler
  .fit(nums)
  .transform(nums)
  .show(truncate = false)

In [None]:
%scala 
import org.apache.spark.ml.feature.VectorAssembler

/** A row of three numeric columns to be packed into a single vector. */
case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(
  Nums(1, 2, 3),
  Nums(4, 5, 6),
  Nums(7, 8, 9)
)).toDF()

val va = new VectorAssembler()
  .setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

// Assembled input for the MaxAbsScaler cell.
val nums = va.transform(numsDF)

In [None]:
%scala 
import org.apache.spark.ml.feature.MaxAbsScaler

// Fit per-feature maximum-absolute-value scaling and write the result to "mas".
val maScaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("mas")

maScaler
  .fit(nums)
  .transform(nums)
  .show(truncate = false)

# Feature engineering - categorical features
### Slide 1, page 60-62

In [None]:
%scala 
// Sample dataset with (at least) "lab" and "color" columns, used by the next cells.
val simpleDF = spark.read.format("json").load("simple-ml.json")

In [None]:
%scala 
import org.apache.spark.ml.feature.StringIndexer

// Map each distinct "lab" string to a numeric index stored in "labelInd".
val lblIndxr = new StringIndexer()
  .setInputCol("lab")
  .setOutputCol("labelInd")
val idxRes = lblIndxr
  .fit(simpleDF)
  .transform(simpleDF)

idxRes.show()

In [None]:
%scala 
import org.apache.spark.ml.feature.IndexToString

// Turn the indices in "labelInd" back into strings. No explicit labels are given, so this
// relies on the label metadata the StringIndexer attached to that column.
val labelReverse = new IndexToString()
  .setInputCol("labelInd")
  .setOutputCol("original")

labelReverse.transform(idxRes).show()

In [None]:
%scala 
import org.apache.spark.ml.feature.OneHotEncoder

// Index the "color" strings first; one-hot encoding works on numeric category indices.
val lblIndxr = new StringIndexer().setInputCol("color").setOutputCol("colorInd")
val colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))

// Since Spark 3.0, OneHotEncoder is an Estimator (the old Transformer variant was removed).
// It has no transform() of its own, so fit it to learn the category count, then transform.
val ohe = new OneHotEncoder().setInputCol("colorInd").setOutputCol("one-hot")

ohe.fit(colorLab).transform(colorLab).show()

# Feature engineering - text data
### Slide 1, page 64-66

In [None]:
%scala 
// Load the CSV using its header row for column names, dropping rows without a Description.
val sales = spark.read
  .format("csv")
  .option("header", "true")
  .load("sales.csv")
  .where("Description IS NOT NULL")
sales.show(truncate = false)

In [None]:
%scala 
import org.apache.spark.ml.feature.Tokenizer

// Split each Description into a "DescOut" array of tokens.
val tkn = new Tokenizer()
  .setInputCol("Description")
  .setOutputCol("DescOut")
val tokenized = tkn.transform(sales.select("Description"))

tokenized.show(truncate = false)

In [None]:
%scala 
import org.apache.spark.ml.feature.StopWordsRemover

// Two pre-tokenized sentences in a "raw" array column.
val df = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

// Spark's bundled English stop-word list.
val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")

// Drop stop words from "raw" and write the remaining tokens to "WithoutStops".
val stops = new StopWordsRemover()
  .setStopWords(englishStopWords)
  .setInputCol("raw")
  .setOutputCol("WithoutStops")

stops.transform(df).show(truncate = false)

In [None]:
%scala 
import org.apache.spark.ml.feature.CountVectorizer

// Two token arrays in a "words" column.
val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// Despite its name, cvModel is the unfitted CountVectorizer estimator; fittedCV is the model.
// Vocabulary capped at 3 terms, with a document-frequency threshold (minDF) of 2.
val cvModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
val fittedCV = cvModel.fit(df)

fittedCV.transform(df).show(truncate = false)

# Linear regression - normal equation
### Slide 2, page 27

In [None]:
%scala 
/** One example row: two input features (x1, x2) and the regression target y. */
final case class House(x1: Long, x2: Long, y: Long)

val trainData = spark.createDataFrame(Seq(
  House(2104, 3, 400), House(1600, 3, 330), House(2400, 3, 369),
  House(1416, 2, 232), House(3000, 4, 540))).toDF

// y = 0 here is presumably just a placeholder so the schema matches trainData; it is not a real target.
val testData = spark.createDataFrame(Seq(House(4000, 4, 0))).toDF

In [None]:
%scala 
import org.apache.spark.ml.feature.VectorAssembler

// Pack x1 and x2 into one "features" vector for both the training and the query rows.
val va = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")

val train = va.transform(trainData)
val test = va.transform(testData)

In [None]:
%scala 
import org.apache.spark.ml.regression.LinearRegression

// Least-squares fit of y on the assembled features using the normal-equation solver.
val lr = new LinearRegression()
  .setFeaturesCol("features")
  .setLabelCol("y")
  .setSolver("normal")
val lrModel = lr.fit(train)

lrModel.transform(test).show()

# Linear regression - gradient descent
### Slide 2, page 50

In [None]:
%scala 
// Load the training set stored in LIBSVM text format.
val data = spark.read
  .format("libsvm")
  .load("data.txt")

In [None]:
%scala 
import org.apache.spark.ml.regression.LinearRegression

// This section demonstrates iterative optimisation, so request an iterative solver explicitly.
// With the default "auto" solver, Spark uses the normal-equation solver for low-dimensional
// data, and maxIter has no effect. Spark has no plain gradient-descent solver; "l-bfgs" is its
// iterative (quasi-Newton) option, which does honour maxIter.
val lr = new LinearRegression()
  .setSolver("l-bfgs")
  .setMaxIter(10)
val lrModel = lr.fit(data)

In [None]:
%scala 
// Report the fitted parameters, then the training-set RMSE from the model summary.
val coefficientsSeen = lrModel.coefficients
val interceptSeen = lrModel.intercept
println(s"Coefficients: $coefficientsSeen Intercept: $interceptSeen")

val trainingSummary = lrModel.summary
val rmse = trainingSummary.rootMeanSquaredError
println(s"RMSE: $rmse")

# Logistic regression
### Slide 3, page 26

In [None]:
%scala 
/** One example row: a single feature x1 and a 0/1 class label y. */
final case class Cancer(x1: Long, y: Long)

val trainData = spark.createDataFrame(Seq(Cancer(330, 1), Cancer(120, 0), Cancer(400, 1))).toDF
// y = 0 here is presumably just a placeholder so the schema matches trainData; it is not a known label.
val testData = spark.createDataFrame(Seq(Cancer(500, 0))).toDF

In [None]:
%scala 
import org.apache.spark.ml.feature.VectorAssembler

// Wrap the single x1 column in a one-element "features" vector.
val va = new VectorAssembler()
  .setInputCols(Array("x1"))
  .setOutputCol("features")
val train = va.transform(trainData)
val test = va.transform(testData)

In [None]:
%scala 
import org.apache.spark.ml.classification.LogisticRegression

// Logistic regression on x1 -> y: up to 10 iterations, regParam 0.3, elasticNetParam 0.8.
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("y")
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val lrModel = lr.fit(train)

lrModel.transform(test).show()

# GraphX
### Slide 4, page 52-54

In [None]:
%scala 
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Sentinel message: vprog treats receiving it as superstep 0 and leaves the vertex value unchanged.
val initialMsg: Int = -9999

In [None]:
%scala 
// Each vertex carries (newValue, oldValue). The value starts equal to the vertex id, and -1 marks
// "no previous value" so every vertex differs from its old value at the start.
val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(
  Seq(1L, 2L, 3L, 6L).map(id => (id, (id.toInt, -1)))
)

In [None]:
%scala 
// Directed edges (src -> dst). The Boolean attribute is constant and is not read by sendMsg.
val relationships: RDD[Edge[Boolean]] = sc.parallelize(
  Seq((1L, 2L), (2L, 1L), (2L, 6L), (3L, 6L), (6L, 1L), (6L, 3L))
    .map { case (src, dst) => Edge(src, dst, true) }
)

In [None]:
%scala 
// Property graph with (Int, Int) vertex attributes and Boolean edge attributes.
val graph: Graph[(Int, Int), Boolean] = Graph(vertices, relationships)

In [None]:
%scala 
/** Combines two messages addressed to the same vertex by keeping the larger one. */
def mergeMsg(msg1: Int, msg2: Int): Int = if (msg1 >= msg2) msg1 else msg2

In [None]:
%scala 
/**
 * Vertex program: folds an incoming message into a vertex's (newValue, oldValue) pair.
 *
 * @param vertexId id of the vertex being updated (unused)
 * @param value    current (newValue, oldValue) attribute of the vertex
 * @param message  merged incoming message, or initialMsg in superstep 0
 * @return the attribute unchanged in superstep 0; otherwise (max(message, newValue), newValue)
 */
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  val (current, _) = value
  if (message == initialMsg) value                 // superstep 0: keep the initial attribute
  else (math.max(message, current), current)       // later supersteps: remember the previous value
}

In [None]:
%scala 
/**
 * Scatter step: forwards the source vertex's value to the destination, but only if that value
 * changed in the last superstep (newValue != oldValue). Otherwise it sends nothing.
 *
 * @param triplet edge with its source and destination attributes
 * @return at most one (destinationId, value) message
 */
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val (newValue, oldValue) = triplet.srcAttr
  // propagate the new (updated) value to the destination vertex
  if (newValue != oldValue) Iterator((triplet.dstId, newValue))
  else Iterator.empty
}

In [None]:
%scala 
// Run Pregel until no more messages are sent (Int.MaxValue effectively removes the iteration cap),
// with messages flowing along out-edges. Despite the name, vprog and mergeMsg both take the
// maximum, so each vertex ends up with the largest value that can reach it.
val minGraph = graph.pregel(initialMsg, maxIterations = Int.MaxValue, activeDirection = EdgeDirection.Out)(
  vprog,    // apply: merge the incoming message into the vertex value
  sendMsg,  // scatter: forward changed values to out-neighbours
  mergeMsg  // gather: combine messages addressed to the same vertex
)

// collect() gives no ordering guarantee, so sort by vertex id and print each id with its final value.
minGraph.vertices.collect().sortBy(_._1).foreach { case (vertexId, (value, _)) =>
  println(s"$vertexId: $value")
}