# Preamble

In [6]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` 
import $ivy.`sh.almond::almond-spark:0.4.0`

//import org.apache.spark._
import org.apache.spark.sql.{NotebookSparkSession, SparkSession}
import org.apache.spark.sql.{functions => func, _}
import org.apache.spark.sql.types._

val spark = NotebookSparkSession
      .builder()
      .config("spark.sql.join.preferSortMergeJoin", false)
      .config("spark.sql.shuffle.partitions", 64)
      .master("local[*]")
      .getOrCreate()

import spark.implicits._

import org.slf4j.LoggerFactory
import org.apache.log4j.{Level, Logger}

Logger.getRootLogger().setLevel(Level.ERROR)

def run[A](code: => A): A = {
    val start = System.currentTimeMillis()
    val res = code
    println(s"Took ${System.currentTimeMillis() - start}")
    res
}

Creating SparkSession


[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                              

//import org.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql.{NotebookSparkSession, SparkSession}
[39m
[32mimport [39m[36morg.apache.spark.sql.{functions => func, _}
[39m
[32mimport [39m[36morg.apache.spark.sql.types._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@418d21b0
[32mimport [39m[36mspark.implicits._

[39m
[32mimport [39m[36morg.slf4j.LoggerFactory
[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}

[39m
defined [32mfunction[39m [36mrun[39m

# On `DataFrame`s

We can create datasets from external data sources using different formats, e.g. Json, parquet, CSV, etc. 

###### Para leer datos en otros formatos usamos `.read.formato("fichero.formato")`
###### Con multiline permitimos que los datos JSON se encuentren en diferentes lineas

In [7]:
val data: DataFrame = spark.read.option("multiline", "true").json("D:/TFGAlvaroSanchez/json_data/2916A(Vitigudino)-2018.json")

[36mdata[39m: [32mDataFrame[39m = [e: string, fecha: string ... 28 more fields]

Note that we created a `DataFrame`, not a `Dataset`. Dataframes are like datasets, i.e. programs to generate distributed data sets, but *dynamically typed*. This means that, at compile time, Scala only knows that a dataframe consists of `Row`s.

In [8]:
data.collect

[36mres7[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [77.0,2018-1,82.0,2916A,4.0,14.0,8.0,2.0,0.0,13.0,0.0,2.0,0.0,12.4(13),53.0,41.0,1024.9,944.8(28),933.8,911.9(07),16.2(03),-2.8(15),4.2,10.3,5.7,1.0,10.1,12.0,23/18.3(05),281.0],
  [59.0,2018-2,68.0,2916A,5.7,8.0,6.0,2.0,1.0,21.0,0.0,4.0,0.0,30.2(27),52.0,53.0,1016.7,936.4(17),926.0,899.7(28),17.5(26),-5.2(24),3.1,10.3,4.6,-1.1,7.8,13.0,26/19.2(13),293.0],
  [77.0,2018-3,77.0,2916A,4.1,22.0,20.0,4.0,1.0,6.0,0.0,16.0,0.0,32.2(09),172.6,34.0,1008.7,938.4(27),919.4,891.4(01),19.5(27),-3.0(22),7.8,11.4,6.7,1.9,8.9,20.0,26/23.9(10),452.0],
  [95.0,2018-4,71.0,2916A,6.0,17.0,16.0,4.0,0.0,1.0,0.0,null,null,15.0(12),102.4,45.0,1011.9,935.2(17),923.8,907.1(10),26.9(25),-0.3(11),8.6,16.9,11.2,5.4,11.6,null,null,null],
  [107.0,2018-5,61.0,2916A,7.8,12.0,8.0,3.0,0.0,1.0,0.0,null,null,26.0(24),76.8,54.0,1014.4,934.9(13),927.3,922.4(21),26.9(08),0.0(13),15.9,21.0,14.2,7.4,12.2,null,null,null],
  [139.0,2018-6,62.0,2916A,8.4,11.0,9.

In fact, a `DataFrame` is defined as an alias of `Dataset`: 

In [9]:
val dataDS: Dataset[Row] = data

[36mdataDS[39m: [32mDataset[39m[[32mRow[39m] = [e: string, fecha: string ... 28 more fields]

But the type of the information to be processed is there! 

In [10]:
data.schema
data.printSchema

root
 |-- e: string (nullable = true)
 |-- fecha: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- indicativo: string (nullable = true)
 |-- inso: string (nullable = true)
 |-- np_001: string (nullable = true)
 |-- np_010: string (nullable = true)
 |-- np_100: string (nullable = true)
 |-- np_300: string (nullable = true)
 |-- nt_00: string (nullable = true)
 |-- nt_30: string (nullable = true)
 |-- nw_55: string (nullable = true)
 |-- nw_91: string (nullable = true)
 |-- p_max: string (nullable = true)
 |-- p_mes: string (nullable = true)
 |-- p_sol: string (nullable = true)
 |-- q_mar: string (nullable = true)
 |-- q_max: string (nullable = true)
 |-- q_med: string (nullable = true)
 |-- q_min: string (nullable = true)
 |-- ta_max: string (nullable = true)
 |-- ta_min: string (nullable = true)
 |-- ti_max: string (nullable = true)
 |-- tm_max: string (nullable = true)
 |-- tm_mes: string (nullable = true)
 |-- tm_min: string (nullable = true)
 |-- ts_min: string (nulla

[36mres9_0[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"e"[39m, StringType, true, {}),
  [33mStructField[39m([32m"fecha"[39m, StringType, true, {}),
  [33mStructField[39m([32m"hr"[39m, StringType, true, {}),
  [33mStructField[39m([32m"indicativo"[39m, StringType, true, {}),
  [33mStructField[39m([32m"inso"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_001"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_010"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_100"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_300"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nt_00"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nt_30"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nw_55"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nw_91"[39m, StringType, true, {}),
  [33mStructField[39m([32m"p_max"[39m, StringType, true, {}),
  [33mStruc

and we can convert a dataframe into a dataset: 

###### Con `.as[class]` transformamos el DataFrame en DataSet

In [11]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class Data(fecha: String, indicativo: String, p_max: String, hr: String, inso: String, q_max: String,
                nw_55: String, q_mar: String, q_med: String, tm_min: String, ta_max : String, 
               ts_min : String ,nt_30: String, w_racha: String, np_100: String, p_sol: String, nw_91: String, np_001: String,
               ta_min: String, w_rec: String, e: String, np_300: String, p_mes: String, w_med: String, 
               nt_00: String, ti_max: String, tm_mes: String, tm_max: String, q_min: String, np_010: String)

val dataDs: Dataset[Data] = data.as[Data]

defined [32mclass[39m [36mData[39m
[36mdataDs[39m: [32mDataset[39m[[32mData[39m] = [e: string, fecha: string ... 28 more fields]

In [12]:
dataDs.show
data.show

+-----+-------+----+----------+----+------+------+------+------+-----+-----+-----+-----+------------+-----+-----+------+-------------+-----+-------------+------------+------------+------+------+------+------+------+-----+-----------+-----+
|    e|  fecha|  hr|indicativo|inso|np_001|np_010|np_100|np_300|nt_00|nt_30|nw_55|nw_91|       p_max|p_mes|p_sol| q_mar|        q_max|q_med|        q_min|      ta_max|      ta_min|ti_max|tm_max|tm_mes|tm_min|ts_min|w_med|    w_racha|w_rec|
+-----+-------+----+----------+----+------+------+------+------+-----+-----+-----+-----+------------+-----+-----+------+-------------+-----+-------------+------------+------------+------+------+------+------+------+-----+-----------+-----+
| 77.0| 2018-1|82.0|     2916A| 4.0|  14.0|   8.0|   2.0|   0.0| 13.0|  0.0|  2.0|  0.0|    12.4(13)| 53.0| 41.0|1024.9|    944.8(28)|933.8|    911.9(07)|    16.2(03)|    -2.8(15)|   4.2|  10.3|   5.7|   1.0|  10.1| 12.0|23/18.3(05)|281.0|
| 59.0| 2018-2|68.0|     2916A| 5.7|   8

+-----+-------+----+----------+----+------+------+------+------+-----+-----+-----+-----+------------+-----+-----+------+-------------+-----+-------------+------------+------------+------+------+------+------+------+-----+-----------+-----+
|    e|  fecha|  hr|indicativo|inso|np_001|np_010|np_100|np_300|nt_00|nt_30|nw_55|nw_91|       p_max|p_mes|p_sol| q_mar|        q_max|q_med|        q_min|      ta_max|      ta_min|ti_max|tm_max|tm_mes|tm_min|ts_min|w_med|    w_racha|w_rec|
+-----+-------+----+----------+----+------+------+------+------+-----+-----+-----+-----+------------+-----+-----+------+-------------+-----+-------------+------------+------------+------+------+------+------+------+-----+-----------+-----+
| 77.0| 2018-1|82.0|     2916A| 4.0|  14.0|   8.0|   2.0|   0.0| 13.0|  0.0|  2.0|  0.0|    12.4(13)| 53.0| 41.0|1024.9|    944.8(28)|933.8|    911.9(07)|    16.2(03)|    -2.8(15)|   4.2|  10.3|   5.7|   1.0|  10.1| 12.0|23/18.3(05)|281.0|
| 59.0| 2018-2|68.0|     2916A| 5.7|   8

# Untyped transformations

The `Dataset` API includes a section on _untyped transformations_. These are transformations that are not defined over the Scala types but over the inner Spark SQL types (i.e. `StructType`s). More exactly, these could be named *dynamically typed transformations*.

These transformations are in close corresponde with their SQL counterparts: `SELECT`, `WHERE`, `GROUP BY`, `FROM`, etc. 

### The `select` transformation

For instance, the equivalent to the `map` typed transformation is `select`: 

In [13]:
val ds: Dataset[String] = dataDs.map(_.ta_max)
ds.collect
ds.show
ds.explain

+------------+
|       value|
+------------+
|    16.2(03)|
|    17.5(26)|
|    19.5(27)|
|    26.9(25)|
|    26.9(08)|
|    34.1(24)|
|    33.8(10)|
|    40.7(06)|
|    36.9(02)|
|    30.3(05)|
|    19.6(02)|
|    18.3(09)|
|40.7(06/ago)|
+------------+

== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#367]
+- *(1) MapElements ammonite.$sess.cmd12$Helper$$Lambda$4627/0x00000008017c8840@295ccee8, obj#366: java.lang.String
   +- *(1) DeserializeToObject newInstance(class ammonite.$sess.cmd10$Helper$Data), obj#365: ammonite.$sess.cmd10$Helper$Data
      +- *(1) FileScan json [e#0,fecha#1,hr#2,indicativo#3,inso#4,np_001#5,np_010#6,np_100#7,np_300#8,nt_00#9,nt_30#10,nw_55#11,nw_91#12,p_max#13,p_mes#14,p_sol#15,q_mar#16,q_max#17,q_med#18,q_min#19,ta_max#20,ta_min#21,ti_max#22,tm_max#23,... 6 more fields] Batched: false, Format: JSON, Location: InMemoryFil

[36mds[39m: [32mDataset[39m[[32mString[39m] = [value: string]
[36mres12_1[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"16.2(03)"[39m,
  [32m"17.5(26)"[39m,
  [32m"19.5(27)"[39m,
  [32m"26.9(25)"[39m,
  [32m"26.9(08)"[39m,
  [32m"34.1(24)"[39m,
  [32m"33.8(10)"[39m,
  [32m"40.7(06)"[39m,
  [32m"36.9(02)"[39m,
  [32m"30.3(05)"[39m,
  [32m"19.6(02)"[39m,
  [32m"18.3(09)"[39m,
  [32m"40.7(06/ago)"[39m
)

In [14]:
val df: DataFrame = 
    spark.read.option("multiline", "true").json("D:/TFGAlvaroSanchez/json_data/2916A(Vitigudino)-2018.json").select($"ta_max")
df.collect
df.show
df.schema

+------------+
|      ta_max|
+------------+
|    16.2(03)|
|    17.5(26)|
|    19.5(27)|
|    26.9(25)|
|    26.9(08)|
|    34.1(24)|
|    33.8(10)|
|    40.7(06)|
|    36.9(02)|
|    30.3(05)|
|    19.6(02)|
|    18.3(09)|
|40.7(06/ago)|
+------------+



[36mdf[39m: [32mDataFrame[39m = [ta_max: string]
[36mres13_1[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [16.2(03)],
  [17.5(26)],
  [19.5(27)],
  [26.9(25)],
  [26.9(08)],
  [34.1(24)],
  [33.8(10)],
  [40.7(06)],
  [36.9(02)],
  [30.3(05)],
  [19.6(02)],
  [18.3(09)],
  [40.7(06/ago)]
)
[36mres13_3[39m: [32mStructType[39m = [33mStructType[39m([33mStructField[39m([32m"ta_max"[39m, StringType, true, {}))

###### Tenga en cuenta que perdimos la etiqueta de la columna (ta_max) en el caso de la transformación del conjunto de datos (DataSet). Esto no está sucediendo con select (DataFrame). Además, tenemos más control sobre el esquema resultante:


###### `substring()` -> `substring(inicio, fin)`
###### `substr()` -> `substr(inicio, longitud)`  

In [15]:
dataDs.schema
dataDS.map(t => (t(1).toString, t(20).toString.substring(0,4), t(1).toString.substring(5,6)))
    .show

+-------+----+---+
|     _1|  _2| _3|
+-------+----+---+
| 2018-1|16.2|  1|
| 2018-2|17.5|  2|
| 2018-3|19.5|  3|
| 2018-4|26.9|  4|
| 2018-5|26.9|  5|
| 2018-6|34.1|  6|
| 2018-7|33.8|  7|
| 2018-8|40.7|  8|
| 2018-9|36.9|  9|
|2018-10|30.3|  1|
|2018-11|19.6|  1|
|2018-12|18.3|  1|
|2018-13|40.7|  1|
+-------+----+---+



[36mres14_0[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"e"[39m, StringType, true, {}),
  [33mStructField[39m([32m"fecha"[39m, StringType, true, {}),
  [33mStructField[39m([32m"hr"[39m, StringType, true, {}),
  [33mStructField[39m([32m"indicativo"[39m, StringType, true, {}),
  [33mStructField[39m([32m"inso"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_001"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_010"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_100"[39m, StringType, true, {}),
  [33mStructField[39m([32m"np_300"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nt_00"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nt_30"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nw_55"[39m, StringType, true, {}),
  [33mStructField[39m([32m"nw_91"[39m, StringType, true, {}),
  [33mStructField[39m([32m"p_max"[39m, StringType, true, {}),
  [33mStru

In [16]:
data.select($"fecha", $"ta_max".substr(0,4) as "temperatura max", $"fecha".substr(6,1) as "mes")
    .show

+-------+---------------+---+
|  fecha|temperatura max|mes|
+-------+---------------+---+
| 2018-1|           16.2|  1|
| 2018-2|           17.5|  2|
| 2018-3|           19.5|  3|
| 2018-4|           26.9|  4|
| 2018-5|           26.9|  5|
| 2018-6|           34.1|  6|
| 2018-7|           33.8|  7|
| 2018-8|           40.7|  8|
| 2018-9|           36.9|  9|
|2018-10|           30.3|  1|
|2018-11|           19.6|  1|
|2018-12|           18.3|  1|
|2018-13|           40.7|  1|
+-------+---------------+---+



The [org.apache.spark.sql.functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$) contains dozens of column operators.

Note that _untyped_, or more properly, _dynamically typed_, character means that the Scala compiler won't complain if we choose a non-existent column:

###### El compilador de Scala no se quejará si elegimos una columna inexistente, señala el error en tiempo de ejecucion

In [17]:
lazy val df: DataFrame = spark.read.option("multiline", "true").json("D:/TFGAlvaroSanchez/json_data/2916A(Vitigudino)-2018.json").select($"nam")

[36mdf[39m: [32mDataFrame[39m = [32m<lazy>[39m

The error will be shown at runtime: 

In [18]:
df

: 

On the contrary, the error in the dataset transformation manifests at compile-time:

###### En el DataSet nos muestra el error en tiempo de ejecucion

In [18]:
dataDs.map(_.nam)

cmd18.sc:1: value nam is not a member of cmd18.this.cmd10.Data
val res18 = dataDs.map(_.nam)
                         ^Compilation Failed

: 

### The `filter` transformation

This is the equivalent to the typed `filter` transformation:

###### En este caso primero hemos tenido que transformar la columna ta_max en tipo integer. Lo realizamos mediante `.withColumn("nuevoNombreColumna", "nombreColumna".cast(tipo))`

In [19]:
val data1 = data.withColumn("ta_max", $"ta_max".substr(0,4).cast(IntegerType))
data1.filter($"ta_max" > 30)
    .show

+-----+-------+----+----------+----+------+------+------+------+-----+-----+-----+-----+------------+-----+-----+------+-------------+-----+-------------+------+------------+------+------+------+------+------+-----+-------+-----+
|    e|  fecha|  hr|indicativo|inso|np_001|np_010|np_100|np_300|nt_00|nt_30|nw_55|nw_91|       p_max|p_mes|p_sol| q_mar|        q_max|q_med|        q_min|ta_max|      ta_min|ti_max|tm_max|tm_mes|tm_min|ts_min|w_med|w_racha|w_rec|
+-----+-------+----+----------+----+------+------+------+------+-----+-----+-----+-----+------------+-----+-----+------+-------------+-----+-------------+------+------------+------+------+------+------+------+-----+-------+-----+
|139.0| 2018-6|62.0|     2916A| 8.4|  11.0|   9.0|   0.0|   0.0|  0.0|  8.0| null| null|     9.6(08)| 49.0| 56.0|1013.9|    935.9(18)|928.2|    919.8(30)|    34|     7.1(02)|  15.3|  25.2|  18.6|  11.9|  17.3| 11.0|   null| null|
|129.0| 2018-7|50.0|     2916A|11.0|   5.0|   2.0|   1.0|   0.0|  0.0| 16.0| nul

[36mdata1[39m: [32mDataFrame[39m = [e: string, fecha: string ... 28 more fields]

If we pass a column function not denoting a boolean value, we won't even get a run-time exception:

In [20]:
def df: DataFrame = 
    data.filter($"fecha" > 2001)

defined [32mfunction[39m [36mdf[39m

In [21]:
df.show

+---+-----+---+----------+----+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+-----+-------+-----+
|  e|fecha| hr|indicativo|inso|np_001|np_010|np_100|np_300|nt_00|nt_30|nw_55|nw_91|p_max|p_mes|p_sol|q_mar|q_max|q_med|q_min|ta_max|ta_min|ti_max|tm_max|tm_mes|tm_min|ts_min|w_med|w_racha|w_rec|
+---+-----+---+----------+----+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+-----+-------+-----+
+---+-----+---+----------+----+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+-----+-------+-----+



### The `groupBy` transformation

###### Agrupamos los datos mediante la columna que le indiquemos. Si tiene el mismo valor en esta columna se agrupan, mediante `count` realizamos un recuento de cuantos se han agrupado

In [22]:
val students: DataFrame = spark.read.json("D:/GitHub/spark-intro/data/students.json")

[36mstudents[39m: [32mDataFrame[39m = [degree: string, name: string]

In [23]:
students.groupBy($"degree").count.show

+------+-----+
|degree|count|
+------+-----+
|   MAT|    2|
|    II|    4|
|    IS|    1|
+------+-----+



### `Join` transformations

We already discussed joins, but we didn't mention that the resulting type of a join is a dataframe, not a dataset: 

###### El tipo resultante de una unión es un DataFrame, no un DataSet

In [24]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class Student(name: String, degree: String)
case class Person(name: String, age: Long)

val people : DataFrame = spark.read.json("D:/GitHub/spark-intro/data/people.json") 
val peopleDs: Dataset[Person] = people.as[Person]

defined [32mclass[39m [36mStudent[39m
defined [32mclass[39m [36mPerson[39m
[36mpeople[39m: [32mDataFrame[39m = [age: bigint, name: string]
[36mpeopleDs[39m: [32mDataset[39m[[32mPerson[39m] = [age: bigint, name: string]

In [25]:
peopleDs.join(students.as[Student], "name")

[36mres24[39m: [32mDataFrame[39m = [name: string, age: bigint ... 1 more field]

###### Podemos observar que cambia el tipo de dato de la variable peopleDs

# The problems of `Dataset`s

Datasets are nice because they are type safe, but, unfortunately, they are less efficient than data frames in several respects. This can be best shown by reading from parquet source files. 

###### Los DataSets son buenos porque son de tipo seguro, pero, desafortunadamente, son menos eficientes que los DataFrames.

Parquet is a _columnar_ format, which means that it stores physically data around columns, allowing us to read only data from a particular column without reading the entire row.

###### Parquet es un formato de columnas, lo que significa que almacena datos físicos alrededor de las columnas, lo que nos permite leer solo los datos de una columna en particular sin leer toda la fila.

In [26]:
people.write.mode("overwrite").parquet("D:/GitHub/spark-intro/data/people.parquet")

In [27]:
spark.read.parquet("D:/GitHub/spark-intro/data/people.parquet").schema

[36mres26[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"age"[39m, LongType, true, {}),
  [33mStructField[39m([32m"name"[39m, StringType, true, {})
)

### The `ReadSchema` optimization

Let's create a program that simply read the _name_ column of the people dataset:

In [28]:
val ds: Dataset[String] = 
    spark.read.parquet("D:/GitHub/spark-intro/data/people.parquet").as[Person]
        .map(_.name)

[36mds[39m: [32mDataset[39m[[32mString[39m] = [value: string]

which works as intended: 

In [29]:
ds.show

+-------+
|  value|
+-------+
|  YiHui|
| Javier|
|Gabriel|
| Noelia|
+-------+



We have a problem, however: 

In [30]:
ds.explain

== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#964]
+- *(1) MapElements ammonite.$sess.cmd27$Helper$$Lambda$5320/0x0000000801adc840@35946ca3, obj#963: java.lang.String
   +- *(1) DeserializeToObject newInstance(class ammonite.$sess.cmd23$Helper$Person), obj#962: ammonite.$sess.cmd23$Helper$Person
      +- *(1) FileScan parquet [age#954L,name#955] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/GitHub/spark-intro/data/people.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>


As we can see, the plan includes the directive `ReadSchema: struct<age:bigint,name:string>`, which generates a query to scan the full schema of the parquet file. But we just want to read the names! We can create an optimun program using dataframes:

In [31]:
val df: DataFrame = 
    spark.read.parquet("D:/GitHub/spark-intro/data/people.parquet").select($"name")

[36mdf[39m: [32mDataFrame[39m = [name: string]

which works similarly: 

In [32]:
df.show

+-------+
|   name|
+-------+
|  YiHui|
| Javier|
|Gabriel|
| Noelia|
+-------+



but more efficiently (note the the value of the `ReadSchema` directive):

In [33]:
df.explain

== Physical Plan ==
*(1) FileScan parquet [name#972] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/GitHub/spark-intro/data/people.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string>


We can empirically check that it actually works using the Spark UI. First, we create a parquet file with enough rows and several columns:

In [44]:
import org.apache.spark.sql.functions.{lit, rand, round}
spark.range(0, 1000000)
    .select($"id" as "_1", lit(1) as "_2")
    .write.mode("overwrite").parquet("D:/GitHub/spark-intro/data/test")

[32mimport [39m[36morg.apache.spark.sql.functions.{lit, rand, round}
[39m

Now, we read the second column using both datasets and dataframes, and check the Spark UI for the _Input Size_ field.

In [37]:
val test = spark.read.parquet("D:/GitHub/spark-intro/data/test")
test.as[Tuple2[Long, Int]].map(_._2).collect

[36mtest[39m: [32mDataFrame[39m = [_1: bigint, _2: int]
[36mres36_1[39m: [32mArray[39m[[32mInt[39m] = [33mArray[39m(
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
  [32m1[39m,
...

Using dataframes the input size is much lower since we only read the second column:

In [38]:
test.select($"_2").collect

[36mres37[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
  [1],
...

### The `PushedFilter` optimization

Let's consider the following equivalent dataset and dataframe programs: 

In [39]:
val ds: Dataset[(Long, Int)] = 
    test.as[(Long, Int)]
        .filter(_._1 >= 999995)

val df: DataFrame = 
    test
        .filter($"_1" >= 999995)

[36mds[39m: [32mDataset[39m[([32mLong[39m, [32mInt[39m)] = [_1: bigint, _2: int]
[36mdf[39m: [32mDataFrame[39m = [_1: bigint, _2: int]

Functionally, they are equivalent, but their performance differ significantly:

In [40]:
df.collect
ds.collect

[36mres39_0[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [999995,1],
  [999996,1],
  [999997,1],
  [999998,1],
  [999999,1]
)
[36mres39_1[39m: [32mArray[39m[([32mLong[39m, [32mInt[39m)] = [33mArray[39m(
  ([32m999995L[39m, [32m1[39m),
  ([32m999996L[39m, [32m1[39m),
  ([32m999997L[39m, [32m1[39m),
  ([32m999998L[39m, [32m1[39m),
  ([32m999999L[39m, [32m1[39m)
)

The explanation of this difference lies in another optimization applied by the Spark SQL compiler: the so-called push-down filter optimization. In the previous `ReadSchema` optimization, we skipped certain columns of the dataset; now, we skip rows and read only the ones we are interested in (those that satisfy the predicate). We can check if the push-down filter optimization is actually applied by inspecting the query plan. 

In [41]:
df.explain
ds.explain

== Physical Plan ==
*(1) Project [_1#1005L, _2#1006]
+- *(1) Filter (isnotnull(_1#1005L) && (_1#1005L >= 999995))
   +- *(1) FileScan parquet [_1#1005L,_2#1006] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/GitHub/spark-intro/data/test], PartitionFilters: [], PushedFilters: [IsNotNull(_1), GreaterThanOrEqual(_1,999995)], ReadSchema: struct<_1:bigint,_2:int>
== Physical Plan ==
*(1) Filter ammonite.$sess.cmd38$Helper$$Lambda$5574/0x0000000801b78840@11ff4aa1.apply
+- *(1) FileScan parquet [_1#1005L,_2#1006] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/GitHub/spark-intro/data/test], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:bigint,_2:int>


### The `PartitionFilters` optimization

Let's create a test file with an additional column: 

In [45]:
spark.range(0, 1000000)
    .select($"id" as "_1", lit(1) as "_2", round(rand() * 10) mod lit(10) as "_3")
    .write.mode("overwrite").parquet("D:/GitHub/spark-intro/data/test")

In [46]:
val test: DataFrame = spark.read.parquet("D:/GitHub/spark-intro/data/test")

[36mtest[39m: [32mDataFrame[39m = [_1: bigint, _2: int ... 1 more field]

In [47]:
test.show

+------+---+---+
|    _1| _2| _3|
+------+---+---+
|500000|  1|5.0|
|500001|  1|2.0|
|500002|  1|6.0|
|500003|  1|3.0|
|500004|  1|1.0|
|500005|  1|4.0|
|500006|  1|8.0|
|500007|  1|4.0|
|500008|  1|4.0|
|500009|  1|0.0|
|500010|  1|7.0|
|500011|  1|1.0|
|500012|  1|6.0|
|500013|  1|6.0|
|500014|  1|6.0|
|500015|  1|0.0|
|500016|  1|6.0|
|500017|  1|2.0|
|500018|  1|2.0|
|500019|  1|2.0|
+------+---+---+
only showing top 20 rows



Let's suppose that we want to read data with value `_3` equal to `9.0`:

In [48]:
test.filter($"_3" === lit(9.0)).show

+------+---+---+
|    _1| _2| _3|
+------+---+---+
|500042|  1|9.0|
|500056|  1|9.0|
|500071|  1|9.0|
|500081|  1|9.0|
|500082|  1|9.0|
|500088|  1|9.0|
|500090|  1|9.0|
|500095|  1|9.0|
|500098|  1|9.0|
|500099|  1|9.0|
|500111|  1|9.0|
|500117|  1|9.0|
|500121|  1|9.0|
|500131|  1|9.0|
|500140|  1|9.0|
|500144|  1|9.0|
|500155|  1|9.0|
|500156|  1|9.0|
|500170|  1|9.0|
|500171|  1|9.0|
+------+---+---+
only showing top 20 rows



A pushed filter optimization is created, but it would be better if we could just read directly those rows with the exact value for the thrid column. We can achieve that as follows:

In [50]:
test.write.mode("overwrite").partitionBy("_3").parquet("D:/GitHub/spark-intro/data/test/testP")

As we can see, the parquet file is splitted into ten partitions. Now, if we just want to process data with a particular key, Spark will generate an optimun query: 

In [52]:
val testP: DataFrame = spark.read.parquet("D:/GitHub/spark-intro/data/test/testP")

[36mtestP[39m: [32mDataFrame[39m = [_1: bigint, _2: int ... 1 more field]

In [53]:
testP.filter($"_3" === lit(9.0)).show

+------+---+---+
|    _1| _2| _3|
+------+---+---+
|125006|  1|9.0|
|125007|  1|9.0|
|125010|  1|9.0|
|125020|  1|9.0|
|125025|  1|9.0|
|125028|  1|9.0|
|125029|  1|9.0|
|125050|  1|9.0|
|125051|  1|9.0|
|125062|  1|9.0|
|125064|  1|9.0|
|125083|  1|9.0|
|125086|  1|9.0|
|125091|  1|9.0|
|125107|  1|9.0|
|125116|  1|9.0|
|125137|  1|9.0|
|125150|  1|9.0|
|125159|  1|9.0|
|125160|  1|9.0|
+------+---+---+
only showing top 20 rows



We can inspet the Spark UI to check that we read less data in the last action.