# Aggregating DataFrames in PySpark

In this lecture we will be going over how to aggregate dataframes in Pyspark. The commands we will learn here will be super useful for doing quality checks on your dataframes and answering more simplistic business questions with you data. 

So let's get to it! Here is what we will cover today:

 - GroupBy
 - Pivot
 - Aggregate methods
 - Combos of each

In [7]:
# import findspark
# findspark.init()

import pyspark  # only run after findspark.init()
from pyspark.sql import SparkSession

# May take awhile locally
spark = SparkSession.builder.appName("aggregate").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in the dataFrame for this Notebook

In [8]:
# Start by reading in a basic csv dataset
# Let Spark know about the header and infer the Schema types!

# Some csv data
airbnb = spark.read.csv("Datasets/nyc_air_bnb.csv", inferSchema=True, header=True)

## About this dataset

This dataset describes the listing activity and metrics for Air BNB bookers in NYC, NY for 2019. Each line in the dataset is a booking. 

**Source:** https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data/data

Let's go ahead and view the first view lines of the dataframe.

In [9]:
airbnb.limit(5).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


In [10]:
print(airbnb.printSchema())

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Notice here that some of the columns that are obviously numeric have been incorrectly identified as "strings". Let's edit that. Otherwise we cannot aggregate any of the numeric columns.

In [11]:
from pyspark.sql.types import IntegerType

df = (
    airbnb.withColumn("price", airbnb["price"].cast(IntegerType()))
    .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType()))
    .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType()))
    .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType()))
    .withColumn(
        "calculated_host_listings_count",
        airbnb["calculated_host_listings_count"].cast(IntegerType()),
    )
)
# QA
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.0,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.0,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.0,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.0,1,0


# GroupBy and Aggregate Functions

Let's learn how to use GroupBy and Aggregate methods on a DataFrame. These two commands go hand in hand many times in PySpark. ACtually in order to use the GroupBy command, you have to also tell Spark what numeric aggregate you want to learn about. For example, count, average or min/max. 

GroupBy allows you to group rows together based off some column value, for example, you could group together sales data by the day the sale occured, or group repeat customer data based off the name of the customer. Once you've performed the GroupBy operation you can use an aggregate function off that data. An aggregate function aggregates multiple rows of data into a single output, such as taking the sum of inputs, or counting the number of inputs.

You can also use the aggregate function independently as well to learn about overall statistics of your dataframe too which we will see in some of our examples. 

So let's dig in!

In [12]:
# For example we may be interested to see how many listings there were per neighbourhood group.
# Groupby Function with count (you can also use sum, min, max)
df.groupBy("neighbourhood_group").count().show(7)

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|         Douglaston|    1|
|             Queens| 5630|
|              Nadia|    1|
|            Midtown|    4|
|     Hell's Kitchen|    7|
|  Greenwich Village|    2|
|       Clinton Hill|    1|
+-------------------+-----+
only showing top 7 rows



In [13]:
# Then you can add the following aggregate functions: mean, count, min, max, sum
# Like this for example
df.groupBy("neighbourhood_group").mean("price").show(5)

+-------------------+------------------+
|neighbourhood_group|        avg(price)|
+-------------------+------------------+
|         Douglaston|               1.0|
|             Queens| 99.57690941385435|
|              Nadia|              null|
|            Midtown|               9.0|
|     Hell's Kitchen|1.2857142857142858|
+-------------------+------------------+
only showing top 5 rows



In [14]:
# This is another way of doing the above but I don't recommend it
# because you can only do one var at a time
df.groupBy("neighbourhood").agg({"price": "mean"}).show(5)

+-------------+----------+
|neighbourhood|avg(price)|
+-------------+----------+
|       Corona| 59.171875|
| Richmondtown|      78.0|
| Prince's Bay|     409.5|
|  Westerleigh|      71.5|
|   Mill Basin|    179.75|
+-------------+----------+
only showing top 5 rows



In [15]:
# This method is way more versatile
# Allows you to call on more than one aggregate function at a time
# It's my fav for this reason!
import pyspark.sql.functions as F

df.groupBy("neighbourhood").agg(
    F.min(df.price).alias("Min Price"), F.max(df.price).alias("Max Price")
).show(5)

+-------------+---------+---------+
|neighbourhood|Min Price|Max Price|
+-------------+---------+---------+
|       Corona|       23|      359|
| Richmondtown|       78|       78|
| Prince's Bay|       85|     1250|
|  Westerleigh|       40|      103|
|   Mill Basin|       85|      299|
+-------------+---------+---------+
only showing top 5 rows



In [16]:
# This is also a pretty neat function you can use:
summary = df.summary("count", "min", "25%", "75%", "max")
summary.toPandas()
# But be careful because it'll perform this operation on your whole df!

                                                                                

