## Aggregating Functions with Pyspark

### Import Pyspark and create SparkSession.

This is the first thing to do when working with pyspark. The spark variable will also provide access to a UI to monitor jobs.

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Aggregating").getOrCreate()

In [3]:
path = 'Datasets/'

### Let's read some csv and get down to it

In [4]:
airbnb = spark.read.csv(path+"nyc_air_bnb.csv",header=True,inferSchema=True)

In [5]:
airbnb.limit(5).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


In [7]:
airbnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)



The inferSchema just not did that good of a job. Let's change some datatypes

In [6]:
from pyspark.sql.types import *
df = airbnb.withColumn("price", airbnb["price"].cast(IntegerType())) \
        .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType())) \
        .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType())) \
        .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType())) \
        .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))
#QA
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.0,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.0,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.0,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.0,1,0


### Grouping and Aggregating

In [7]:
from pyspark.sql.functions import *

#### Group by neighbourhood_group and count the amount of data for each

In [8]:
df.groupBy("neighbourhood_group").count().show(7)

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|         Douglaston|    1|
|             Queens| 5630|
|              Nadia|    1|
|            Midtown|    4|
|    Jackson Heights|    2|
|     Hell's Kitchen|    7|
|  Greenwich Village|    2|
+-------------------+-----+
only showing top 7 rows



#### Group by neighbourhood_group and get min price for each group

In [9]:
df.groupBy("neighbourhood_group").min("price").show(7)

+-------------------+----------+
|neighbourhood_group|min(price)|
+-------------------+----------+
|         Douglaston|         1|
|             Queens|        10|
|              Nadia|      null|
|            Midtown|         2|
|    Jackson Heights|         2|
|     Hell's Kitchen|         1|
|  Greenwich Village|        31|
+-------------------+----------+
only showing top 7 rows



#### Group by neighbourhood and get mean of price for each group

In [10]:
df.groupBy("neighbourhood").agg({'price':'mean'}).show()

+-----------------+------------------+
|    neighbourhood|        avg(price)|
+-----------------+------------------+
|           Corona|         59.171875|
|     Richmondtown|              78.0|
|     Prince's Bay|             409.5|
|      Westerleigh|              71.5|
|       Mill Basin|            179.75|
|         40.76199|               1.0|
|     Civic Center|191.94230769230768|
|         40.83166|               1.0|
|       Douglaston| 88.14285714285714|
|       Mount Hope|              77.5|
|          40.7578|               1.0|
|         40.80958|               1.0|
|      Marble Hill| 89.16666666666667|
|        Rego Park| 83.87735849056604|
|         40.81225|               2.0|
|         40.76805|               1.0|
|         40.64936|               1.0|
|    Dyker Heights| 93.41666666666667|
|         40.76364|               2.0|
|Kew Gardens Hills| 112.3076923076923|
+-----------------+------------------+
only showing top 20 rows



#### Group by neighbourhood and min and max price for each group

In [11]:
df.groupBy("neighbourhood").agg(min(df.price),max(df.price)).show()

+-----------------+----------+----------+
|    neighbourhood|min(price)|max(price)|
+-----------------+----------+----------+
|           Corona|        23|       359|
|     Richmondtown|        78|        78|
|     Prince's Bay|        85|      1250|
|      Westerleigh|        40|       103|
|       Mill Basin|        85|       299|
|         40.76199|         1|         1|
|     Civic Center|        50|       950|
|         40.83166|         1|         1|
|       Douglaston|        40|       178|
|       Mount Hope|        24|       250|
|          40.7578|         1|         1|
|         40.80958|         1|         1|
|      Marble Hill|        40|       274|
|        Rego Park|        21|       300|
|         40.81225|         2|         2|
|         40.76805|         1|         1|
|         40.64936|         1|         1|
|    Dyker Heights|        30|       170|
|         40.76364|         2|         2|
|Kew Gardens Hills|        40|       399|
+-----------------+----------+----

####  Getting some statistics from the data like

In [13]:
summary = df.summary("count","min","max","25%","75%")
summary.limit(4).toPandas()

''' Notice how the string columns are not affected'''

' Notice how the string columns are not affected'

In [14]:
limit_summary = df.select("price","minimum_nights","number_of_reviews").summary("count","min","max","25%","75%")

In [15]:
limit_summary.limit(6).toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews
0,count,48887,48891,48738
1,min,-74,0,0
2,max,10000,1250,629
3,25%,69,1,1
4,75%,175,5,23


#### Well we want to count the number of distinct neighbourhood groups  and get average and standard deviation of price

In [16]:
df.select(countDistinct("neighbourhood_group"),avg("price"),stddev("price")).limit(5).toPandas()

Unnamed: 0,count(DISTINCT neighbourhood_group),avg(price),stddev_samp(price)
0,77,152.222984,238.541467


#### The Pivot table to understand what room types are there in neighbourhood groups of Queens and Brooklyn

In [17]:
df.groupBy("room_type").pivot("neighbourhood_group",['Queens','Brooklyn']).count().show(10)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|         51|  null|    null|
|        205|  null|    null|
|         54|  null|    null|
|        200|  null|    null|
|        279|  null|    null|
|        138|  null|    null|
|         69|  null|    null|
|         42|  null|    null|
|Shared room|   198|     413|
|  -73.95777|  null|    null|
+-----------+------+--------+
only showing top 10 rows



#### Now we are really getting into it with filtering data based on room type, grouping it and having a pivot table to understand the relationship for that particular room type with the neighbourhood groupds of Queens and Brooklyn

In [18]:
df.filter("room_type='Shared room'").groupBy("room_type").pivot("neighbourhood_group",['Queens','Brooklyn']).count().show(10)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|Shared room|   198|     413|
+-----------+------+--------+



In [19]:
df.filter("room_type='Shared room'").groupBy("room_type").pivot("neighbourhood_group",['Queens','Brooklyn']).agg(min(df.price),max(df.price)).toPandas()

Unnamed: 0,room_type,Queens_min(price),Queens_max(price),Brooklyn_min(price),Brooklyn_max(price)
0,Shared room,11,1800,0,725
