# DataFrame

In [None]:
import org.apache.spark.sql.SparkSession

// Create (or reuse, via getOrCreate) a local SparkSession for all examples in this notebook.
// The import above is required because the type annotation names SparkSession explicitly.
val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()

// Sample people dataset; no explicit schema is supplied to the JSON reader.
val people = spark.read.format("json").load("people.json")

In [None]:
import org.apache.spark.sql.functions._

// Project "name" plus a derived column written as a SQL expression string.
people
  .select(col("name"), expr("age + 3"))
  .show()

In [None]:
import spark.implicits._

// Same projection, using the $"..." column interpolator brought in by spark.implicits.
people
  .select($"name", $"age" + 3)
  .show()

In [None]:
// Keep only rows matching the SQL predicate age < 20 (filter is the same operation as where).
people.filter("age < 20").show()

In [None]:
// Cell value: how many distinct names the dataset contains.
people
  .select("name")
  .distinct()
  .count()

In [None]:
// Append a boolean "teenager" column that is true when age < 20, then display.
people
  .withColumn("teenager", expr("age < 20"))
  .show()

In [None]:
// Rename "name" to "username"; the cell evaluates to the resulting column names.
val renamed = people.withColumnRenamed("name", "username")
renamed.columns

In [None]:
// Whole-table aggregate: count over the "age" column (agg is shorthand for groupBy().agg).
people.agg(count("age")).show()

In [None]:
// First "name" and last "age" seen by the aggregation.
// NOTE(review): first/last depend on row order, which Spark does not guarantee without an explicit sort.
people.agg(first("name"), last("age")).show()

In [None]:
// Whole-table aggregate: mean of the "age" column.
people.agg(avg("age")).show()

In [None]:
// Per-name aggregate: count over "age" within each name group.
people
  .groupBy("name")
  .agg(count("age"))
  .show()

In [None]:
// Two small tables sharing an "id" key column.
val t1 = spark.createDataFrame(Seq((0, "a", 0), (1, "b", 1), (2, "c", 1))).toDF("num", "name", "id")
val t2 = spark.createDataFrame(Seq((0, "x"), (1, "y"), (2, "z"))).toDF("id", "group")

// Equi-join condition on the shared key.
val joinExpression = t1.col("id") === t2.col("id")
// Never reassigned, so a val; edit the string (e.g. "left_outer", "full_outer") to try other join types.
val joinType = "inner"

t1.join(t2, joinExpression, joinType).show()

In [None]:
import org.apache.spark.sql.functions.udf
val df = spark.createDataFrame(Seq((0, "hello"), (1, "world"))).toDF("id", "text")

// Null-safe upper-casing: a SQL NULL arrives in a String parameter as null, and calling
// toUpperCase on it would throw. Locale.ROOT keeps the output independent of the JVM locale.
val upper: String => String = s => Option(s).map(_.toUpperCase(java.util.Locale.ROOT)).orNull
// Wrap with `udf` for DataFrame use only. Registering it as "upper" through spark.udf.register
// would shadow Spark SQL's built-in upper() function for the rest of the session.
val upperUDF = udf(upper)
df.withColumn("upper", upperUDF(col("text"))).show()

# Text Processing Example - Individual Stages

In [None]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// One article: numeric id, newsgroup-style topic, and free text.
case class Article(id: Long, topic: String, text: String)

// Hand-written six-article corpus used by the text-processing examples.
val articles = spark.createDataFrame(
  Seq(
    Article(0, "sci.math", "Hello, Math!"),
    Article(1, "alt.religion", "Hello, Religion!"),
    Article(2, "sci.physics", "Hello, Physics!"),
    Article(3, "sci.math", "Hello, Math Revised!"),
    Article(4, "sci.math", "Better Math"),
    Article(5, "alt.religion", "TGIF")
  )
).toDF()

articles.show()

In [None]:
// Numeric label for a topic flag: 1.0 when the flag is true, 0.0 otherwise.
val topic2Label: Boolean => Double = isSci => if (isSci) 1.0 else 0.0

// Registered under the name "topic2Label"; the returned UDF is applied to columns below.
val toLabel = spark.udf.register("topic2Label", topic2Label)

// Flag "sci.*" topics, label every article, and cache since later cells reuse `labelled`.
val isSciTopic = $"topic".like("sci%")
val labelled = articles.withColumn("label", toLabel(isSciTopic)).cache()

labelled.show()

In [None]:
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.RegexTokenizer

