 # Aggregating DataFrames in PySpark

 In this lecture we will go over how to aggregate DataFrames in PySpark.
 The commands covered here are useful for running quality checks
 on your DataFrames and for answering simple business questions with your data.
 
 So let's get to it! Here is what we will cover today:
 
  - GroupBy
  - Pivot
  - Aggregate methods
  - Combos of each

In [16]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("aggregate").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark


You are working with 1 core(s)


In [17]:
airbnb = spark.read.csv('Datasets/nyc_air_bnb.csv',inferSchema=True,header=True)


In [18]:
airbnb.toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365.0
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355.0
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365.0
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194.0
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.10,1,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49074,36484665,Charming one bedroom - newly renovated rowhouse,8232441,Sabrina,Brooklyn,Bedford-Stuyvesant,40.67853,-73.94995,Private room,70,2,0,,,2,9.0
49075,36485057,Affordable room in Bushwick/East Williamsburg,6570630,Marisol,Brooklyn,Bushwick,40.70184,-73.93317,Private room,40,4,0,,,2,36.0
49076,36485431,Sunny Studio at Historical Neighborhood,23492952,Ilgar & Aysel,Manhattan,Harlem,40.81475,-73.94867,Entire home/apt,115,10,0,,,1,27.0
49077,36485609,43rd St. Time Square-cozy single bed,30985759,Taz,Manhattan,Hell's Kitchen,40.75751,-73.99112,Shared room,55,1,0,,,6,2.0


In [19]:
airbnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)



In [20]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Cast the numeric columns, which were read in as strings, to integers.
# Note: casting reviews_per_month to IntegerType drops the decimals (0.21 becomes 0).
df = (
    airbnb.withColumn("price", airbnb["price"].cast(IntegerType()))
    .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType()))
    .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType()))
    .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType()))
    .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))
)
#QA
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.0,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.0,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.0,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.0,1,0


# GroupBy and Aggregate Functions
 
These two commands often go hand in hand in PySpark. A GroupBy on its own only defines the groups; to get a result you also have to tell Spark which aggregate you want for each group, for example a count, an average, or a min/max.
 
 GroupBy lets you group rows together based on the value of a column. For example, you could group sales data by the day the sale occurred, or group repeat-customer data by the name of the customer. Once you have performed the GroupBy operation, you can apply an aggregate function to the grouped data. An aggregate function reduces multiple rows of data to a single output, such as the sum of the inputs or the number of inputs.
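
To make the group-then-aggregate idea concrete, here is a minimal plain-Python sketch of what a grouped count and a grouped mean compute. It is not how Spark implements GroupBy (Spark distributes the work across partitions); the helper names `group_rows`, `count_per_group` and `mean_per_group` and the sample rows are made up for illustration. The mean skips missing values and returns nothing when a group has no values left, which is the behaviour behind the null averages that appear in this lecture's output.

```python
from collections import defaultdict

# Hypothetical sample rows: (neighbourhood_group, price); None marks a missing price.
rows = [
    ("Queens", 100),
    ("Brooklyn", 150),
    ("Queens", 60),
    ("Brooklyn", None),
    ("Bronx", None),
]


def group_rows(rows):
    """Collect the values of each group under its key."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return dict(groups)


def count_per_group(rows):
    """Number of rows per group, rows with missing values included."""
    return {key: len(values) for key, values in group_rows(rows).items()}


def mean_per_group(rows):
    """Mean per group, ignoring missing values; None if nothing is left."""
    result = {}
    for key, values in group_rows(rows).items():
        present = [v for v in values if v is not None]
        result[key] = sum(present) / len(present) if present else None
    return result


print(count_per_group(rows))
print(mean_per_group(rows))
```

The count keeps every row of a group, while the mean only uses the rows that actually have a price.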


In [21]:
df.groupBy('neighbourhood_group').count().show()

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|         Douglaston|    1|
|             Queens| 5630|
|              Nadia|    1|
|            Midtown|    4|
|    Jackson Heights|    2|
|     Hell's Kitchen|    7|
|  Greenwich Village|    2|
|       Clinton Hill|    1|
| Washington Heights|    4|
|   Ditmars Steinway|    3|
|           Longwood|    2|
|          Briarwood|    1|
|        Little Neck|    1|
|           Flushing|    3|
|      Randall Manor|    1|
|             Carmen|    1|
|      East Elmhurst|    2|
|    Upper East Side|    7|
|               null|  185|
|         Bath Beach|    1|
+-------------------+-----+
only showing top 20 rows



In [22]:
df.groupBy('neighbourhood_group').mean('price').show()

+-------------------+------------------+
|neighbourhood_group|        avg(price)|
+-------------------+------------------+
|         Douglaston|               1.0|
|             Queens| 99.57690941385435|
|              Nadia|              null|
|            Midtown|               9.0|
|    Jackson Heights|              16.0|
|     Hell's Kitchen|1.2857142857142858|
|  Greenwich Village|              55.5|
|       Clinton Hill|              14.0|
| Washington Heights|              2.75|
|   Ditmars Steinway|3.3333333333333335|
|           Longwood|               5.0|
|          Briarwood|               1.0|
|        Little Neck|               1.0|
|           Flushing|10.333333333333334|
|      Randall Manor|               7.0|
|             Carmen|              null|
|      East Elmhurst|               1.0|
|    Upper East Side|1.5714285714285714|
|               null|              null|
|         Bath Beach|               2.0|
+-------------------+------------------+
only showing top 20 rows

In [23]:
# The wildcard import makes Spark's min and max replace Python's built-ins in this notebook
from pyspark.sql.functions import *
df.groupBy("neighbourhood").agg(
    min(df.price).alias("Min Price"),
    max(df.price).alias("Max Price"),
).show(5)



+-------------+---------+---------+
|neighbourhood|Min Price|Max Price|
+-------------+---------+---------+
|       Corona|       23|      359|
| Richmondtown|       78|       78|
| Prince's Bay|       85|     1250|
|  Westerleigh|       40|      103|
|   Mill Basin|       85|      299|
+-------------+---------+---------+
only showing top 5 rows
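
The call above computes two aggregates per group in a single pass. As a rough plain-Python sketch of that idea (not Spark's implementation), the hypothetical helper `min_max_per_group` below keeps a running minimum and maximum for each key. The sample rows are made up, with extremes chosen to match three rows of the output above.

```python
# Hypothetical sample rows: (neighbourhood, price).
listings = [
    ("Corona", 23),
    ("Corona", 359),
    ("Mill Basin", 299),
    ("Mill Basin", 85),
    ("Richmondtown", 78),
]


def min_max_per_group(rows):
    """Track the running minimum and maximum price for each group."""
    stats = {}
    for key, price in rows:
        if key not in stats:
            # First row of a group: it is both the minimum and the maximum so far.
            stats[key] = {"Min Price": price, "Max Price": price}
        else:
            stats[key]["Min Price"] = min(stats[key]["Min Price"], price)
            stats[key]["Max Price"] = max(stats[key]["Max Price"], price)
    return stats


for neighbourhood, stats in min_max_per_group(listings).items():
    print(neighbourhood, stats)
```

A group with a single row, such as Richmondtown here, ends up with the same value for both aggregates.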



In [24]:
# This is also a pretty neat function you can use:
limit_summary = df.select("price","minimum_nights","number_of_reviews").summary("count","min", "25%", "75%","max")
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews
0,count,48887,48891,48738
1,min,-74,0,0
2,25%,69,1,1
3,75%,175,5,23
4,max,10000,1250,629


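# Pivot

Pivot turns the distinct values of one column into separate columns and aggregates the rows that fall into each cell.
For example, take this DataFrame: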
+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+


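Grouping by Product, pivoting on Country, and summing Amount gives one row per product and one column per country:

    df.groupBy("Product").pivot("Country").sum("Amount")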

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+
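
The same reshaping can be sketched in plain Python to show what each cell holds. This is only an illustration of the semantics, not Spark's implementation, and the helper name `pivot_sum` is made up. The rows are the ones from the example table; a product with no sales in a country gets `None`, like the null cells above.

```python
from collections import defaultdict

# The (Product, Amount, Country) rows from the example table.
sales = [
    ("Banana", 1000, "USA"), ("Carrots", 1500, "USA"), ("Beans", 1600, "USA"),
    ("Orange", 2000, "USA"), ("Orange", 2000, "USA"),
    ("Banana", 400, "China"), ("Carrots", 1200, "China"), ("Beans", 1500, "China"),
    ("Orange", 4000, "China"),
    ("Banana", 2000, "Canada"), ("Carrots", 2000, "Canada"),
    ("Beans", 2000, "Mexico"),
]


def pivot_sum(rows):
    """Sum Amount per (Product, Country) and lay the countries out as columns."""
    totals = defaultdict(int)
    for product, amount, country in rows:
        totals[(product, country)] += amount
    countries = sorted({country for _, _, country in rows})
    products = sorted({product for product, _, _ in rows})
    # .get() leaves a missing (product, country) pair as None instead of 0.
    table = {
        product: {c: totals.get((product, c)) for c in countries}
        for product in products
    }
    return countries, table


countries, table = pivot_sum(sales)
print(["Product"] + countries)
for product, cells in table.items():
    print([product] + [cells[c] for c in countries])
```

The two Orange rows for USA are summed into a single cell of 4000, which is why the pivot needs an aggregate function.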

In [25]:
df.groupBy("neighbourhood").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).agg(
    min(df.price).alias("Min Price"),
    max(df.price).alias("Max Price"),
).toPandas()

Unnamed: 0,neighbourhood,Queens_Min Price,Queens_Max Price,Brooklyn_Min Price,Brooklyn_Max Price
0,Corona,23.0,359.0,,
1,Prince's Bay,,,,
2,Richmondtown,,,,
3,Mill Basin,,,85.0,299.0
4,Westerleigh,,,,
...,...,...,...,...,...
378,40.69383,,,,
379,Morningside Heights,,,,
380,Greenpoint,,,0.0,10000.0
381,Elmhurst,15.0,443.0,,
