# ID2223 HT21 Lab Session 1
Working with Spark:
- DataFrame
- Text Processing Example 
  - Individual Stages
  - Pipeline
- Feature Engineering 
  - Vector Assembler
  - Continuous Features
  - Categorical Features
  - Text Features
- Linear regression
- Binary classification
- Multi-class classification

# DataFrame

In [1]:
// The notebook kernel already provides a SparkSession named `spark` (see the startup output below).
// Uncomment the next line to build one yourself when running outside the notebook:
// val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()

// Load people.json into a DataFrame using the JSON data source.
val people = spark.read.format("json").load("people.json")

Intitializing Scala interpreter ...

Spark Web UI available at http://e38e74d79d1c:4040
SparkContext available as 'sc' (version = 3.2.0, master = local[*], app id = local-1636700527321)
SparkSession available as 'spark'


people: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]


In [2]:
// Print the contents of `people` as a table.
people.show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



In [3]:
import org.apache.spark.sql.functions._

// Project `name` plus a computed column written as a SQL expression string.
people.select(col("name"), expr("age + 3")).show()

+-------+---------+
|   name|(age + 3)|
+-------+---------+
|Michael|       18|
|   Andy|       33|
| Justin|       22|
|   Andy|       15|
|    Jim|       22|
|   Andy|       15|
+-------+---------+



import org.apache.spark.sql.functions._


In [4]:
import spark.implicits._

// Same projection as the previous cell, written with the $"column" syntax instead of col/expr.
people.select($"name", $"age" + 3).show()

+-------+---------+
|   name|(age + 3)|
+-------+---------+
|Michael|       18|
|   Andy|       33|
| Justin|       22|
|   Andy|       15|
|    Jim|       22|
|   Andy|       15|
+-------+---------+



import spark.implicits._


In [5]:
// Keep only the rows whose age is below 20; the filter is given as a SQL string.
people.where("age < 20").show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



In [6]:
// Count how many different values occur in the name column.
people.select("name").distinct().count()

res4: Long = 4


In [7]:
// Add a boolean column `teenager` computed from the expression age < 20.
people.withColumn("teenager", expr("age < 20")).show()

+---+---+-------+--------+
|age| id|   name|teenager|
+---+---+-------+--------+
| 15| 12|Michael|    true|
| 30| 15|   Andy|   false|
| 19| 20| Justin|    true|
| 12| 15|   Andy|    true|
| 19| 20|    Jim|    true|
| 12| 10|   Andy|    true|
+---+---+-------+--------+



In [8]:
// Rename column name -> username and list the resulting column names.
people.withColumnRenamed("name", "username").columns

res6: Array[String] = Array(age, id, username)


In [9]:
// Aggregate over the whole DataFrame: count of the age column.
people.select(count("age")).show()

+----------+
|count(age)|
+----------+
|         6|
+----------+



In [10]:
// First value of name and last value of age.
// NOTE(review): the result presumably depends on row order — confirm before relying on it.
people.select(first("name"), last("age")).show()

+-----------+---------+
|first(name)|last(age)|
+-----------+---------+
|    Michael|       12|
+-----------+---------+



In [11]:
// Average of the age column.
people.select(avg("age")).show()

+------------------+
|          avg(age)|
+------------------+
|17.833333333333332|
+------------------+



In [12]:
// Group rows by name and count the age values in each group.
people.groupBy("name").agg(count("age")).show()

+-------+----------+
|   name|count(age)|
+-------+----------+
|    Jim|         1|
|Michael|         1|
|   Andy|         3|
| Justin|         1|
+-------+----------+



In [13]:
// Two small DataFrames that both carry an `id` column to join on.
val t1 = spark.createDataFrame(Seq((0, "a", 0), (1, "b", 1), (2, "c", 1))).toDF("num", "name", "id")
val t2 = spark.createDataFrame(Seq((0, "x"), (1, "y"), (2, "z"))).toDF("id", "group")

// Join condition: rows match when their id values are equal.
val joinExpression = t1.col("id") === t2.col("id")
// Never reassigned, so it is declared as an immutable val.
val joinType = "inner"

// Both id columns are kept in the joined result (see the output below).
t1.join(t2, joinExpression, joinType).show()

+---+----+---+---+-----+
|num|name| id| id|group|
+---+----+---+---+-----+
|  0|   a|  0|  0|    x|
|  1|   b|  1|  1|    y|
|  2|   c|  1|  1|    y|
+---+----+---+---+-----+