// Split each article's "text" into a "words" array column; no custom pattern is set,
// so RegexTokenizer's defaults apply.
val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val tokenized = tokenizer.transform(labelled)
tokenized.show(truncate = false)

In [None]:
import org.apache.spark.ml.feature.HashingTF

// Hash the tokenizer's word arrays into 5000-slot term-frequency vectors stored in "features".
val hashingTF = new HashingTF()
  .setNumFeatures(5000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val hashed = hashingTF.transform(tokenized)
hashed.show(truncate = false)

In [None]:
// Split the hashed articles roughly 80/20 into training and test sets. A fixed seed makes the
// split reproducible across runs; with only six rows the test set may still be tiny or empty.
val Array(trainDF, testDF) = hashed.randomSplit(Array(0.8, 0.2), seed = 42L)

trainDF.show()

testDF.show()

In [None]:
import org.apache.spark.ml.classification.LogisticRegression

// Logistic regression with no column overrides; it is trained on the "features"/"label"
// columns built in the previous cells.
val lr = new LogisticRegression()
  .setRegParam(0.01)
  .setMaxIter(20)

val model = lr.fit(trainDF)

// Keep only what is needed to compare true labels with predictions.
val pred = model
  .transform(testDF)
  .select("topic", "label", "prediction")

pred.show()

# Text Processing Example - Pipeline

In [None]:
// Split the raw labelled articles roughly 80/20 for the pipeline example. A fixed seed makes the
// split reproducible; with only six rows the test set may still be tiny or empty.
val Array(trainDF2, testDF2) = labelled.randomSplit(Array(0.8, 0.2), seed = 42L)

trainDF2.show()

testDF2.show()

In [None]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Chain the tokenizer, hashing and logistic-regression stages defined above so that a single
// fit/transform runs all of them starting from raw text.
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

val model2 = pipeline.fit(trainDF2)

val pred = model2
  .transform(testDF2)
  .select("topic", "label", "prediction")

pred.show()

# Feature Engineering - Vector Assembler

In [None]:
import org.apache.spark.ml.feature.VectorAssembler

// Three numeric columns per row.
case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF()

// Merge val1..val3 into one vector column named "features".
val va = new VectorAssembler()
  .setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

va.transform(numsDF).show()

# Feature Engineering - Continuous Features

In [None]:
import org.apache.spark.ml.feature.Bucketizer

// 20 rows with a single double column "id" holding 0.0 .. 19.0.
val contDF = spark.range(20).selectExpr("cast(id as double)")

// Split points -1, 5, 10, 15, 20 define four buckets that cover every id value above.
val bucketBorders = Array(-1.0, 5.0, 10.0, 15.0, 20.0)
// Name the output column explicitly; without it the bucket index lands in an
// auto-generated, uid-based column name that differs from run to run.
val bucketer = new Bucketizer()
  .setSplits(bucketBorders)
  .setInputCol("id")
  .setOutputCol("bucket")

bucketer.transform(contDF).show()

In [None]:
import org.apache.spark.ml.feature.VectorAssembler

// Rebuild the three-column numeric sample (same as the Vector Assembler section).
case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF()

// Assemble val1..val3 into "features" and keep the result for the scaling cell below.
val va = new VectorAssembler()
  .setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

val nums = va.transform(numsDF)

In [None]:
import org.apache.spark.ml.feature.StandardScaler

// Learn scaling statistics from `nums`, then rescale its "features" vectors into "scaled".
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaled")

val scalerModel = scaler.fit(nums)
scalerModel.transform(nums).show()

In [None]:
import org.apache.spark.ml.feature.VectorAssembler

// Rebuild the numeric sample and its assembled "features" column once more
// (identical to the two earlier Nums cells).
case class Nums(val1: Long, val2: Long, val3: Long)

val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF()

val va = new VectorAssembler()
  .setInputCols(Array("val1", "val2", "val3"))
  .setOutputCol("features")

val nums = va.transform(numsDF)

# Feature Engineering - Categorical Features

In [None]:
// Sample dataset used by the categorical-feature examples below.
val simpleDF = spark.read.format("json").load("simple-ml.json")

simpleDF.show()

In [None]:
import org.apache.spark.ml.feature.StringIndexer

// Encode the string column "lab" as numeric indices in "labelInd".
val lblIndxr = new StringIndexer()
  .setInputCol("lab")
  .setOutputCol("labelInd")

val idxRes = lblIndxr
  .fit(simpleDF)
  .transform(simpleDF)

idxRes.show()

In [None]:
import org.apache.spark.ml.feature.IndexToString

// Map "labelInd" back to strings in "original". No labels are set explicitly, so the mapping
// presumably comes from metadata StringIndexer attached to "labelInd" — confirm if reused elsewhere.
val labelReverse = new IndexToString()
  .setInputCol("labelInd")
  .setOutputCol("original")

labelReverse.transform(idxRes).show()

In [None]:
import org.apache.spark.ml.feature.OneHotEncoder

// Index the string "color" column first; the one-hot encoder consumes the numeric indices.
val lblIndxr = new StringIndexer().setInputCol("color").setOutputCol("colorInd")

val colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))

