# Guided Exercises - Part 2

## Goals:

- Understand what an RDD is and the difference between transformations and actions
- Understand how Spark computes an aggregation through the DAG

## What is an RDD?

RDD stands for Resilient Distributed Dataset. It is a read-only, partitioned collection of records and is the fundamental data structure of Spark. An RDD can be created from any data source, e.g. text files, a database via JDBC, etc.

```
# textFile is a method of the SparkContext, not of the SparkSession
rdd = spark.sparkContext.textFile("path/to/file.txt")
```

RDDs can also be thought of as a set of instructions to be executed, the first being the load instruction.

[Additional resources about RDDs](https://sparkbyexamples.com/apache-spark-rdd/spark-rdd-transformations/)

## What is a DataFrame?

In a DataFrame, data is organized into named columns, like a table in a relational database. It is an immutable distributed collection of data. A DataFrame lets developers impose a structure onto a distributed collection of data, which provides a higher-level abstraction.

1. If you want to apply a map or filter to the whole dataset, use an RDD.
2. If you want to work on an individual column, or perform operations/calculations on a column, use a DataFrame.


## Operations (Transformations and Actions)

There are two types of operations that you can perform on an RDD: transformations and actions. A transformation applies a function to an RDD and creates a new RDD; it does not modify the RDD it is applied to. The new RDD also keeps a pointer to its parent RDD.

![](https://external-content.duckduckgo.com/iu/?u=http%3A%2F%2Fvishnuviswanath.com%2Fimg%2Fspark_rdd%2Frdd_transformation.png&f=1&nofb=1)
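
The parent-pointer idea can be sketched in plain Python. This is a toy model only: `ToyRDD` and its methods are hypothetical names invented for illustration and are not part of Spark's API. It shows that a transformation returns a new object that remembers its parent, and that the original is left unchanged.

```python
# Toy model of RDD lineage (hypothetical ToyRDD class, not Spark's API).
class ToyRDD:
    def __init__(self, name, parent=None, fn=None, source=None):
        self.name = name
        self.parent = parent    # pointer to the parent ToyRDD (None for the root)
        self.fn = fn            # turns the parent's records into this RDD's records
        self.source = source    # records for the root "load" instruction

    def map(self, func, name):
        # Transformation: returns a NEW ToyRDD; self is not modified.
        return ToyRDD(name, parent=self, fn=lambda rows: [func(r) for r in rows])

    def filter(self, pred, name):
        # Transformation: returns a NEW ToyRDD; self is not modified.
        return ToyRDD(name, parent=self, fn=lambda rows: [r for r in rows if pred(r)])

    def lineage(self):
        # Follow the parent pointers back to the root.
        node, names = self, []
        while node is not None:
            names.append(node.name)
            node = node.parent
        return names

    def collect(self):
        # Action: compute the parents first, then apply this step.
        if self.parent is None:
            return list(self.source)
        return self.fn(self.parent.collect())


quantities = ToyRDD("quantities", source=[2, 2, 1, 3, 1])
multi_item = quantities.filter(lambda q: q >= 2, name="multi_item")
scaled = multi_item.map(lambda q: q * 10, name="scaled")

print(scaled.lineage())      # ['scaled', 'multi_item', 'quantities']
print(scaled.collect())      # [20, 20, 30]
print(quantities.collect())  # [2, 2, 1, 3, 1] (the root is unchanged)
```

Reading the lineage from right to left gives the "set of instructions" view of an RDD: load first, then each transformation in order.
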
_______________________

## Create dataframes by reading data

We can use dbutils to view the data available for this workshop, just like in Guided Exercises - Part 1.

In [0]:
data_path = "abfss://attributes@sa8451ccnthwstext.dfs.core.windows.net/"

# Import transaction data
transaction_data = spark.read.parquet(f"{data_path}/transaction_data/")
transaction_data.limit(10).display()

# Import Household Demographic data
hshd_demographic = spark.read.parquet(f"{data_path}/hh_demographic")
hshd_demographic.limit(10).display(True, 10)

household_key,BASKET_ID,DAY,PRODUCT_ID,QUANTITY,SALES_VALUE,STORE_ID,RETAIL_DISC,TRANS_TIME,WEEK_NO,COUPON_DISC,COUPON_MATCH_DISC
2400,28210690256,103,1135886,2,3.08,289,-1.3,1038,15,0,0
2327,27637386508,55,908531,2,2.0,341,-1.38,1650,9,0,0
222,27529642580,46,837167,1,1.34,304,-0.25,1539,7,0,0
2097,27757092467,65,6463800,3,5.0,403,-4.87,1919,10,0,0
715,28294440682,109,835530,1,2.5,354,-1.49,1316,16,0,0
335,27309583562,27,1082185,1,0.7,450,0.0,2229,5,0,0
777,27950510537,82,6554886,1,2.99,408,-1.0,1228,12,0,0
2314,28197453773,101,876592,1,2.99,450,0.0,2221,15,0,0
601,27853362380,73,1015247,1,0.75,306,0.0,1718,11,0,0
91,28179811614,100,827175,1,1.99,359,0.0,1618,15,0,0


AGE_DESC,MARITAL_STATUS_CODE,INCOME_DESC,HOMEOWNER_DESC,HH_COMP_DESC,HOUSEHOLD_SIZE_DESC,KID_CATEGORY_DESC,household_key
65+,A,35-49K,Homeowner,2 Adults No Kids,2,None/Unknown,1
45-54,A,50-74K,Homeowner,2 Adults No Kids,2,None/Unknown,7
25-34,U,25-34K,Unknown,2 Adults Kids,3,1,8
25-34,U,75-99K,Homeowner,2 Adults Kids,4,2,13
45-54,B,50-74K,Homeowner,Single Female,1,None/Unknown,16
65+,B,Under 15K,Homeowner,2 Adults No Kids,2,None/Unknown,17
45-54,A,100-124K,Homeowner,2 Adults No Kids,2,None/Unknown,18
35-44,B,15-24K,Unknown,Single Female,1,None/Unknown,19
25-34,A,75-99K,Renter,2 Adults No Kids,2,None/Unknown,20
45-54,A,75-99K,Homeowner,2 Adults No Kids,2,None/Unknown,22


## Transformation
Transformations are lazy operations (e.g. filter, join): Spark records them but does not execute them until an action needs a result. We can use the key *HOUSEHOLD_KEY* to join the transaction data to the household demographic data.

In [0]:
# Join transaction data to household demographic data using HOUSEHOLD_KEY as key
transactions_with_hh_demographic = transaction_data \
    .join(hshd_demographic, "household_key")

The original RDDs of transaction_data and hshd_demographic are now parent RDDs of transactions_with_hh_demographic.
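
Laziness can be illustrated without a cluster. The sketch below is an analogy only: Python's built-in lazy `map` stands in for a Spark transformation and `sum` stands in for an action. The names `add_item`, `calls` and the sample values are assumptions made up for this example.

```python
# Analogy only: Python's lazy iterators stand in for Spark transformations.
calls = []  # records every value the function below is actually applied to

def add_item(quantity):
    calls.append(quantity)
    return quantity + 1

quantities = [2, 2, 1]

# "Transformation": builds a lazy pipeline; add_item has not run yet.
pipeline = map(add_item, quantities)
calls_before_action = len(calls)
print(calls_before_action)  # 0

# "Action": asking for a result forces the whole pipeline to execute.
total = sum(pipeline)
print(total, len(calls))  # 8 3
```

Nothing is computed on the `map` line; the work happens only when `sum` asks for a value, which is the same reason the join above returns immediately.
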

## Action
Actions are operations that trigger computation, either to save data or to return or display a result (e.g. count, display). Let us run an action on the result of a transformation so that Spark actually computes it.

Here is an example:

![](https://miro.medium.com/max/1400/1*nenmaK1oa7EL-KtuVtTS7A.png)

In [0]:
import pyspark.sql.functions as f

In [0]:
# Determine the total sum of sales for each HOUSEHOLD_KEY
hh_sales_agg = transactions_with_hh_demographic \
    .groupBy(f.col("HOUSEHOLD_KEY")) \
    .agg(f.sum(f.col("SALES_VALUE")).alias("SALES")) \
    .orderBy(f.col("SALES").desc())

hh_sales_agg.limit(10).display()

HOUSEHOLD_KEY,SALES
1609,27859.67999999995
2322,23646.91999999997
1453,21661.289999999983
1430,20352.990000000023
718,19299.86000000001
707,19194.420000000024
1653,19153.750000000025
982,18790.34000000004
400,18494.14000000002
1229,18304.310000000023
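
To get a feel for how such an aggregation can be computed over partitions, here is a plain-Python sketch. It is not Spark's implementation. The split into two partitions, the helper names `partial_sums` and `merge`, and most of the values are made up for illustration, and amounts are kept as integer cents to avoid the floating-point noise visible in the SALES column above.

```python
from collections import defaultdict

# Hypothetical (household_key, sales in cents) rows, split into two partitions.
partitions = [
    [(2400, 308), (2327, 200), (2400, 150)],
    [(2327, 50), (222, 134), (2400, 42)],
]

def partial_sums(rows):
    # One "task": aggregate only the rows of a single partition.
    sums = defaultdict(int)
    for key, cents in rows:
        sums[key] += cents
    return dict(sums)

def merge(partials):
    # Stand-in for the shuffle: bring partial results for the same key
    # together and add them up.
    totals = defaultdict(int)
    for partial in partials:
        for key, cents in partial.items():
            totals[key] += cents
    return dict(totals)

# On a cluster the per-partition tasks would run in parallel.
partials = [partial_sums(rows) for rows in partitions]
totals = merge(partials)

# Sort by total, descending, mirroring orderBy(f.col("SALES").desc()).
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)  # [(2400, 500), (2327, 250), (222, 134)]
```

Each partition is summarised independently first, so only the small per-key partial sums need to be combined across partitions.
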


In [0]:
# Count is another example of an RDD action
hshd_demographic.count()

## What is a DAG?
A Directed Acyclic Graph (DAG) is a representation of the transformations applied to the RDD. The Spark scheduler separates the work into stages based on the transformations applied. Each stage consists of tasks, one per partition of the RDD, which perform the same computation in parallel.

If RDDs are a set of instructions, DAGs are pictures of the instructions. Laying out the logical flow of operations lets Spark minimize data shuffling, which shortens computations by moving less data and makes the process more efficient.

![](https://miro.medium.com/max/1400/1*yf3cMmPT4SrXdCp-B_zN5Q.png)
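
As a rough mental model of how a chain of operations is cut into stages, the sketch below splits a list of operation names wherever an operation is assumed to need a shuffle. The `WIDE` set, the function `split_into_stages` and the example pipeline are hand-written assumptions for illustration; Spark derives stage boundaries from the actual plan, and real plans are more nuanced than this.

```python
# Toy sketch only: cut a linear chain of operations into stages at shuffle boundaries.
# The WIDE set and the operation names are assumptions made for this example.
WIDE = {"groupBy", "orderBy"}  # operations assumed to need data from other partitions

def split_into_stages(operations):
    stages, current = [], []
    for op in operations:
        if op in WIDE and current:
            # Close the running stage: the wide operation starts a new one.
            stages.append(current)
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages

pipeline = ["read", "filter", "groupBy", "agg", "orderBy", "limit"]
stages = split_into_stages(pipeline)
for number, stage in enumerate(stages):
    print(f"stage {number}: {stage}")
# stage 0: ['read', 'filter']
# stage 1: ['groupBy', 'agg']
# stage 2: ['orderBy', 'limit']
```

Operations inside one stage can be applied to each partition without exchanging data, which is why they are grouped together.
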

In [0]:
hh_sales_agg.limit(10).display()

HOUSEHOLD_KEY,SALES
1609,27859.679999999946
2322,23646.91999999997
1453,21661.289999999983
1430,20352.99000000002
718,19299.86000000002
707,19194.420000000024
1653,19153.750000000025
982,18790.340000000037
400,18494.140000000025
1229,18304.310000000023


Additional resources can be found here:

https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html

https://data-flair.training/blogs/dag-in-apache-spark/

## Questions

In [0]:
# QUESTION 1:
# What is the name of the parent RDD of the RDD grocery_products?

product_data = spark.read.parquet(f"{data_path}/product")

grocery_products = product_data.filter(f.col("DEPARTMENT")=="GROCERY")
grocery_products_subset = grocery_products.filter(f.col("BRAND")=="Private")

In [0]:
# ANSWER HERE:


In [0]:
# product_data is the name of the parent RDD

In [0]:
# QUESTION 2:
# Perform an RDD action on the DataFrame grocery_products_subset


In [0]:
# ANSWER HERE:


In [0]:
# Example answers (count() is the action in both lines)
# grocery_products_subset.count()
# grocery_products_subset.distinct().count()

In [0]:
# View the DAG of the above action you performed