t1: org.apache.spark.sql.DataFrame = [num: int, name: string ... 1 more field]
t2: org.apache.spark.sql.DataFrame = [id: int, group: string]
joinExpression: org.apache.spark.sql.Column = (id = id)
joinType: String = inner


In [14]:
import org.apache.spark.sql.functions.udf
// Two-row sample DataFrame with columns id and text.
val df = spark.createDataFrame(Seq((0, "hello"), (1, "world"))).toDF("id", "text")
// Plain Scala function that upper-cases a string.
val upper: String => String = _.toUpperCase
// Register the function under the name "upper"; the returned UserDefinedFunction can be applied to Columns.
val upperUDF = spark.udf.register("upper", upper)
// Apply the UDF to the text column and store the result in a new column named upper.
df.withColumn("upper", upperUDF(col("text"))).show

+---+-----+-----+
| id| text|upper|
+---+-----+-----+
|  0|hello|HELLO|
|  1|world|WORLD|
+---+-----+-----+



import org.apache.spark.sql.functions.udf
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
upper: String => String = $Lambda$4624/0x0000000841879840@7ff29c7
upperUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4624/0x0000000841879840@7ff29c7,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),Some(upper),true,true)


# Text Processing Example - Individual Stages

In [15]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// One record of the toy corpus: a numeric id, a topic string and the text body.
case class Article(id: Long, topic: String, text: String)

// Six hand-written articles turned into a DataFrame with columns id, topic and text.
val articles = spark.createDataFrame(Seq(
    Article(0, "sci.math", "Hello, Math!"),
    Article(1, "alt.religion", "Hello, Religion!"),
    Article(2, "sci.physics", "Hello, Physics!"),
    Article(3, "sci.math", "Hello, Math Revised!"),
    Article(4, "sci.math", "Better Math"),
    Article(5, "alt.religion", "TGIF"))).toDF

articles.show

+---+------------+--------------------+
| id|       topic|                text|
+---+------------+--------------------+
|  0|    sci.math|        Hello, Math!|
|  1|alt.religion|    Hello, Religion!|
|  2| sci.physics|     Hello, Physics!|
|  3|    sci.math|Hello, Math Revised!|
|  4|    sci.math|         Better Math|
|  5|alt.religion|                TGIF|
+---+------------+--------------------+



import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
defined class Article
articles: org.apache.spark.sql.DataFrame = [id: bigint, topic: string ... 1 more field]


In [16]:
// Maps a boolean to a numeric label: true -> 1.0, false -> 0.0.
val topic2Label: Boolean => Double = x => if (x) 1 else 0

// Register the function as a UDF named "topic2Label".
val toLabel = spark.udf.register("topic2Label", topic2Label)

// label is 1.0 when topic starts with "sci" and 0.0 otherwise; the resulting DataFrame is cached.
val labelled = articles.withColumn("label", toLabel($"topic".like("sci%"))).cache

labelled.show

+---+------------+--------------------+-----+
| id|       topic|                text|label|
+---+------------+--------------------+-----+
|  0|    sci.math|        Hello, Math!|  1.0|
|  1|alt.religion|    Hello, Religion!|  0.0|
|  2| sci.physics|     Hello, Physics!|  1.0|
|  3|    sci.math|Hello, Math Revised!|  1.0|
|  4|    sci.math|         Better Math|  1.0|
|  5|alt.religion|                TGIF|  0.0|
+---+------------+--------------------+-----+



topic2Label: Boolean => Double = $Lambda$4826/0x0000000841922040@22d6b6c0
toLabel: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4826/0x0000000841922040@22d6b6c0,DoubleType,List(Some(class[value[0]: boolean])),Some(class[value[0]: double]),Some(topic2Label),false,true)
labelled: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, topic: string ... 2 more fields]


In [17]:
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.RegexTokenizer

// Split the text column into a words column. With the default settings echoed in the output
// (pattern=\s+, toLowercase=true) tokens are lower-cased and split on whitespace only, so
// punctuation stays attached to the tokens (e.g. "hello," and "math!").
val tokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")

val tokenized = tokenizer.transform(labelled)

// show(false) prints the column values without truncating them.
tokenized.show(false)

