
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Aggregation

##### Objectives
1. Group data by specified columns
1. Apply grouped data methods to aggregate data
1. Apply built-in functions to aggregate data

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>: **`groupBy`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/grouping.html" target="_blank">Grouped Data</a>: **`agg`**, **`avg`**, **`count`**, **`max`**, **`sum`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html" target="_blank">Built-In Functions</a>: **`approx_count_distinct`**, **`avg`**, **`sum`**

In [0]:
%run ../Includes/Classroom-Setup-00.5

Python interpreter will be restarted.
Python interpreter will be restarted.



Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v01"

Validating the locally installed datasets...(2 seconds)
Creating & using the schema "dnchankov_bezt_dbacademy_delp"...(1 seconds)

Cloning the "sales" table from "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v01/ecommerce/delta/sales_hist"....(15 seconds)
Cloning the "events" table from "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v01/ecommerce/delta/events_hist"....(5 seconds)
Cloning the "users" table from "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v01/ecommerce/delta/users_hist"....(4 seconds)
Cloning the "products" table from "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v01/ecommerce/delta/item_lookup"....(3 seconds)

Predefined tables in "dnchankov_bezt_dbacademy_delp":
  events
  products
  sales
  users

Predefined paths variables:
  DA.paths.working_dir: dbfs:/mnt/dbacademy-users/dnchankov@abv.bg/data-engineer-learning-pa

Let's use the BedBricks events dataset.

In [0]:
df = spark.table("events")
display(df)

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
Android,"List(null, null, null)",mattresses,1593614063954129.0,1593614089359899,"List(Attleboro, MA)",List(),google,1593614037088511,UA000000106525232
Android,"List(null, null, null)",main,,1593613901473050,"List(Rockport, TX)",List(),google,1593613901473050,UA000000106524533
Android,"List(null, null, null)",add_item,1593595595984555.0,1593595620017592,"List(De Kalb, TX)","List(List(null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1))",instagram,1593595391984879,UA000000106466918
Linux,"List(null, null, null)",main,,1593611755190083,"List(Warwick, RI)",List(),google,1593611755190083,UA000000106513930
Linux,"List(null, null, null)",mattresses,,1593613449856735,"List(Coeur d'Alene, ID)",List(),youtube,1593613449856735,UA000000106522140
macOS,"List(null, null, null)",guest,1593617658665328.0,1593617822519726,"List(Clovis, CA)","List(List(null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1))",facebook,1593616922295696,UA000000106541016
macOS,"List(null, null, null)",email_coupon,1593617459515520.0,1593618432116637,"List(St. Augustine, FL)",List(),google,1593616968692210,UA000000106541282
iOS,"List(null, null, null)",main,,1593615063697854,"List(Victorville, CA)",List(),youtube,1593615063697854,UA000000106530656
macOS,"List(null, null, null)",faq,1593616650732534.0,1593616950510558,"List(Wichita, KS)",List(),instagram,1593616650732534,UA000000106539566
iOS,"List(null, null, null)",email_coupon,1593618897029648.0,1593619509779272,"List(San Francisco, CA)",List(),youtube,1593618897029648,UA000000106552410


### Grouping data

<img src="https://files.training.databricks.com/images/aspwd/aggregation_groupby.png" width="60%" />

### groupBy
Use the DataFrame **`groupBy`** method to create a grouped data object. 

This grouped data object is called **`RelationalGroupedDataset`** in Scala and **`GroupedData`** in Python.

In [0]:
df.groupBy("event_name")

Out[7]: <pyspark.sql.group.GroupedData at 0x7fb4e861b6a0>

In [0]:
df.groupBy("geo.state", "geo.city")

Out[8]: <pyspark.sql.group.GroupedData at 0x7fb4e84863d0>
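To make the idea concrete, here is a plain-Python model of what grouping means: each row lands in exactly one group, identified by the tuple of its key-column values. It is only an illustration with hypothetical sample rows, not how Spark works internally. Spark's **`groupBy`** is lazy: the **`GroupedData`** objects above only describe the grouping, and nothing is computed until an aggregation is applied and an action such as **`display`** runs.

```python
from collections import defaultdict

# Hypothetical sample rows modeled on a few displayed events (not the full table).
rows = [
    {"event_name": "main", "state": "TX", "city": "Rockport"},
    {"event_name": "main", "state": "RI", "city": "Warwick"},
    {"event_name": "faq", "state": "KS", "city": "Wichita"},
    {"event_name": "email_coupon", "state": "CA", "city": "San Francisco"},
]

def group_by(rows, *keys):
    """Partition rows into groups keyed by the tuple of the given columns' values."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    return dict(groups)

# One key column, like df.groupBy("event_name")
by_event = group_by(rows, "event_name")
# Two key columns, like df.groupBy("geo.state", "geo.city")
by_place = group_by(rows, "state", "city")

print(sorted(by_event))          # group keys are tuples, even for a single column
print(len(by_event[("main",)]))  # number of rows in the "main" group
```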

### Grouped data methods
Various aggregation methods are available on the <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/grouping.html" target="_blank">GroupedData</a> object.


| Method | Description |
| --- | --- |
| agg | Compute aggregates by specifying a series of aggregate columns |
| avg | Compute the mean value for each numeric column for each group |
| count | Count the number of rows for each group |
| max | Compute the max value for each numeric column for each group |
| mean | Compute the average value for each numeric column for each group |
| min | Compute the min value for each numeric column for each group |
| pivot | Pivots a column of the current DataFrame and performs the specified aggregation |
| sum | Compute the sum for each numeric column for each group |
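
**`pivot`** is the one method in this table that the lesson does not demonstrate. In PySpark it would be chained like `df.groupBy("geo.state").pivot("device").count()`. As a rough plain-Python model (hypothetical rows, not Spark's implementation), each distinct value of the pivot column becomes its own output column, and group/value combinations with no rows come out as null (`None` here). The sketch sorts the pivot values only to get a deterministic column order.

```python
from collections import defaultdict

# Hypothetical (state, device) rows, loosely modeled on the events columns.
rows = [
    ("TX", "Android"), ("TX", "Android"), ("RI", "Linux"),
    ("CA", "macOS"), ("CA", "iOS"), ("CA", "macOS"),
]

def pivot_count(rows):
    """Model groupBy(<first column>).pivot(<second column>).count():
    one output row per group key, one output column per distinct pivot value."""
    pivot_values = sorted({device for _, device in rows})
    counts = defaultdict(lambda: defaultdict(int))
    for state, device in rows:
        counts[state][device] += 1
    table = {}
    for state, per_device in counts.items():
        # .get() returns None for missing combinations, like the null cells Spark produces.
        table[state] = {v: per_device.get(v) for v in pivot_values}
    return pivot_values, table

columns, table = pivot_count(rows)
for state in sorted(table):
    print(state, [table[state][c] for c in columns])
```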

In [0]:
event_counts_df = df.groupBy("event_name").count()
display(event_counts_df)

event_name,count
mattresses,59862
down,4173
press,11386
shipping_info,14531
main,149953
warranty,12583
finalize,9056
login,1693
faq,13917
careers,3484


Here, we're getting the average purchase revenue for each state.

In [0]:
avg_state_purchases_df = df.groupBy("geo.state").avg("ecommerce.purchase_revenue_in_usd")
display(avg_state_purchases_df)

state,avg(ecommerce.purchase_revenue_in_usd AS purchase_revenue_in_usd)
AZ,1023.3955823293174
SC,1021.6054054054052
LA,1023.870786516854
MN,1048.7114537444936
NJ,1045.079411764706
DC,1072.3973684210523
OR,1064.481012658228
VA,1049.234532374101
RI,1035.07
KY,1081.8373333333332
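The averages are around $1,000 even though many events (such as **`main`** page views) carry no purchase revenue. That is because **`avg`**, like other SQL aggregates, skips null values, whereas **`count()`** counts rows. A minimal plain-Python model of the difference, using made-up values with `None` standing in for null:

```python
# Hypothetical purchase_revenue_in_usd values for one state's events.
# Non-purchase events have no revenue, represented here as None (null in Spark).
revenues = [1045.0, None, None, 595.0, None]

def sql_count_rows(values):
    """Like GroupedData.count(): counts every row in the group."""
    return len(values)

def sql_avg(values):
    """Like avg(): averages only the non-null values; None if there are none."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

print(sql_count_rows(revenues))  # 5
print(sql_avg(revenues))         # 820.0
```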


And here, we get the total quantity and the sum of purchase revenue for *each combination* of state and city, after *filtering out* rows where **`total_item_quantity`** is null.

In [0]:
from pyspark.sql.functions import col

city_purchase_quantities_df = (df
                               .filter(col("ecommerce.total_item_quantity").isNotNull())  # keep only purchase events
                               .groupBy("geo.state", "geo.city")
                               .sum("ecommerce.total_item_quantity", "ecommerce.purchase_revenue_in_usd")
                              )

display(city_purchase_quantities_df)

state,city,sum(ecommerce.total_item_quantity AS total_item_quantity),sum(ecommerce.purchase_revenue_in_usd AS purchase_revenue_in_usd)
MO,Kansas City,29,28236.6
FL,Miami Springs,3,2730.5
IN,Indianapolis,70,55964.09999999999
CA,San Juan Capistrano,2,2151.0
LA,Shreveport,13,13050.0
MA,Attleboro,5,4222.0
VA,Fredericksburg,1,1045.0
OR,Dallas,2,1926.0
OH,Highland Heights,1,595.0
OH,South Euclid,2,1670.5
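Because **`sum`** already skips nulls, the **`isNotNull`** filter does not change the **`total_item_quantity`** totals of any group that has purchases (and, assuming non-purchase rows also have null revenue, not the revenue totals either). What it changes is which groups appear: without it, a state/city pair with no purchases would still show up, with null sums. The plain-Python sketch below (hypothetical rows, SQL null semantics modeled with `None`) shows both cases.

```python
from collections import defaultdict

# Hypothetical rows: (state, city, total_item_quantity, purchase_revenue_in_usd).
rows = [
    ("IL", "Springfield", 2, 1045.0),
    ("IL", "Springfield", None, None),
    ("IL", "Springfield", 1, 595.0),
    ("TX", "Rockport", None, None),   # a city with no purchases at all
]

def null_sum(values):
    """SQL-style sum: ignores None and returns None when no values remain."""
    present = [v for v in values if v is not None]
    return sum(present) if present else None

def sum_by_state_city(rows):
    """Group by (state, city) and sum quantity and revenue per group."""
    groups = defaultdict(list)
    for state, city, qty, revenue in rows:
        groups[(state, city)].append((qty, revenue))
    return {
        key: (null_sum(q for q, _ in vals), null_sum(r for _, r in vals))
        for key, vals in groups.items()
    }

unfiltered = sum_by_state_city(rows)
filtered = sum_by_state_city([r for r in rows if r[2] is not None])
print(unfiltered)  # includes ("TX", "Rockport") with (None, None)
print(filtered)    # only groups that had at least one purchase
```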


## Built-In Functions
In addition to DataFrame and Column transformation methods, Spark provides many helpful functions in its built-in <a href="https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-builtin.html" target="_blank">SQL functions</a> module.

In Scala, this is <a href="https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/functions$.html" target="_blank">**`org.apache.spark.sql.functions`**</a>, and <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions" target="_blank">**`pyspark.sql.functions`**</a> in Python. Functions from this module must be imported into your code.

### Aggregate Functions

Here are some of the built-in functions available for aggregation.

| Method | Description |
| --- | --- |
| approx_count_distinct | Returns the approximate number of distinct items in a group |
| avg | Returns the average of the values in a group |
| collect_list | Returns a list of objects with duplicates |
| corr | Returns the Pearson Correlation Coefficient for two columns |
| max | Returns the maximum value of the expression in a group |
| mean | Returns the average of the values in a group (an alias of avg) |
| stddev_samp | Returns the sample standard deviation of the expression in a group |
| sumDistinct | Returns the sum of distinct values in the expression (deprecated since Spark 3.2 in favor of sum_distinct) |
| var_pop | Returns the population variance of the values in a group |
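
To pin down what a few of these functions compute, here are plain-Python versions applied to hypothetical values. The sketch assumes no nulls; Spark's versions skip nulls, and the order of elements returned by **`collect_list`** is not guaranteed after a shuffle.

```python
import math

# Hypothetical per-event values for one group (not real data).
quantities = [1, 2, 1, 3, 1]
revenues = [595.0, 1090.0, 1045.0, 1795.0, 595.0]

def stddev_samp(xs):
    """Sample standard deviation: divides the squared deviations by n - 1."""
    mean = sum(xs) / len(xs)
    return math.sqrt(sum((x - mean) ** 2 for x in xs) / (len(xs) - 1))

def var_pop(xs):
    """Population variance: divides the squared deviations by n."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / len(xs)

def corr(xs, ys):
    """Pearson correlation coefficient of two equally long columns."""
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    spread = math.sqrt(sum((x - mx) ** 2 for x in xs) * sum((y - my) ** 2 for y in ys))
    return cov / spread

collect_list = list(quantities)      # keeps duplicates, like collect_list
sum_distinct = sum(set(quantities))  # 1 + 2 + 3, like sumDistinct / sum_distinct

print(stddev_samp(quantities), var_pop(quantities), corr(quantities, revenues))
```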

Use the grouped data method <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.agg.html#pyspark.sql.GroupedData.agg" target="_blank">**`agg`**</a> to apply built-in aggregate functions.

This allows you to apply other transformations on the resulting columns, such as <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.alias.html" target="_blank">**`alias`**</a>.

In [0]:
from pyspark.sql.functions import sum  # note: this shadows Python's built-in sum in this notebook

state_purchases_df = df.groupBy("geo.state").agg(sum("ecommerce.total_item_quantity").alias("total_purchases"))
display(state_purchases_df)

state,total_purchases
AZ,276
SC,90
LA,103
MN,254
NJ,118
DC,45
OR,184
VA,151
RI,33
KY,89
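Each aggregate passed to **`agg`** becomes one output column, and **`alias`** sets that column's name; without it, Spark generates a name from the expression, like the `sum(...)` headers in the earlier output. The plain-Python model below (hypothetical rows and a hypothetical `agg_by_state` helper) mirrors that mapping from alias to aggregate. It also shows a SQL detail worth remembering: a group whose values are all null sums to null, not 0.

```python
# Hypothetical rows: (state, total_item_quantity); None marks non-purchase events.
rows = [("AZ", 1), ("AZ", None), ("AZ", 2), ("DC", None)]

def null_sum(values):
    """SQL-style sum: ignores None; returns None if nothing is left."""
    present = [v for v in values if v is not None]
    return sum(present) if present else None

def agg_by_state(rows, **named_aggs):
    """Model groupBy("state").agg(f(col).alias(name), ...): each keyword
    argument becomes an output column whose name is the alias."""
    groups = {}
    for state, qty in rows:
        groups.setdefault(state, []).append(qty)
    return {
        state: {alias: func(values) for alias, func in named_aggs.items()}
        for state, values in groups.items()
    }

result = agg_by_state(rows, total_purchases=null_sum)
print(result)  # DC has no purchases, so its total is None rather than 0
```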


Apply multiple aggregate functions to grouped data in a single **`agg`** call.

In [0]:
from pyspark.sql.functions import avg, approx_count_distinct

state_aggregates_df = (df
                       .groupBy("geo.state")
                       .agg(avg("ecommerce.total_item_quantity").alias("avg_quantity"),
                            approx_count_distinct("user_id").alias("distinct_users"))
                      )

display(state_aggregates_df)

state,avg_quantity,distinct_users
AZ,1.108433734939759,5902
SC,1.2162162162162162,1427
LA,1.1573033707865168,2145
MN,1.118942731277533,4890
NJ,1.156862745098039,2351
DC,1.1842105263157894,831
OR,1.1645569620253164,3433
VA,1.0863309352517985,2763
RI,1.1,608
KY,1.1866666666666668,1662
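**`approx_count_distinct`** estimates the number of distinct values instead of computing it exactly. In Spark it is based on HyperLogLog++ and accepts an optional `rsd` argument (the maximum relative standard deviation allowed, 0.05 by default), trading a small error for far less memory than an exact distinct count; **`countDistinct`** gives the exact count, typically at higher cost. The plain-Python sketch below (hypothetical rows) computes the exact values that **`distinct_users`** approximates, alongside a null-skipping average.

```python
# Hypothetical (state, user_id, total_item_quantity) rows; None = no purchase.
rows = [
    ("RI", "UA1", 1), ("RI", "UA1", None), ("RI", "UA2", 1),
    ("RI", "UA3", None), ("KY", "UA4", 2),
]

def state_aggregates(rows):
    """avg_quantity skips None like avg(); distinct_users is the exact
    count that approx_count_distinct("user_id") estimates."""
    groups = {}
    for state, user, qty in rows:
        groups.setdefault(state, []).append((user, qty))
    out = {}
    for state, members in groups.items():
        qtys = [q for _, q in members if q is not None]
        out[state] = {
            "avg_quantity": sum(qtys) / len(qtys) if qtys else None,
            "distinct_users": len({u for u, _ in members}),
        }
    return out

result = state_aggregates(rows)
print(result)
```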


### Math Functions
Here are some of the built-in functions for math operations.

| Method | Description |
| --- | --- |
| ceil | Computes the ceiling of the given column. |
| cos | Computes the cosine of the given value. |
| log | Computes the natural logarithm of the given value. |
| round | Rounds the value of the given column to `scale` decimal places (0 by default) using HALF_UP rounding mode. |
| sqrt | Computes the square root of the specified float value. |
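
HALF_UP rounding (ties go away from zero) differs from Python's built-in `round`, which rounds ties to the nearest even number. The plain-Python sketch below uses the standard `decimal` module to model HALF_UP (converting through `str` to sidestep binary floating-point surprises) and contrasts it with `round` and `math.ceil`. The helper name `round_half_up` is hypothetical.

```python
import math
from decimal import Decimal, ROUND_HALF_UP

def round_half_up(x, scale=0):
    """Round x to `scale` decimal places with ties going away from zero (HALF_UP)."""
    quantum = Decimal(1).scaleb(-scale)  # scale=0 -> 1, scale=2 -> 0.01
    return float(Decimal(str(x)).quantize(quantum, rounding=ROUND_HALF_UP))

for value in [0.5, 1.5, 2.5, -2.5, 1.24]:
    # Columns: value, HALF_UP result, Python's round-half-to-even result, ceiling.
    print(value, round_half_up(value), round(value), math.ceil(value))
```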

In [0]:
from pyspark.sql.functions import cos, sqrt

display(spark.range(10)  # Create a DataFrame with a single column called "id" with a range of integer values
        .withColumn("sqrt", sqrt("id"))
        .withColumn("cos", cos("id"))
       )

id,sqrt,cos
0,0.0,1.0
1,1.0,0.5403023058681398
2,1.4142135623730951,-0.4161468365471424
3,1.7320508075688772,-0.9899924966004454
4,2.0,-0.6536436208636119
5,2.23606797749979,0.2836621854632262
6,2.449489742783178,0.960170286650366
7,2.6457513110645907,0.7539022543433046
8,2.8284271247461903,-0.1455000338086135
9,3.0,-0.9111302618846768
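As a quick sanity check, Python's `math` module computes the same functions, so recomputing the columns for `id` 0 through 9 should agree with the output above up to floating-point rounding. Note that **`cos`** interprets its input in radians, as `math.cos` does.

```python
import math

# Recompute the sqrt and cos columns for id = 0..9 (cos takes radians).
rows = [(i, math.sqrt(i), math.cos(i)) for i in range(10)]
for i, s, c in rows:
    print(i, s, c)
```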


Run the following cell to delete the tables and files associated with this lesson.

In [0]:
DA.cleanup()

Resetting the learning environment...
...dropping the schema "dnchankov_bezt_dbacademy_delp"...(2 seconds)
...removing the working directory "dbfs:/mnt/dbacademy-users/dnchankov@abv.bg/data-engineer-learning-path"...(0 seconds)

Validating the locally installed datasets...(1 seconds)


&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>