# Aggregating DataFrames in PySpark

In this lecture we will go over how to aggregate DataFrames in PySpark. The commands we learn here are useful for running quality checks on your DataFrames and for answering simple business questions with your data.

So let's get to it! Here is what we will cover today:

 - GroupBy
 - Pivot
 - Aggregate methods
 - Combos of each

In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("aggregate").getOrCreate()

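# Note: this counts the block managers Spark reports (driver/executors), not CPU cores.
# In local mode that is typically 1, whatever the number of cores on the machine.
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()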
print("You are working with", cores, "core(s)")
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/12 06:58:05 WARN Utils: Your hostname, Logamous-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.68 instead (on interface en0)
25/11/12 06:58:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/12 06:58:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/12 06:58:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


You are working with 1 core(s)


## Read in the dataFrame for this Notebook

In [2]:
# Start by reading in a basic csv dataset
# Let Spark know about the header and infer the Schema types!

#Some csv data
airbnb = spark.read.csv('Datasets/nyc_air_bnb.csv',inferSchema=True,header=True)

## About this dataset

This dataset describes the listing activity and metrics for Airbnb listings in New York City for 2019. Each row in the dataset is a listing.

**Source:** https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data/data

Let's go ahead and view the first few lines of the dataframe.

In [3]:
airbnb.limit(5).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


In [4]:
# printSchema() prints the schema itself and returns None, so it does not need print()
airbnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)


Notice that several columns that are obviously numeric were read in as strings. Let's fix that; otherwise we cannot aggregate them as numbers.
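To see why the target type matters when converting strings, here is a plain-Python sketch (no Spark involved) of a lenient cast that returns `None` for values that do not parse. The helper `lenient_cast` and the sample values are hypothetical and only mimic the idea; they are not a Spark API.

```python
def lenient_cast(value, target):
    """Return target(value), or None when the value cannot be parsed."""
    try:
        return target(value)
    except (TypeError, ValueError):
        return None

# Hypothetical raw string values, similar to what a CSV column might hold
raw = ["149", "0.21", "", None, "Private room"]

as_int = [lenient_cast(v, int) for v in raw]
as_float = [lenient_cast(v, float) for v in raw]

print(as_int)    # '0.21' is not a valid integer literal, so it becomes None
print(as_float)  # the same value parses fine as a float
```

The takeaway is that a decimal string such as `'0.21'` needs a floating-point target type, while whole-number columns can safely become integers.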

In [7]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# reviews_per_month holds decimals such as 0.21, so it needs DoubleType, not IntegerType.
# try_cast (Spark 4.0+) returns NULL for values that cannot be parsed,
# where a plain cast raises CAST_INVALID_INPUT when ANSI mode is on.
df = airbnb.withColumn("price", airbnb["price"].try_cast(IntegerType())) \
        .withColumn("minimum_nights", airbnb["minimum_nights"].try_cast(IntegerType())) \
        .withColumn("number_of_reviews", airbnb["number_of_reviews"].try_cast(IntegerType())) \
        .withColumn("reviews_per_month", airbnb["reviews_per_month"].try_cast(DoubleType())) \
        .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].try_cast(IntegerType()))
# QA
df.printSchema()
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)


# GroupBy and Aggregate Functions

Let's learn how to use GroupBy and aggregate methods on a DataFrame. These two commands often go hand in hand in PySpark. In fact, GroupBy on its own does not return a result: you also have to tell Spark which aggregate you want to compute for each group, for example a count, an average, or a min/max.

GroupBy lets you group rows together based on a column value. For example, you could group sales data by the day the sale occurred, or group repeat-customer data by the name of the customer. Once you've performed the GroupBy operation, you can apply an aggregate function to the grouped data. An aggregate function collapses multiple rows of data into a single output, such as the sum of the inputs or the number of inputs.

You can also use aggregate functions on their own to learn about the overall statistics of your DataFrame, which we will see in some of our examples.
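To make the group-then-aggregate idea concrete before touching Spark, here is a plain-Python sketch that uses the `neighbourhood_group` and `price` values from the five preview rows shown earlier. The names `rows`, `groups`, `result` and `overall` are made up for illustration, and this is only a model of the idea, not how Spark executes it.

```python
from collections import defaultdict
from statistics import fmean

# (neighbourhood_group, price) pairs taken from the five preview rows
rows = [
    ("Brooklyn", 149),
    ("Manhattan", 225),
    ("Manhattan", 150),
    ("Brooklyn", 89),
    ("Manhattan", 80),
]

# Step 1 (group): collect the prices that share the same key.
groups = defaultdict(list)
for group, price in rows:
    groups[group].append(price)

# Step 2 (aggregate): collapse each group into one value per statistic.
result = {
    group: {"count": len(prices), "mean": fmean(prices)}
    for group, prices in groups.items()
}

# The same aggregates applied to every row at once, with no grouping.
all_prices = [price for _, price in rows]
overall = {"count": len(all_prices), "mean": fmean(all_prices)}

print(result)
print(overall)
```

The sketch sticks to `len` and `statistics.fmean` on purpose: inside this notebook, `from pyspark.sql.functions import *` replaces built-ins such as `sum`, `min` and `max` with Spark column functions.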

So let's dig in!

In [None]:
# For example we may be interested to see how many listings there were per neighbourhood group. 
# Groupby Function with count (you can also use sum, min, max)
df.groupBy("neighbourhood_group").count().show(7)

In [None]:
# Then you can add the following aggregate functions: mean, count, min, max, sum
# Like this for example


df.groupBy("neighbourhood_group").mean("price").show(5)

In [None]:
# Another way to compute a grouped mean (here by neighbourhood), using a dictionary.
# I don't recommend it, because a dict can hold only one aggregate per column
df.groupBy("neighbourhood").agg({'price':'mean'}).show(5)
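The limitation of the dictionary form of `agg` comes from Python itself rather than from Spark: dictionary keys are unique, so asking for two aggregates on the same column silently keeps only the last one. A small sketch, with made-up variable names `spec` and `multi`:

```python
# Writing the same column twice in a dict literal is legal Python,
# but only the last entry survives.
spec = {"price": "min", "price": "max"}
print(spec)  # {'price': 'max'}

# Different columns are fine, because each key is unique.
multi = {"price": "mean", "number_of_reviews": "max"}
print(len(multi))  # 2
```

So the dictionary form can cover several columns, but never more than one aggregate for any single column.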

In [None]:
# This method is way more versatile
# Allows you to call on more than one aggregate function at a time
# It's my fav for this reason!
# Note: the star import below replaces Python's built-in min and max with Spark's column functions
from pyspark.sql.functions import *
df.groupBy("neighbourhood").agg(min(df.price).alias("Min Price"), max(df.price).alias("Max Price")).show(5)

In [None]:
# This is also a pretty neat function you can use:
summary = df.summary("count", "min", "25%", "75%", "max")
summary.toPandas()
# But be careful because it'll perform this operation on your whole df!

In [None]:
# Eh that was ugly!
# To do a summary for specific columns first select them:
# limit_summary = df.select("price","minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365").summary("count","min","max")
limit_summary = df.select("price","minimum_nights","number_of_reviews").summary("count","min","max")
limit_summary.toPandas()

### Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()).

This is great, but what if we wanted overall summary metrics, like averages and counts, for more than one variable and without a groupBy variable? We can do this with the functions in the `pyspark.sql.functions` module.

In [None]:
# Aggregate!
# agg(*exprs)
# Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()).
# available agg functions: min, max, count, countDistinct, approx_count_distinct
# df.agg(covar_pop(col1, col2)) Returns a new Column for the population covariance of col1 and col2.
# df.agg(covar_samp(col1, col2)) Returns a new Column for the sample covariance of col1 and col2.
# df.agg(corr(col1, col2)) Returns a new Column for the Pearson Correlation Coefficient for col1 and col2.

from pyspark.sql.functions import *
df.agg(min(df.price).alias("Min Price"),max(df.price).alias("Max Price")).show()
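The comments above mention population covariance, sample covariance and Pearson correlation. As a rough plain-Python sketch of the textbook formulas behind those names (the helpers `covariance` and `pearson` and the toy data are hypothetical, and Spark's own implementation may differ in numerical details):

```python
import math
from statistics import fmean

def covariance(xs, ys, sample):
    """Covariance of two equal-length sequences.

    sample=False divides by n (population); sample=True divides by n - 1.
    """
    mx, my = fmean(xs), fmean(ys)
    total = math.fsum((x - mx) * (y - my) for x, y in zip(xs, ys))
    return total / (len(xs) - 1 if sample else len(xs))

def pearson(xs, ys):
    """Pearson correlation: covariance scaled by both standard deviations."""
    return covariance(xs, ys, False) / math.sqrt(
        covariance(xs, xs, False) * covariance(ys, ys, False)
    )

# Toy data where ys is exactly twice xs, so the correlation is 1
xs = [1, 2, 3]
ys = [2, 4, 6]

print(covariance(xs, ys, sample=False))
print(covariance(xs, ys, sample=True))
print(pearson(xs, ys))
```

The only difference between the two covariances is the divisor, which is why they converge as the number of rows grows.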

In [None]:
# There is also this method which is pretty similar
df.select(countDistinct("neighbourhood_group").alias('CountD'),avg('price'),stddev("price")).show()

In [None]:
# You could also write the syntax with a dictionary...
# But keep in mind that a dict allows only one aggregate per column (bummer)
# Again I don't recommend this!
# Max number of reviews across all listings
df.agg({'number_of_reviews':'max'}).withColumnRenamed("max(number_of_reviews)", "Max Reviews").show()

### Pivot Function

Pivot produces a two-way table: the groupBy column supplies the rows, and the values of the pivot column become the columns. It must be used in conjunction with groupBy.
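As a plain-Python sketch of what such a two-way table contains, the snippet below counts `(room_type, neighbourhood_group)` pairs from the five preview rows shown earlier. The names `cell_counts`, `col_keys` and `table` are made up for illustration, and `"Queens"` is added to the column list only to show an empty cell.

```python
from collections import Counter

# (room_type, neighbourhood_group) pairs taken from the five preview rows
rows = [
    ("Private room", "Brooklyn"),
    ("Entire home/apt", "Manhattan"),
    ("Private room", "Manhattan"),
    ("Entire home/apt", "Brooklyn"),
    ("Entire home/apt", "Manhattan"),
]

# One count per (row key, column key) combination
cell_counts = Counter(rows)

row_keys = sorted({room for room, _ in rows})
col_keys = ["Brooklyn", "Manhattan", "Queens"]  # plays the role of pivot's values list

# Combinations that never occur get None, much like an empty cell in the pivot output
table = {
    room: {group: cell_counts.get((room, group)) for group in col_keys}
    for room in row_keys
}

for room, cells in table.items():
    print(room, cells)
```

Passing an explicit list of column values, as `col_keys` does here, also fixes which columns appear and in what order.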

In [None]:
# Pivot Function
# pivot(pivot_col, values=None)
df.groupBy("room_type").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).count().show(10)

In [None]:
# You can also filter your results if you need to
# We see some invalid data in the above output
# So we could select only the "Shared room" type if we wanted to
df.filter("room_type='Shared room'").groupBy("room_type").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).count().show(100)

### Combine all three!

It is also possible to combine all three methods into one call: GroupBy, Pivot and Agg, like this:

In [None]:
# from pyspark.sql.functions import *
df.groupBy("neighbourhood").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).agg(min(df.price).alias("Min Price"),max(df.price).alias("Max Price")).toPandas()#.show()
# Note: toPandas() should only be used if the resulting pandas DataFrame is expected to be small,
# as all the data is loaded into the driver's memory.

In [None]:
spark.stop()