+---+------------+--------------------+-----+------------------------+
|id |topic       |text                |label|words                   |
+---+------------+--------------------+-----+------------------------+
|0  |sci.math    |Hello, Math!        |1.0  |[hello,, math!]         |
|1  |alt.religion|Hello, Religion!    |0.0  |[hello,, religion!]     |
|2  |sci.physics |Hello, Physics!     |1.0  |[hello,, physics!]      |
|3  |sci.math    |Hello, Math Revised!|1.0  |[hello,, math, revised!]|
|4  |sci.math    |Better Math         |1.0  |[better, math]          |
|5  |alt.religion|TGIF                |0.0  |[tgif]                  |
+---+------------+--------------------+-----+------------------------+



import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.RegexTokenizer
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = RegexTokenizer: uid=regexTok_8f0b4eaac691, minTokenLength=1, gaps=true, pattern=\s+, toLowercase=true
tokenized: org.apache.spark.sql.DataFrame = [id: bigint, topic: string ... 3 more fields]


In [18]:
import org.apache.spark.ml.feature.HashingTF

// Hash the tokens of the tokenizer's output column into a 5000-dimensional sparse vector
// stored in the features column.
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")
    .setNumFeatures(5000)

val hashed = hashingTF.transform(tokenized)

hashed.show(false)

+---+------------+--------------------+-----+------------------------+------------------------------------+
|id |topic       |text                |label|words                   |features                            |
+---+------------+--------------------+-----+------------------------+------------------------------------+
|0  |sci.math    |Hello, Math!        |1.0  |[hello,, math!]         |(5000,[4391,4731],[1.0,1.0])        |
|1  |alt.religion|Hello, Religion!    |0.0  |[hello,, religion!]     |(5000,[2205,4731],[1.0,1.0])        |
|2  |sci.physics |Hello, Physics!     |1.0  |[hello,, physics!]      |(5000,[4731,4945],[1.0,1.0])        |
|3  |sci.math    |Hello, Math Revised!|1.0  |[hello,, math, revised!]|(5000,[869,2396,4731],[1.0,1.0,1.0])|
|4  |sci.math    |Better Math         |1.0  |[better, math]          |(5000,[2396,2543],[1.0,1.0])        |
|5  |alt.religion|TGIF                |0.0  |[tgif]                  |(5000,[987],[1.0])                  |
+---+------------+----------

import org.apache.spark.ml.feature.HashingTF
hashingTF: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_9f0da6cbfb89, binary=false, numFeatures=5000
hashed: org.apache.spark.sql.DataFrame = [id: bigint, topic: string ... 4 more fields]


In [46]:
// Randomly split the hashed rows with weights 0.7 / 0.3 into a training and a test set.
// NOTE(review): no seed is passed, so the split presumably differs between runs — consider
// the randomSplit(weights, seed) overload if reproducible output is wanted.
val Array(trainDF, testDF) = hashed.randomSplit(Array(0.7, 0.3))

trainDF.show

testDF.show

