# ETL 2. Common Transformations

### Nomalizing Data

In [6]:
val integerDF = spark.range(1000, 10000)

integerDF.show(10)

+----+
|  id|
+----+
|1000|
|1001|
|1002|
|1003|
|1004|
|1005|
|1006|
|1007|
|1008|
|1009|
+----+
only showing top 10 rows



integerDF = [id: bigint]


[id: bigint]

In [8]:
import org.apache.spark.sql.functions.{col, max, min}

val colMin = integerDF.select(min("id")).first()(0).asInstanceOf[Long]
val colMax = integerDF.select(max("id")).first()(0).asInstanceOf[Long]

val normalizedIntegerDF = integerDF
  .withColumn("normalizedValue", (col("id") - colMin) / (colMax - colMin) )

normalizedIntegerDF.show(10)

+----+--------------------+
|  id|     normalizedValue|
+----+--------------------+
|1000|                 0.0|
|1001|1.111234581620180...|
|1002|2.222469163240360...|
|1003| 3.33370374486054E-4|
|1004|4.444938326480720...|
|1005|  5.5561729081009E-4|
|1006| 6.66740748972108E-4|
|1007| 7.77864207134126E-4|
|1008|8.889876652961441E-4|
|1009|0.001000111123458162|
+----+--------------------+
only showing top 10 rows



colMin = 1000
colMax = 9999
normalizedIntegerDF = [id: bigint, normalizedValue: double]


[id: bigint, normalizedValue: double]

### Imputing Null or Missing Data

In [11]:
val corruptDF = Seq(
  (Some(11), Some(66), Some(5)),
  (Some(12), Some(68), None),
  (Some(1), None, Some(6)),
  (Some(2), Some(72), Some(7))
).toDF("hour", "temperature", "wind")

corruptDF.show()

+----+-----------+----+
|hour|temperature|wind|
+----+-----------+----+
|  11|         66|   5|
|  12|         68|null|
|   1|       null|   6|
|   2|         72|   7|
+----+-----------+----+



corruptDF = [hour: int, temperature: int ... 1 more field]


[hour: int, temperature: int ... 1 more field]

In [12]:
// Drop any records that have null values.
val corruptDroppedDF = corruptDF.na.drop("any")

corruptDroppedDF.show()

+----+-----------+----+
|hour|temperature|wind|
+----+-----------+----+
|  11|         66|   5|
|   2|         72|   7|
+----+-----------+----+



corruptDroppedDF = [hour: int, temperature: int ... 1 more field]


[hour: int, temperature: int ... 1 more field]

In [14]:
// Impute values with the mean
val map = Map("temperature" -> 68, "wind" -> 6)
val corruptImputedDF = corruptDF.na.fill(map)

corruptImputedDF.show()

+----+-----------+----+
|hour|temperature|wind|
+----+-----------+----+
|  11|         66|   5|
|  12|         68|   6|
|   1|         68|   6|
|   2|         72|   7|
+----+-----------+----+



map = Map(temperature -> 68, wind -> 6)
corruptImputedDF = [hour: int, temperature: int ... 1 more field]


[hour: int, temperature: int ... 1 more field]

### Deduplicating Data

In [15]:
val duplicateDF = Seq(
  (15342, "Conor", "red"),
  (15342, "conor", "red"),
  (12512, "Dorothy", "blue"),
  (5234, "Doug", "aqua")
  ).toDF("id", "name", "favorite_color")

duplicateDF.show()

+-----+-------+--------------+
|   id|   name|favorite_color|
+-----+-------+--------------+
|15342|  Conor|           red|
|15342|  conor|           red|
|12512|Dorothy|          blue|
| 5234|   Doug|          aqua|
+-----+-------+--------------+



duplicateDF = [id: int, name: string ... 1 more field]


[id: int, name: string ... 1 more field]

In [17]:
val duplicateDedupedDF = duplicateDF.dropDuplicates("id", "favorite_color")

duplicateDedupedDF.show()

+-----+-------+--------------+
|   id|   name|favorite_color|
+-----+-------+--------------+
|15342|  Conor|           red|
| 5234|   Doug|          aqua|
|12512|Dorothy|          blue|
+-----+-------+--------------+



duplicateDedupedDF = [id: int, name: string ... 1 more field]


[id: int, name: string ... 1 more field]

### Other Helpful Data Manipulation Functions

| Function    | Use                                                                                                                        |
|:------------|:---------------------------------------------------------------------------------------------------------------------------|
| `explode()` | Returns a new row for each element in the given array or map                                                               |
| `pivot()`   | Pivots a column of the current DataFrame and perform the specified aggregation                                             |
| `cube()`    | Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them   |
| `rollup()`  | Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them |