val ohe = new OneHotEncoder().setInputCol("colorInd").setOutputCol("one-hot")

// Since Spark 3.0 OneHotEncoder is an Estimator: it must be fit before it can transform.
// The Transformer-style `ohe.transform(colorLab)` was deprecated in 2.3 and does not compile on 3.x;
// on a Spark 2.x runtime, use `ohe.transform(colorLab)` instead.
ohe.fit(colorLab).transform(colorLab).show()

# Feature Engineering - Text Features

In [None]:
// Sales CSV with a header row; rows lacking a Description are dropped up front.
val sales = spark.read
  .option("header", "true")
  .format("csv")
  .load("sales.csv")
  .where("Description IS NOT NULL")

sales.show(numRows = 5, truncate = false)

In [None]:
import org.apache.spark.ml.feature.Tokenizer

// Split each sales Description into tokens stored in "DescOut".
val tkn = new Tokenizer()
  .setInputCol("Description")
  .setOutputCol("DescOut")

val descriptions = sales.select("Description")
val tokenized = tkn.transform(descriptions)

tokenized.show(truncate = false)

In [None]:
import org.apache.spark.ml.feature.StopWordsRemover

// Two already-tokenized sentences in the "raw" column.
val df = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

// Spark's bundled English stop-word list.
val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")

// Remove those stop words from "raw", writing the remaining tokens to "WithoutStops".
val stops = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("WithoutStops")
  .setStopWords(englishStopWords)

stops.transform(df).show(truncate = false)

In [None]:
import org.apache.spark.ml.feature.CountVectorizer

// Two token arrays to build a vocabulary from.
val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// Despite its name, `cvModel` is the unfitted CountVectorizer estimator; `fittedCV` is the model.
// Vocabulary capped at 3 terms, with a minimum document frequency of 2.
val cvModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)

val fittedCV = cvModel.fit(df)
fittedCV.transform(df).show(truncate = false)

# Linear regression

In [None]:
// Regression training data in LIBSVM format.
val data = spark.read
  .format("libsvm")
  .load("data.txt")

In [None]:
import org.apache.spark.ml.regression.LinearRegression

// Linear regression limited to 10 iterations, fit on the LIBSVM data loaded above.
val lr = new LinearRegression()
  .setMaxIter(10)

val lrModel = lr.fit(data)

In [None]:
// Report the fitted parameters, then the training-set RMSE from the model summary.
val coeffs = lrModel.coefficients
val intercept = lrModel.intercept
println(s"Coefficients: $coeffs Intercept: $intercept")

val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

# Binary classification

In [None]:
// Toy single-feature dataset: x1 is the feature, y the 0/1 class label.
// Case class renamed to UpperCamelCase per Scala naming conventions (it is only used in this cell).
case class Cancer(x1: Long, y: Long)
val trainData = spark.createDataFrame(Seq(Cancer(330, 1), Cancer(120, 0), Cancer(400, 1))).toDF
val testData = spark.createDataFrame(Seq(Cancer(500, 0))).toDF

In [None]:
import org.apache.spark.ml.feature.VectorAssembler

// Pack the single numeric column x1 into the "features" vector column used by the classifier.
val va = new VectorAssembler()
  .setInputCols(Array("x1"))
  .setOutputCol("features")

val train = va.transform(trainData)
val test  = va.transform(testData)

In [None]:
import org.apache.spark.ml.classification.LogisticRegression

// Elastic-net regularized logistic regression reading "features" and using "y" as the label.
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("y")
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val lrModel = lr.fit(train)
lrModel.transform(test).show()

# Multi-class classification

In [None]:
import org.apache.spark.ml.classification.LogisticRegression

// Multi-class training data in LIBSVM format.
val training = spark.read.format("libsvm").load("multiclass_data.txt")

// Elastic-net regularized logistic regression, 10 iterations.
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val lrModel = lr.fit(training)

// Fitted parameters: one coefficient row and one intercept per class.
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: \n${lrModel.interceptVector}")