+---+------------+--------------------+-----+--------------------+--------------------+
| id|       topic|                text|label|               words|            features|
+---+------------+--------------------+-----+--------------------+--------------------+
|  1|alt.religion|    Hello, Religion!|  0.0| [hello,, religion!]|(5000,[2205,4731]...|
|  2| sci.physics|     Hello, Physics!|  1.0|  [hello,, physics!]|(5000,[4731,4945]...|
|  3|    sci.math|Hello, Math Revised!|  1.0|[hello,, math, re...|(5000,[869,2396,4...|
|  4|    sci.math|         Better Math|  1.0|      [better, math]|(5000,[2396,2543]...|
+---+------------+--------------------+-----+--------------------+--------------------+

+---+------------+------------+-----+---------------+--------------------+
| id|       topic|        text|label|          words|            features|
+---+------------+------------+-----+---------------+--------------------+
|  0|    sci.math|Hello, Math!|  1.0|[hello,, math!]|(5000,[4391,4731]

trainDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, topic: string ... 4 more fields]
testDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, topic: string ... 4 more fields]


In [47]:
import org.apache.spark.ml.classification.LogisticRegression

// Logistic regression limited to 20 iterations with regularization parameter 0.01.
val lr = new LogisticRegression().setMaxIter(20).setRegParam(0.01)

// Train on the training split.
val model = lr.fit(trainDF)

// Score the test split and keep only the columns needed to compare label with prediction.
val pred = model.transform(testDF).select("topic", "label", "prediction")

pred.show

+------------+-----+----------+
|       topic|label|prediction|
+------------+-----+----------+
|    sci.math|  1.0|       1.0|
|alt.religion|  0.0|       1.0|
+------------+-----+----------+



import org.apache.spark.ml.classification.LogisticRegression
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_3d0ed9511c1a
model: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_3d0ed9511c1a, numClasses=2, numFeatures=5000
pred: org.apache.spark.sql.DataFrame = [topic: string, label: double ... 1 more field]


# Text Processing Example - Pipeline

In [21]:
// Split the raw labelled rows (no words/features columns yet) with weights 0.8 / 0.2.
// NOTE(review): no seed is passed, so the split presumably differs between runs.
val Array(trainDF2, testDF2) = labelled.randomSplit(Array(0.8, 0.2))

trainDF2.show

testDF2.show

+---+------------+--------------------+-----+
| id|       topic|                text|label|
+---+------------+--------------------+-----+
|  0|    sci.math|        Hello, Math!|  1.0|
|  1|alt.religion|    Hello, Religion!|  0.0|
|  2| sci.physics|     Hello, Physics!|  1.0|
|  3|    sci.math|Hello, Math Revised!|  1.0|
|  4|    sci.math|         Better Math|  1.0|
+---+------------+--------------------+-----+

+---+------------+----+-----+
| id|       topic|text|label|
+---+------------+----+-----+
|  5|alt.religion|TGIF|  0.0|
+---+------------+----+-----+



trainDF2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, topic: string ... 2 more fields]
testDF2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, topic: string ... 2 more fields]


In [22]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Chain the tokenizer, hashingTF and lr stages defined in earlier cells into one pipeline.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// Fitting the pipeline on the raw labelled rows yields a PipelineModel.
val model2 = pipeline.fit(trainDF2)

// Apply the fitted pipeline to the held-out rows and compare label with prediction.
val pred = model2.transform(testDF2).select("topic", "label", "prediction")

pred.show

+------------+-----+----------+
|       topic|label|prediction|
+------------+-----+----------+
|alt.religion|  0.0|       1.0|
+------------+-----+----------+



import org.apache.spark.ml.{Pipeline, PipelineModel}
pipeline: org.apache.spark.ml.Pipeline = pipeline_35c833a4cf6f
model2: org.apache.spark.ml.PipelineModel = pipeline_35c833a4cf6f
pred: org.apache.spark.sql.DataFrame = [topic: string, label: double ... 1 more field]


# Feature Engineering - Vector Assembler

In [23]:
import org.apache.spark.ml.feature.VectorAssembler

// Row type with three numeric fields.
case class Nums(val1: Long, val2: Long, val3: Long)

// Three sample rows as a DataFrame with columns val1, val2 and val3.
val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF

// Combine the three numeric columns into a single vector column named features.
val va = new VectorAssembler().setInputCols(Array("val1", "val2", "val3")).setOutputCol("features")

va.transform(numsDF).show

+----+----+----+-------------+
|val1|val2|val3|     features|
+----+----+----+-------------+
|   1|   2|   3|[1.0,2.0,3.0]|
|   4|   5|   6|[4.0,5.0,6.0]|
|   7|   8|   9|[7.0,8.0,9.0]|
+----+----+----+-------------+



import org.apache.spark.ml.feature.VectorAssembler
defined class Nums
numsDF: org.apache.spark.sql.DataFrame = [val1: bigint, val2: bigint ... 1 more field]
va: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_240048322a6c, handleInvalid=error, numInputCols=3


# Feature Engineering - Continuous Features

In [24]:
import org.apache.spark.ml.feature.Bucketizer

// The values 0..19 as a single double column named id.
val contDF = spark.range(20).selectExpr("cast(id as double)")

// Five borders define four buckets; in the output a value equal to a border (e.g. 5.0)
// lands in the bucket that starts at that border.
val bucketBorders = Array(-1.0, 5.0, 10.0, 15.0, 20.0)
// No output column is set, so the result column gets a generated name (see the table header below).
val bucketer = new Bucketizer().setSplits(bucketBorders).setInputCol("id")

bucketer.transform(contDF).show()

+----+-------------------------------+
|  id|bucketizer_8b0d6a960ea0__output|
+----+-------------------------------+
| 0.0|                            0.0|
| 1.0|                            0.0|
| 2.0|                            0.0|
| 3.0|                            0.0|
| 4.0|                            0.0|
| 5.0|                            1.0|
| 6.0|                            1.0|
| 7.0|                            1.0|
| 8.0|                            1.0|
| 9.0|                            1.0|
|10.0|                            2.0|
|11.0|                            2.0|
|12.0|                            2.0|
|13.0|                            2.0|
|14.0|                            2.0|
|15.0|                            3.0|
|16.0|                            3.0|
|17.0|                            3.0|
|18.0|                            3.0|
|19.0|                            3.0|
+----+-------------------------------+



import org.apache.spark.ml.feature.Bucketizer
contDF: org.apache.spark.sql.DataFrame = [id: double]
bucketBorders: Array[Double] = Array(-1.0, 5.0, 10.0, 15.0, 20.0)
bucketer: org.apache.spark.ml.feature.Bucketizer = Bucketizer: uid=bucketizer_8b0d6a960ea0


In [25]:
import org.apache.spark.ml.feature.VectorAssembler

// Row type with three numeric fields (re-declared in this cell).
case class Nums(val1: Long, val2: Long, val3: Long)

// Three sample rows as a DataFrame with columns val1, val2 and val3.
val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF

// Assembler that packs the three columns into a vector column named features.
val va = new VectorAssembler().setInputCols(Array("val1", "val2", "val3")).setOutputCol("features")

// Keep the assembled DataFrame so later cells can use it.
val nums = va.transform(numsDF)

import org.apache.spark.ml.feature.VectorAssembler
defined class Nums
numsDF: org.apache.spark.sql.DataFrame = [val1: bigint, val2: bigint ... 1 more field]
va: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_ba14a68f2691, handleInvalid=error, numInputCols=3
nums: org.apache.spark.sql.DataFrame = [val1: bigint, val2: bigint ... 2 more fields]


In [48]:
import org.apache.spark.ml.feature.StandardScaler

// Scale the features vector into a new column named scaled.
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaled")

// Fit the scaler on nums, then apply it to the same data. In the output below every value
// equals the original divided by 3 (presumably the per-column standard deviation — confirm).
scaler.fit(nums).transform(nums).show(false)

+----+----+----+-------------+-------------------------------------------+
|val1|val2|val3|features     |scaled                                     |
+----+----+----+-------------+-------------------------------------------+
|1   |2   |3   |[1.0,2.0,3.0]|[0.3333333333333333,0.6666666666666666,1.0]|
|4   |5   |6   |[4.0,5.0,6.0]|[1.3333333333333333,1.6666666666666665,2.0]|
|7   |8   |9   |[7.0,8.0,9.0]|[2.333333333333333,2.6666666666666665,3.0] |
+----+----+----+-------------+-------------------------------------------+



import org.apache.spark.ml.feature.StandardScaler
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_0ea01b8e3677


In [27]:
import org.apache.spark.ml.feature.VectorAssembler

// NOTE(review): this cell repeats the earlier cell that builds `nums` line for line.
// Row type with three numeric fields.
case class Nums(val1: Long, val2: Long, val3: Long)

// Three sample rows as a DataFrame with columns val1, val2 and val3.
val numsDF = spark.createDataFrame(Seq(Nums(1, 2, 3), Nums(4, 5, 6), Nums(7, 8, 9))).toDF

// Assembler that packs the three columns into a vector column named features.
val va = new VectorAssembler().setInputCols(Array("val1", "val2", "val3")).setOutputCol("features")

// Assembled DataFrame with the extra features column.
val nums = va.transform(numsDF)

import org.apache.spark.ml.feature.VectorAssembler
defined class Nums
numsDF: org.apache.spark.sql.DataFrame = [val1: bigint, val2: bigint ... 1 more field]
va: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_7c4cbd56edad, handleInvalid=error, numInputCols=3
nums: org.apache.spark.sql.DataFrame = [val1: bigint, val2: bigint ... 2 more fields]


# Feature Engineering - Categorical Features

In [28]:
// Load simple-ml.json; the output shows the columns color, lab, value1 and value2.
val simpleDF = spark.read.json("simple-ml.json")

simpleDF.show

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red|good|    45| 38.97187133755819|
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



simpleDF: org.apache.spark.sql.DataFrame = [color: string, lab: string ... 2 more fields]


In [29]:
import org.apache.spark.ml.feature.StringIndexer

// Map the string values of the lab column to numeric indices in a new labelInd column.
val lblIndxr = new StringIndexer().setInputCol("lab").setOutputCol("labelInd")

// Fit the indexer on simpleDF and apply it to the same data
// (in the output "bad" becomes 0.0 and "good" becomes 1.0).
val idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)

idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

import org.apache.spark.ml.feature.StringIndexer
lblIndxr: org.apache.spark.ml.feature.StringIndexer = strIdx_3f0b4194a072
idxRes: org.apache.spark.sql.DataFrame = [color: string, lab: string ... 3 more fields]


In [30]:
import org.apache.spark.ml.feature.IndexToString

// Reverse of the indexing step: turn the numeric labelInd values back into strings
// in a column named original.
val labelReverse = new IndexToString().setInputCol("labelInd").setOutputCol("original")

labelReverse.transform(idxRes).show()

+-----+----+------+------------------+--------+--------+
|color| lab|value1|            value2|labelInd|original|
+-----+----+------+------------------+--------+--------+
|green|good|     1|14.386294994851129|     1.0|    good|
| blue| bad|     8|14.386294994851129|     0.0|     bad|
| blue| bad|    12|14.386294994851129|     0.0|     bad|
|green|good|    15| 38.97187133755819|     1.0|    good|
|green|good|    12|14.386294994851129|     1.0|    good|
|green| bad|    16|14.386294994851129|     0.0|     bad|
|  red|good|    35|14.386294994851129|     1.0|    good|
|  red| bad|     1| 38.97187133755819|     0.0|     bad|
|  red| bad|     2|14.386294994851129|     0.0|     bad|
|  red| bad|    16|14.386294994851129|     0.0|     bad|
|  red|good|    45| 38.97187133755819|     1.0|    good|
|green|good|     1|14.386294994851129|     1.0|    good|
| blue| bad|     8|14.386294994851129|     0.0|     bad|
| blue| bad|    12|14.386294994851129|     0.0|     bad|
|green|good|    15| 38.97187133

import org.apache.spark.ml.feature.IndexToString
labelReverse: org.apache.spark.ml.feature.IndexToString = idxToStr_3c67c6f00225


In [31]:
import org.apache.spark.ml.feature.OneHotEncoder

// Indexer for the color column. NOTE(review): the name lblIndxr is reused from the label
// indexer cell although this one indexes colors.
val lblIndxr = new StringIndexer().setInputCol("color").setOutputCol("colorInd")

// Fit on the full DataFrame, then index only the color column.
val colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))