Unnamed: 0,summary,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,count,49079,49047,48894,48873,48894,48894,48894,48894,48894,48887,48891,48738,38845.0,38858,48891,48737
1,min,"12 mins Manhattan""",1 Bed Apt in Utopic Williamsburg,"Heart of Greenwich Village""","very clean studio app""",194716858,2,-73.72247,-73.71299,-73.90783,-74,0,0,-73.94134,0,0,0
2,25%,9471893.0,2.4544724E7,7797690.0,475.0,1.94716858E8,40.68771,40.68981,-73.98309,56.0,69,1,1,0.76,0,1,0
3,75%,2.9152899E7,1.74786681E8,1.07434423E8,,1.97400421E8,40.78304,40.76299,-73.93638,145.0,175,5,23,3.24,2,2,226
4,max,"獨一無二的紐約閣樓""","ﾏﾝﾊｯﾀﾝ､駅から徒歩4分でどこに行くのにも便利な場所!女性の方希望,ｷﾚｲなお部屋｡",呈刚,현선,Woodside,Woodside,West Village,Shared room,Shared room,10000,1250,629,9.66,58,365,365


In [17]:
# Eh that was ugly!
# To do a summary for specific columns first select them:
# limit_summary = df.select("price","minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365").summary("count","min","max")
limit_summary = df.select("price", "minimum_nights", "number_of_reviews").summary(
    "count", "min", "max"
)
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews
0,count,48887,48891,48738
1,min,-74,0,0
2,max,10000,1250,629


### Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).

This is great, but what if we wanted the overall summary metrics like average and counts for more than one variable and without a groupBy variable? We could do this using the pyspark.sql functions library.

In [18]:
# Aggregate!
# agg(*exprs)
# Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).
# available agg functions: min, max, count, countDistinct, approx_count_distinct
# df.agg.(covar_pop(col1, col2)) Returns a new Column for the population covariance of col1 and col2
# df.agg.(covar_samp(col1, col2)) Returns a new Column for the sample covariance of col1 and col2.
# df.agg(corr(col1, col2)) Returns a new Column for the Pearson Correlation Coefficient for col1 and col2.

df.agg(F.min(df.price).alias("Min Price"), F.mean(df.price).alias("Mean Price")).show()

+---------+------------------+
|Min Price|        Mean Price|
+---------+------------------+
|      -74|152.22298361527604|
+---------+------------------+



In [19]:
# There is also this method which is pretty similar
df.select(
    F.countDistinct("neighbourhood_group").alias("CountD"),
    F.avg("price"),
    F.stddev("price"),
).show()

+------+------------------+------------------+
|CountD|        avg(price)|stddev_samp(price)|
+------+------------------+------------------+
|    77|152.22298361527604| 238.5414668883948|
+------+------------------+------------------+



In [20]:
# You could also write the syntax like this....
# But keep in mind with this method that you can only do one variable at a time (bummer)
# Again I don't recommend this!
# Max sales across everything
df.agg({"number_of_reviews": "max"}).withColumnRenamed(
    "max(number_of_reviews)", "Max Reviews"
).show()

+-----------+
|Max Reviews|
+-----------+
|        629|
+-----------+



### Pivot Function

Provides a two way table and must be used in conjunction with groupBy.

In [21]:
# Pivot Function
# pivot(pivot_col, values=None)
df.groupBy("room_type").pivot(
    "neighbourhood_group", ["Queens", "Brooklyn"]
).count().show(10)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|         51|  null|    null|
|        205|  null|    null|
|         54|  null|    null|
|        200|  null|    null|
|        279|  null|    null|
|        138|  null|    null|
|         69|  null|    null|
|         42|  null|    null|
|Shared room|   198|     413|
|  -73.95777|  null|    null|
+-----------+------+--------+
only showing top 10 rows



In [22]:
# You can also filter your results if you need to
# We some invalid data in the above output
# So we could select only the "Share room" types if we wanted to
df.filter("room_type='Shared room'").groupBy("room_type").pivot(
    "neighbourhood_group", ["Queens", "Brooklyn"]
).count().show(100)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|Shared room|   198|     413|
+-----------+------+--------+



### Comine all three!

It is also possible to combine all three method into one call: GroupBy, Pivot and Agg like this:

In [23]:
# from pyspark.sql.functions import *
df.groupBy("neighbourhood").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).agg(
    F.min(df.price).alias("Min Price"), F.max(df.price).alias("Max Price")
).toPandas()  # .show()
# Note The toPandas() method should only be used if the resulting Pandas’s DataFrame is expected to be small,
# as all the data is loaded into the driver’s memory.

Unnamed: 0,neighbourhood,Queens_Min Price,Queens_Max Price,Brooklyn_Min Price,Brooklyn_Max Price
0,Corona,23.0,359.0,,
1,Prince's Bay,,,,
2,Richmondtown,,,,
3,Mill Basin,,,85.0,299.0
4,Westerleigh,,,,
...,...,...,...,...,...
378,Morningside Heights,,,,
379,40.69383,,,,
380,Greenpoint,,,0.0,10000.0
381,Elmhurst,15.0,443.0,,
