# ID2221 HT21 Lab2

## Agenda:

We will go through the following:
- ``Lab2_01_Structured_Data_Processing.ipynb``
- ``Lab2_02_SparkSQL_DataFrame.ipynb``
- ``Lab2_03_SparkSQL_Dataset.html``

In the last lab session, we looked at how to set up a Spark environment natively (on your local machine or in a virtual machine).

In this session, we will look at some alternatives for using Spark.
- Running a Docker container and using Jupyter Notebook/Lab to access Spark.
- Using Databricks Community Edition.


# Structured Data Processing
- Spark has two notions of structured collections:
  - DataFrames
  - Datasets
- They are distributed table-like collections with well-defined rows and columns.
- They represent immutable, lazily evaluated plans.
- When an action is performed on them, Spark executes the planned transformations and returns the result.
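
The difference between building a plan and executing it can be seen in a minimal sketch. It assumes a local Spark session (in the lab notebook `spark` already exists, and `getOrCreate()` simply returns it); the names `numbers`, `evens`, and `evenCount` are made up for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("lazy-demo").getOrCreate()
import spark.implicits._

val numbers = Seq(1, 2, 3, 4, 5).toDF("n")

// Transformation: this only extends the plan, no job is started yet.
val evens = numbers.filter($"n" % 2 === 0)

// explain() prints the physical plan that Spark would run for this DataFrame.
evens.explain()

// Action: count() triggers the actual computation and returns a value to the driver.
val evenCount = evens.count()
println(evenCount)
```

Until `count()` is called, `evens` is just a description of work to be done, which is what allows Spark to optimize the whole plan before running it.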

Code examples used in this notebook can be found [here](https://id2221kth.github.io/slides/2021/07_structured_data_processing.pdf).

In [1]:
println(s"This notebook is tested with Spark 3.1.2.\nYou are using ${sc.version}.")

Intitializing Scala interpreter ...

Spark Web UI available at http://e507a37883e8:4041
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1632307403324)
SparkSession available as 'spark'


This notebook is tested with Spark 3.1.2.
You are using 3.1.2.


## Read data using Spark

In [2]:
// people.json

// {"name":"Michael", "age":15, "id":12}
// {"name":"Andy", "age":30, "id":15}
// {"name":"Justin", "age":19, "id":20}
// {"name":"Andy", "age":12, "id":15}
// {"name":"Jim", "age":19, "id":20}
// {"name":"Andy", "age":12, "id":10}

In [3]:
// Data sources supported by Spark.
// • CSV, JSON, Parquet, ORC, JDBC/ODBC connections, Plain-text files
// • Cassandra, HBase, MongoDB, AWS Redshift, XML, etc.

// For example,
val people_df = spark.read.format("json").load("people.json")
people_df.schema


// val peopleCsv = spark.read.format("csv")
//                         .option("sep", ";")
//                         .option("inferSchema", "true")
//                         .option("header", "true")
//                         .load("people.csv")

people_df: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
res2: org.apache.spark.sql.types.StructType = StructType(StructField(age,LongType,true), StructField(id,LongType,true), StructField(name,StringType,true))
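
Instead of letting Spark infer the schema, a reader can also be given one explicitly. The sketch below is an illustration under assumptions: it reads two JSON records from an in-memory `Dataset[String]` rather than from `people.json`, and the names `peopleSchema`, `jsonLines`, and `peopleWithSchema` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("schema-demo").getOrCreate()
import spark.implicits._

// The schema fixes column names, types, nullability, and column order.
val peopleSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true),
  StructField("id", LongType, nullable = true)
))

// Two records in the same format as people.json, kept in memory for this sketch.
val jsonLines = Seq(
  """{"name":"Michael", "age":15, "id":12}""",
  """{"name":"Andy", "age":30, "id":15}"""
).toDS()

// With an explicit schema Spark does not need to scan the data to infer types.
val peopleWithSchema = spark.read.schema(peopleSchema).json(jsonLines)
peopleWithSchema.printSchema()
```

With an explicit schema, the columns follow the order given in the schema rather than the alphabetical order produced by JSON schema inference.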


## Column

In [4]:
//  col returns a reference to a column
col("age")

res3: org.apache.spark.sql.Column = age


In [5]:
// expr parses a SQL expression string and returns it as a Column
expr("age + 5 < 32")

res4: org.apache.spark.sql.Column = ((age + 5) < 32)
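
As a small illustration, the same predicate can be written either as a SQL string passed to `expr` or with the operators defined on `Column`. The sketch assumes a local session and a made-up single-column DataFrame named `ages`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("column-demo").getOrCreate()
import spark.implicits._

val ages = Seq(15, 30, 19, 12).toDF("age")

// Predicate given as a SQL expression string.
val viaExpr = ages.filter(expr("age + 5 < 32")).count()

// The same predicate built from Column operators (+ binds tighter than <).
val viaOperators = ages.filter(col("age") + 5 < 32).count()

println(s"expr: $viaExpr, operators: $viaOperators")
```

Both forms produce the same logical expression, so they select the same rows.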


In [6]:
//  columns returns all columns on a DataFrame
people_df.columns

res5: Array[String] = Array(age, id, name)


In [7]:
// Different ways to refer to a column
people_df.col("name")

res6: org.apache.spark.sql.Column = name


In [8]:
col("name")
column("name")
'name
$"name"
expr("name")

res7: org.apache.spark.sql.Column = name


## Row
- A row is a single record of data.
- Rows are of type ``Row``.
- A manually created ``Row`` does not carry a schema.
  - Its values must therefore appear in the same order as the columns in the schema of the DataFrame to which the row might be appended.
- To access a value in a row, specify its position (index); the value comes back as ``Any`` unless you use a typed getter such as ``getString``.

In [9]:
import org.apache.spark.sql.Row

val my_row = Row("Seif", 65, 0)

import org.apache.spark.sql.Row
my_row: org.apache.spark.sql.Row = [Seif,65,0]


In [10]:
my_row(0)

res8: Any = Seif


In [11]:
my_row(0).asInstanceOf[String]

res9: String = Seif


In [12]:
my_row.getString(0)

res10: String = Seif


In [13]:
my_row.getInt(1)

res11: Int = 65
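
To attach a schema to manually created rows, one option is to pass them to `createDataFrame` together with a `StructType`. The following is a sketch under assumptions: a local session, and hypothetical names `rowSchema`, `rows`, and `rowsDf`. Rows read back from a DataFrame carry its schema, so fields can then also be accessed by name with `getAs`.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("row-demo").getOrCreate()

val rowSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("id", IntegerType)
))

// The values in each Row follow the field order of rowSchema.
val rows = Seq(Row("Seif", 65, 0), Row("Amir", 40, 1))
val rowsDf = spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema)

// A Row that comes from a DataFrame knows its schema, so name-based access works.
val firstRow = rowsDf.first()
val name = firstRow.getAs[String]("name")
val age = firstRow.getAs[Int]("age")
println(s"$name is $age")
```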


## Creating a DataFrame
There are two ways to create a DataFrame:
1. From an RDD.
2. From raw data sources.

In [14]:
val tuple_rdd = sc.parallelize(Array(("seif", 65, 0), ("amir", 40, 1)))
val tuple_df = tuple_rdd.toDF("name", "age", "id")

tuple_rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26
tuple_df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]


In [15]:
case class Person(name: String, age: Int, id: Int)

defined class Person


In [16]:
val my_rdd = sc.parallelize(Array(Person("seif", 65, 0), Person("amir", 40, 1)))
val my_df = my_rdd.toDF()

my_rdd: org.apache.spark.rdd.RDD[Person] = ParallelCollectionRDD[5] at parallelize at <console>:28
my_df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]


## DataFrame Transformations
- Add and remove rows or columns
- Transform a row into a column (or vice versa)
- Change the order of rows based on the values in columns
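
Two of the transformations listed above, appending rows and reordering rows, can be sketched with `union` and `orderBy`. The example assumes a local session and small made-up DataFrames (`people`, `extra`); it is an illustration rather than part of the lab data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("transform-demo").getOrCreate()
import spark.implicits._

val people = Seq(("Michael", 15, 12), ("Andy", 30, 15), ("Justin", 19, 20)).toDF("name", "age", "id")
val extra = Seq(("Jim", 19, 20)).toDF("name", "age", "id")

// union appends the rows of another DataFrame; columns are matched by position.
val combined = people.union(extra)

// orderBy sorts by age (descending), then by name to break ties.
val sorted = combined.orderBy(desc("age"), col("name"))
sorted.show()

val oldest = sorted.first().getString(0)
```

Because DataFrames are immutable, `union` and `orderBy` return new DataFrames and leave `people` unchanged.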

In [17]:
// select and selectExpr are the DataFrame equivalents of SQL queries on a table of data.
// select
people_df.select("name", "age", "id").show(2)
people_df.select(col("name"), expr("age + 3")).show()

+-------+---+---+
|   name|age| id|
+-------+---+---+
|Michael| 15| 12|
|   Andy| 30| 15|
+-------+---+---+
only showing top 2 rows

+-------+---------+
|   name|(age + 3)|
+-------+---------+
|Michael|       18|
|   Andy|       33|
| Justin|       22|
|   Andy|       15|
|    Jim|       22|
|   Andy|       15|
+-------+---------+



In [18]:
people_df.select(col("name"), column("name"), 'name, $"name", expr("name")).show()

+-------+-------+-------+-------+-------+
|   name|   name|   name|   name|   name|
+-------+-------+-------+-------+-------+
|Michael|Michael|Michael|Michael|Michael|
|   Andy|   Andy|   Andy|   Andy|   Andy|
| Justin| Justin| Justin| Justin| Justin|
|   Andy|   Andy|   Andy|   Andy|   Andy|
|    Jim|    Jim|    Jim|    Jim|    Jim|
|   Andy|   Andy|   Andy|   Andy|   Andy|
+-------+-------+-------+-------+-------+



In [19]:
// selectExpr
// selectExpr() is similar to select(); the difference is that it takes
// a set of SQL expressions as strings and evaluates them.
// This makes it possible to run SQL-like expressions without creating a temporary table or view.

// The star syntax selects all columns, like SELECT * in SQL
people_df.selectExpr("*", "(age < 20) as teenager", "id as ID").show()
people_df.selectExpr("avg(age)", "count(distinct(name))", "sum(id)").show()

+---+---+-------+--------+---+
|age| id|   name|teenager| ID|
+---+---+-------+--------+---+
| 15| 12|Michael|    true| 12|
| 30| 15|   Andy|   false| 15|
| 19| 20| Justin|    true| 20|
| 12| 15|   Andy|    true| 15|
| 19| 20|    Jim|    true| 20|
| 12| 10|   Andy|    true| 10|
+---+---+-------+--------+---+

+------------------+--------------------+-------+
|          avg(age)|count(DISTINCT name)|sum(id)|
+------------------+--------------------+-------+
|17.833333333333332|                   4|     92|
+------------------+--------------------+-------+



In [20]:
// filter and where both filter rows.

people_df.filter(col("age") < 20).show()
people_df.filter("age < 20").show()

people_df.where("age < 20").show()

// distinct can be used to extract unique rows.
people_df.select("name").distinct().show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+

+-------+
|   name|
+-------+
|    Jim|
|Michael|
|   Andy|
| Justin|
+-------+



In [21]:
// withColumn adds a new column to a DataFrame
people_df.withColumn("teenager", expr("age < 20")).show()

people_df.selectExpr("*", "(age < 20) as teenager").show()

+---+---+-------+--------+
|age| id|   name|teenager|
+---+---+-------+--------+
| 15| 12|Michael|    true|
| 30| 15|   Andy|   false|
| 19| 20| Justin|    true|
| 12| 15|   Andy|    true|
| 19| 20|    Jim|    true|
| 12| 10|   Andy|    true|
+---+---+-------+--------+

+---+---+-------+--------+
|age| id|   name|teenager|
+---+---+-------+--------+
| 15| 12|Michael|    true|
| 30| 15|   Andy|   false|
| 19| 20| Justin|    true|
| 12| 15|   Andy|    true|
| 19| 20|    Jim|    true|
| 12| 10|   Andy|    true|
+---+---+-------+--------+



In [22]:
// withColumnRenamed renames a column
people_df.withColumnRenamed("name", "username").columns

// drop removes a column
val temp_df = people_df.drop("name")

temp_df: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint]


In [23]:
temp_df.show()

+---+---+
|age| id|
+---+---+
| 15| 12|
| 30| 15|
| 19| 20|
| 12| 15|
| 19| 20|
| 12| 10|
+---+---+



In [24]:
people_df.show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



## DataFrame Actions
Like RDDs, DataFrames also have their own set of actions.
- ``collect``: returns an array that contains all the rows in this DataFrame.
- ``count``: returns the number of rows in this DataFrame.
- ``first`` and ``head``: return the first row of the DataFrame.
- ``show``: displays the top rows of the DataFrame (20 by default) in tabular form.
- ``take``: returns the first n rows of the DataFrame.
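
The actions listed above can be tried on a small DataFrame. This is a minimal sketch assuming a local session and a made-up three-row DataFrame named `people`.

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("actions-demo").getOrCreate()
import spark.implicits._

val people = Seq(("Michael", 15), ("Andy", 30), ("Justin", 19)).toDF("name", "age")

// Each of these calls is an action: it runs a job and returns a result to the driver.
val allRows: Array[Row] = people.collect()   // every row, as a local array
val rowCount: Long = people.count()          // number of rows
val firstRow: Row = people.first()           // first row only
val firstTwo: Array[Row] = people.take(2)    // first n rows

// show prints a table instead of returning rows; the argument limits the number of rows.
people.show(2)
```

`collect` brings the whole DataFrame into driver memory, so on large data `take` or `show` is usually the safer way to inspect results.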

In [25]:
people_df.show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



### Summarizing a Complete DataFrame

In [55]:
// count returns the number of non-null values in a column
people_df.select(count("age")).show()

// countDistinct returns the number of distinct values in a column
people_df.select(countDistinct("name")).show()

// first and last return the first and last value of a column
people_df.select(first("name"), last("age")).show()

+----------+
|count(age)|
+----------+
|         6|
+----------+

+--------------------+
|count(DISTINCT name)|
+--------------------+
|                   4|
+--------------------+

+-----------+---------+
|first(name)|last(age)|
+-----------+---------+
|    Michael|       12|
+-----------+---------+



In [27]:
// min and max extract the minimum and maximum values of a column
people_df.select(min("name"), max("age"), max("id")).show()

// sum adds all the values in a column
people_df.select(sum("age")).show()

// avg calculates the average
people_df.select(avg("age")).show()

+---------+--------+-------+
|min(name)|max(age)|max(id)|
+---------+--------+-------+
|     Andy|      30|     20|
+---------+--------+-------+

+--------+
|sum(age)|
+--------+
|     107|
+--------+

+------------------+
|          avg(age)|
+------------------+
|17.833333333333332|
+------------------+



### Group By
- Perform aggregations on groups in the data.
- Typically on categorical data.
- We do this grouping in two phases:
  1. Specify the column(s) on which we would like to group.
  2. Specify the aggregation(s).

In [28]:
people_df.show()

// Grouping with expressions

// Rather than passing the aggregation function as an expression into a select statement,
// we specify it within agg.
people_df.groupBy("name").agg(count("age").alias("ageagg")).show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+

+-------+------+
|   name|ageagg|
+-------+------+
|    Jim|     1|
|Michael|     1|
|   Andy|     3|
| Justin|     1|
+-------+------+



In [29]:
// Grouping with Maps

// Specify the aggregations as a series of key -> value pairs.
// The key is the column, and the value is the aggregation function (as a string).
people_df.groupBy("name").agg("age" -> "count", "age" -> "avg", "id" -> "max").show()

+-------+----------+--------+-------+
|   name|count(age)|avg(age)|max(id)|
+-------+----------+--------+-------+
|    Jim|         1|    19.0|     20|
|Michael|         1|    15.0|     12|
|   Andy|         3|    18.0|     15|
| Justin|         1|    19.0|     20|
+-------+----------+--------+-------+



### Windowing
- Compute an aggregation over a specific ``window`` of data.
- The window determines which rows are passed to the aggregation function.
- A window is defined relative to the current row (for example: the previous row, the current row, and the next row).
- The group of rows selected for a given row is called a ``frame``.

In [30]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Unlike grouping, here each row can fall into one or more frames.
val windowSpec = Window.rowsBetween(-1, 1)
val avgAge = avg(col("age")).over(windowSpec)

people_df.select(col("name"), col("age"), avgAge.alias("avg_age")).show()

println((15.0+30.0)/2)

println((15.0+30.0+19.0)/3)

+-------+---+------------------+
|   name|age|           avg_age|
+-------+---+------------------+
|Michael| 15|              22.5|
|   Andy| 30|21.333333333333332|
| Justin| 19|20.333333333333332|
|   Andy| 12|16.666666666666668|
|    Jim| 19|14.333333333333334|
|   Andy| 12|              15.5|
+-------+---+------------------+

22.5
21.333333333333332


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@4c08012b
avgAge: org.apache.spark.sql.Column = avg(age) OVER (ROWS BETWEEN -1 FOLLOWING AND 1 FOLLOWING)
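
Windows are often also partitioned and ordered, so that the frame is computed separately within each group. The sketch below computes a running sum of ages per name; it assumes a local session, and the DataFrame `people` and the names `runningWindow`, `running`, and `andyTotals` are made up for the illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("window-demo").getOrCreate()
import spark.implicits._

val people = Seq(("Andy", 30), ("Andy", 12), ("Andy", 12), ("Jim", 19)).toDF("name", "age")

// Frame: all rows from the start of the partition up to and including the current row,
// where partitions are formed by name and rows are ordered by age.
val runningWindow = Window
  .partitionBy("name")
  .orderBy("age")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val running = people.withColumn("running_age", sum(col("age")).over(runningWindow))
running.show()

// Collect the running totals of the "Andy" partition for inspection.
val andyTotals = running
  .filter(col("name") === "Andy")
  .select("running_age")
  .collect()
  .map(_.getLong(0))
  .sorted
```

Unlike `groupBy`, the result keeps one output row per input row; the aggregate is added as an extra column.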


### Joins
- Joins are relational constructs you use to combine relations together.
- Combine rows from two or more tables, based on a related column between them.
- Different join types ([spark_examples](https://www.educba.com/join-in-spark-sql/)): 
  - inner join,
  - outer join,
  - left outer join,
  - right outer join,
  - left semi join,
  - left anti join,
  - cross join

In [31]:
val person_df = Seq((0, "Seif", 0), (1, "Amir", 1), (2, "Sarunas", 1)).toDF("id", "name", "group_id")
person_df.show()

val group_df = Seq((0, "SICS/KTH"), (1, "KTH"), (2, "SICS")).toDF("id", "department")
group_df.show()

+---+-------+--------+
| id|   name|group_id|
+---+-------+--------+
|  0|   Seif|       0|
|  1|   Amir|       1|
|  2|Sarunas|       1|
+---+-------+--------+

+---+----------+
| id|department|
+---+----------+
|  0|  SICS/KTH|
|  1|       KTH|
|  2|      SICS|
+---+----------+



person_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
group_df: org.apache.spark.sql.DataFrame = [id: int, department: string]


In [56]:
// inner join

val join_expression = person_df.col("group_id") === group_df.col("id")
val join_type = "inner"

person_df.join(group_df, join_expression, join_type).show()

+---+-------+--------+---+----------+
| id|   name|group_id| id|department|
+---+-------+--------+---+----------+
|  0|   Seif|       0|  0|  SICS/KTH|
|  1|   Amir|       1|  1|       KTH|
|  2|Sarunas|       1|  1|       KTH|
+---+-------+--------+---+----------+



join_expression: org.apache.spark.sql.Column = (group_id = id)
join_type: String = inner


In [32]:
// In Spark, "===" is the Column equality test (equivalent to the equalTo method).
// It returns a Column that holds the result of comparing the two columns element by element.
person_df.col("group_id") === group_df.col("id")

res26: org.apache.spark.sql.Column = (group_id = id)


In [33]:
// "==" is Scala's ordinary object equality: it compares the two Column objects themselves
// and returns a Boolean, not a Column, so it cannot be used as a join expression.
person_df.col("group_id") == group_df.col("id")

res27: Boolean = false


In [35]:
// outer join

val join_expression = person_df.col("group_id") === group_df.col("id")
val join_type = "outer"

person_df.join(group_df, join_expression, join_type).show()

+----+-------+--------+---+----------+
|  id|   name|group_id| id|department|
+----+-------+--------+---+----------+
|   1|   Amir|       1|  1|       KTH|
|   2|Sarunas|       1|  1|       KTH|
|null|   null|    null|  2|      SICS|
|   0|   Seif|       0|  0|  SICS/KTH|
+----+-------+--------+---+----------+



join_expression: org.apache.spark.sql.Column = (group_id = id)
join_type: String = outer
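
The left semi and left anti joins from the list above behave like filters on the left table: no columns from the right table appear in the result. A minimal sketch, assuming a local session and re-creating the two small tables under the hypothetical names `persons` and `groups`:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("join-demo").getOrCreate()
import spark.implicits._

val persons = Seq((0, "Seif", 0), (1, "Amir", 1), (2, "Sarunas", 1)).toDF("id", "name", "group_id")
val groups = Seq((0, "SICS/KTH"), (1, "KTH"), (2, "SICS")).toDF("id", "department")

val joinCondition = groups.col("id") === persons.col("group_id")

// left_semi: keep the groups that have at least one matching person.
val groupsWithMembers = groups.join(persons, joinCondition, "left_semi")
groupsWithMembers.show()

// left_anti: keep the groups that have no matching person.
val emptyGroups = groups.join(persons, joinCondition, "left_anti")
emptyGroups.show()

val emptyDepartments = emptyGroups.collect().map(_.getString(1)).toSeq
```

Here the semi join keeps the groups with id 0 and 1, and the anti join keeps only the group that nobody belongs to.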


### Joins Communication Strategies
- Shuffle join
  - Every node talks to every other node
  - Share data according to which node has a certain key or set of keys
  - Join **big table** to **big table**
- Broadcast join
  - When the table is small enough to fit into the memory of a single worker node.
  - Join **big table** to **small table**
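
A broadcast join can be requested explicitly with the `broadcast` hint from `org.apache.spark.sql.functions`. The sketch below assumes a local session and uses two tiny made-up tables (`persons`, `groups`), so it only illustrates the API; whether Spark would choose this strategy on its own depends on table sizes and configuration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a local session; in the lab notebook getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-demo").getOrCreate()
import spark.implicits._

val persons = Seq((0, "Seif", 0), (1, "Amir", 1), (2, "Sarunas", 1)).toDF("id", "name", "group_id")
val groups = Seq((0, "SICS/KTH"), (1, "KTH"), (2, "SICS")).toDF("id", "department")

// broadcast(groups) marks the small table to be copied to every executor,
// so the large side does not have to be shuffled across the network.
val joined = persons.join(broadcast(groups), persons.col("group_id") === groups.col("id"))

// explain() prints the physical plan, where the chosen join strategy can be inspected.
joined.explain()
joined.show()
```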

[Optional] If interested, check the following
- [The art of joining in Spark](https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c)
- [Joining Strategies in Apache Spark](https://medium.com/nerd-for-tech/joining-strategies-in-apache-spark-f802a7dab150)


## SQL

You can run SQL queries on views/tables via the ``sql`` method of the SparkSession object.

In [36]:
// createOrReplaceTempView creates (or replaces) a lazily evaluated view
people_df.createOrReplaceTempView("people_view")

In [37]:
spark.sql("SELECT * from people_view").show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



In [38]:
val teenagers_df = spark.sql("SELECT name, age FROM people_view WHERE age BETWEEN 13 AND 19")

teenagers_df.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 15|
| Justin| 19|
|    Jim| 19|
+-------+---+



teenagers_df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]


## Dataset

In [39]:
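// A column name that does not exist (id_num) is not caught at compile time;
// the query only fails when Spark analyzes it.
people_df.filter("id_num < 20")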

org.apache.spark.sql.AnalysisException:  cannot resolve '`id_num`' given input columns: [age, id, name]; line 1 pos 0;

In [40]:
val collected_people = people_df.collect()

collected_people: Array[org.apache.spark.sql.Row] = Array([15,12,Michael], [30,15,Andy], [19,20,Justin], [12,15,Andy], [19,20,Jim], [12,10,Andy])


In [41]:
people_df.schema

res34: org.apache.spark.sql.types.StructType = StructType(StructField(age,LongType,true), StructField(id,LongType,true), StructField(name,StringType,true))


In [42]:
// Person(age: Long, id: Long, name: String)

// To be able to work with the collected values, we should cast the Rows.
// • How many columns?
// • What types?

val collected_list = collected_people.map{
    row => (row(0).asInstanceOf[Long], row(1).asInstanceOf[Long], row(2).asInstanceOf[String])
}

// But what if we cast to the wrong types? The error would only surface at runtime.
// Wouldn't it be nice if we could have both Spark SQL optimizations and type safety?

### Creating Datasets

In [43]:
case class Person(name: String, age: BigInt, id: BigInt)

defined class Person


In [44]:
val person_seq = Seq(Person("Max", 33, 0), Person("Adam", 32, 1))

val ds1 = sc.parallelize(person_seq).toDS()

person_seq: Seq[Person] = List(Person(Max,33,0), Person(Adam,32,1))
ds1: org.apache.spark.sql.Dataset[Person] = [name: string, age: decimal(38,0) ... 1 more field]


In [45]:
val ds2 = spark.read.format("json").load("people.json").as[Person]

ds2: org.apache.spark.sql.Dataset[Person] = [age: bigint, id: bigint ... 1 more field]


### Dataset Transformations
- Transformations on Datasets are the same as those that we had on DataFrames.
- Datasets allow us to specify more complex and strongly typed transformations.

In [46]:
case class Person(name: String, age: BigInt, id: BigInt)

defined class Person


In [47]:
val people_ds = spark.read.format("json").load("people.json").as[Person].cache()

people_ds: org.apache.spark.sql.Dataset[Person] = [age: bigint, id: bigint ... 1 more field]


In [48]:
people_ds.show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



In [49]:
people_ds.filter(people_ds("age") < 40).show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+



In [50]:
people_ds.filter(x => x.age < 40).show()

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 87.0 failed 1 times, most recent failure: Lost task 0.0 in stage 87.0 (TID 1252) (e507a37883e8 executor driver): java.lang.ClassCastException: class $iw cannot be cast to class $iw ($iw is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @57af9fcd; $iw is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @7cd2d5b)

In [51]:
people_ds.map(x => (x.name, x.age + 5, x.id)).show()

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 88.0 failed 1 times, most recent failure: Lost task 0.0 in stage 88.0 (TID 1253) (e507a37883e8 executor driver): java.lang.ClassCastException: class $iw cannot be cast to class $iw ($iw is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @57af9fcd; $iw is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @7cd2d5b)
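
The two class loaders named in the exception suggest that the failures above come from how the notebook's REPL loads the `Person` class, not from the Dataset API itself; this reading is an assumption. The following sketch shows the same kind of typed `filter` and `map`, intended for `spark-shell` or a compiled application. It uses a hypothetical case class `PersonRecord` with `Long` fields and in-memory data instead of `people.json`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class for this sketch; Long matches Spark's bigint columns.
case class PersonRecord(name: String, age: Long, id: Long)

// Assumption: a local session; in spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("dataset-demo").getOrCreate()
import spark.implicits._

val peopleDs = Seq(
  PersonRecord("Michael", 15, 12),
  PersonRecord("Andy", 30, 15),
  PersonRecord("Justin", 19, 20)
).toDS()

// Typed filter: the lambda receives a PersonRecord, so p.age is checked by the compiler.
val teenagers = peopleDs.filter(p => p.age < 20)
teenagers.show()

// Typed map: the result is a Dataset[(String, Long)].
val shifted = peopleDs.map(p => (p.name, p.age + 5))
val shiftedRows = shifted.collect().toSeq
```

A misspelled field such as `p.agee` would be rejected by the Scala compiler here, whereas the string-based `filter("id_num < 20")` on a DataFrame only fails at analysis time.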

# The END.