// Encode each color index as a sparse vector in a column named one-hot.
val encoder = new OneHotEncoder().setInputCol("colorInd").setOutputCol("one-hot")

val ohe = encoder.fit(colorLab)

// The fitted model reports dropLast=true in the output; accordingly the vectors have size 2
// and the last index (2.0) is encoded as an all-empty vector.
val encoded = ohe.transform(colorLab.select("colorInd"))

encoded.show()

+--------+-------------+
|colorInd|      one-hot|
+--------+-------------+
|     1.0|(2,[1],[1.0])|
|     2.0|    (2,[],[])|
|     2.0|    (2,[],[])|
|     1.0|(2,[1],[1.0])|
|     1.0|(2,[1],[1.0])|
|     1.0|(2,[1],[1.0])|
|     0.0|(2,[0],[1.0])|
|     0.0|(2,[0],[1.0])|
|     0.0|(2,[0],[1.0])|
|     0.0|(2,[0],[1.0])|
|     0.0|(2,[0],[1.0])|
|     1.0|(2,[1],[1.0])|
|     2.0|    (2,[],[])|
|     2.0|    (2,[],[])|
|     1.0|(2,[1],[1.0])|
|     1.0|(2,[1],[1.0])|
|     1.0|(2,[1],[1.0])|
|     0.0|(2,[0],[1.0])|
|     0.0|(2,[0],[1.0])|
|     0.0|(2,[0],[1.0])|
+--------+-------------+
only showing top 20 rows



import org.apache.spark.ml.feature.OneHotEncoder
lblIndxr: org.apache.spark.ml.feature.StringIndexer = strIdx_3903bce5d40f
colorLab: org.apache.spark.sql.DataFrame = [color: string, colorInd: double]
encoder: org.apache.spark.ml.feature.OneHotEncoder = oneHotEncoder_8abc28fb989c
ohe: org.apache.spark.ml.feature.OneHotEncoderModel = OneHotEncoderModel: uid=oneHotEncoder_8abc28fb989c, dropLast=true, handleInvalid=error
encoded: org.apache.spark.sql.DataFrame = [colorInd: double, one-hot: vector]


# Feature Engineering - Text Features

In [32]:
// Read sales.csv using its first line as the header and drop rows whose Description is null.
val sales = spark.read.format("csv").option("header", "true").load("sales.csv").where("Description IS NOT NULL")

// Show the first 5 rows without truncating the column values.
sales.show(5, false)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+---------+-----

sales: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 6 more fields]


In [33]:
import org.apache.spark.ml.feature.Tokenizer

// Tokenizer that reads Description and writes the token array to DescOut.
val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")

// Only the Description column is passed in; the output shows lower-cased tokens split on spaces.
val tokenized = tkn.transform(sales.select("Description"))

tokenized.show(false)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |[white, hanging, heart, t-light, holder]  |
|WHITE METAL LANTERN                |[white, metal, lantern]                   |
|CREAM CUPID HEARTS COAT HANGER     |[cream, cupid, hearts, coat, hanger]      |
|KNITTED UNION FLAG HOT WATER BOTTLE|[knitted, union, flag, hot, water, bottle]|
|RED WOOLLY HOTTIE WHITE HEART.     |[red, woolly, hottie, white, heart.]      |
|SET 7 BABUSHKA NESTING BOXES       |[set, 7, babushka, nesting, boxes]        |
|GLASS STAR FROSTED T-LIGHT HOLDER  |[glass, star, frosted, t-light, holder]   |
|HAND WARMER UNION JACK             |[hand, warmer, union, jack]               |
|HAND WARMER RED POLKA DOT          |[hand, warmer, red, polka, dot]           |
|ASSORTED COLOUR BIRD ORNAME

import org.apache.spark.ml.feature.Tokenizer
tkn: org.apache.spark.ml.feature.Tokenizer = tok_b416b20ffddb
tokenized: org.apache.spark.sql.DataFrame = [Description: string, DescOut: array<string>]


In [34]:
import org.apache.spark.ml.feature.StopWordsRemover

// Two pre-tokenized sentences in a column named raw.
val df = spark.createDataFrame(Seq((0, Seq("I", "saw", "the", "red", "balloon")),
                                   (1, Seq("Mary", "had", "a", "little", "lamb")))).toDF("id", "raw")

// Built-in English stop-word list.
val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")

// Remove the stop words from raw and write the remaining tokens to WithoutStops.
val stops = new StopWordsRemover().setStopWords(englishStopWords).setInputCol("raw").setOutputCol("WithoutStops")

stops.transform(df).show(false)

+---+----------------------------+--------------------+
|id |raw                         |WithoutStops        |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



import org.apache.spark.ml.feature.StopWordsRemover
df: org.apache.spark.sql.DataFrame = [id: int, raw: array<string>]
englishStopWords: Array[String] = Array(i, me, my, myself, we, our, ours, ourselves, you, your, yours, yourself, yourselves, he, him, his, himself, she, her, hers, herself, it, its, itself, they, them, their, theirs, themselves, what, which, who, whom, this, that, these, those, am, is, are, was, were, be, been, being, have, has, had, having, do, does, did, doing, a, an, the, and, but, if, or, because, as, until, while, of, at, by, for, with, about, against, between, into, through, during, before, after, above, below, to, from, up, down, in, out, on, off, over, under, again, further, then, once, here, there, when, where, why, how, all, any, both, each, few, more, most, o...


In [35]:
import org.apache.spark.ml.feature.CountVectorizer

// Two small token arrays in a column named words.
val df = spark.createDataFrame(Seq((0, Array("a", "b", "c")),
                                   (1, Array("a", "b", "b", "c", "a")))).toDF("id", "words")

// NOTE(review): despite its name, cvModel is the unfitted CountVectorizer; the fitted
// CountVectorizerModel is fittedCV below (see the types echoed in the output).
val cvModel = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(3).setMinDF(2)

val fittedCV = cvModel.fit(df)

// Each row becomes a vector of per-term counts over the 3-term vocabulary.
fittedCV.transform(df).show(false)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



import org.apache.spark.ml.feature.CountVectorizer
df: org.apache.spark.sql.DataFrame = [id: int, words: array<string>]
cvModel: org.apache.spark.ml.feature.CountVectorizer = cntVec_42ce79b74a30
fittedCV: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_42ce79b74a30, vocabularySize=3


# Linear regression

In [36]:
// Load data.txt in libsvm format; the result has the columns label and features.
val data = spark.read.format("libsvm").load("data.txt")

data: org.apache.spark.sql.DataFrame = [label: double, features: vector]


In [37]:
// Show only the first row, without truncating the feature vector.
data.show(1, false)

+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label             |features                                                                                                                                                                                                                            |
+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|-9.490009878824548|(10,[0,1,2,3,4,5,6,7,8,9],[0.4551273600657362,0.36644694351969087,-0.38256108933468047,-0.4458430198517267,0.33109790358914726,0.8067445293443565,-0.2624341731773887,-0.44850386111659524,-0.07269284838169332,0.5658035575800715])|


In [38]:
import org.apache.spark.ml.regression.LinearRegression

// Linear regression limited to 10 iterations, fitted on the full data set.
val lr = new LinearRegression().setMaxIter(10)
val lrModel = lr.fit(data)

import org.apache.spark.ml.regression.LinearRegression
lr: org.apache.spark.ml.regression.LinearRegression = linReg_68939f959cee
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_68939f959cee, numFeatures=10


In [39]:
// Print the fitted coefficients and intercept of the linear model.
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// The training summary exposes fit metrics; here the root mean squared error is printed.
val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931] Intercept: 0.14228558260358093
RMSE: 10.16309157133015


trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@35e1924a


# Binary classification

In [40]:
// One observation: a single numeric feature x1 and a label y (0 or 1 in the samples below).
// NOTE(review): Scala convention names classes in UpperCamelCase (Cancer).
case class cancer(x1: Long, y: Long)
// Three training rows and one test row.
val trainData = spark.createDataFrame(Seq(cancer(330, 1), cancer(120, 0), cancer(400, 1))).toDF
val testData = spark.createDataFrame(Seq(cancer(500, 0))).toDF

defined class cancer
trainData: org.apache.spark.sql.DataFrame = [x1: bigint, y: bigint]
testData: org.apache.spark.sql.DataFrame = [x1: bigint, y: bigint]


In [41]:
import org.apache.spark.ml.feature.VectorAssembler

// Wrap the single x1 column into a vector column named features for both data sets.
val va = new VectorAssembler().setInputCols(Array("x1")).setOutputCol("features")
val train = va.transform(trainData)
val test = va.transform(testData)

import org.apache.spark.ml.feature.VectorAssembler
va: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_4e824d6fcfe2, handleInvalid=error, numInputCols=1
train: org.apache.spark.sql.DataFrame = [x1: bigint, y: bigint ... 1 more field]
test: org.apache.spark.sql.DataFrame = [x1: bigint, y: bigint ... 1 more field]


In [42]:
import org.apache.spark.ml.classification.LogisticRegression

// Logistic regression that reads features from "features" and the label from "y",
// with maxIter 10, regParam 0.3 and elasticNetParam 0.8.
val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("y")
.setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val lrModel = lr.fit(train)
// Score the test row; the output adds rawPrediction, probability and prediction columns.
lrModel.transform(test).show(false)

+---+---+--------+----------------------------------------+----------------------------------------+----------+
|x1 |y  |features|rawPrediction                           |probability                             |prediction|
+---+---+--------+----------------------------------------+----------------------------------------+----------+
|500|0  |[500.0] |[-1.6828468484311974,1.6828468484311974]|[0.15671886712181188,0.8432811328781882]|1.0       |
+---+---+--------+----------------------------------------+----------------------------------------+----------+



import org.apache.spark.ml.classification.LogisticRegression
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_f382c39e9fc8
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_f382c39e9fc8, numClasses=2, numFeatures=1


# Multi-class classification

In [43]:
// Load the multi-class training set in libsvm format.
val training = spark.read.format("libsvm").load("multiclass_data.txt")

import org.apache.spark.ml.classification.LogisticRegression
// Logistic regression with maxIter 10, regParam 0.3 and elasticNetParam 0.8;
// the fitted model reports numClasses=3 in the output.
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val lrModel = lr.fit(training)

// Print the coefficient matrix and the intercept vector of the fitted model.
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: \n${lrModel.interceptVector}")

Coefficients: 
3 x 4 CSCMatrix
(1,2) -0.7666333131801362
(0,3) 0.3049998979124714
(1,3) -0.38544484160713977
Intercepts: 
[0.051925800207288285,-0.12619173083598836,0.07426593062870007]


training: org.apache.spark.sql.DataFrame = [label: double, features: vector]
import org.apache.spark.ml.classification.LogisticRegression
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_bbb115a5c115
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_bbb115a5c115, numClasses=3, numFeatures=4


# The END