# Data Aggregation in PySpark

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Aggregation").getOrCreate()

25/07/15 20:58:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
listings = spark.read.csv(
    "data/listings.csv.gz", 
    header=True, 
    inferSchema=True, 
    sep=",", 
    quote='"', 
    escape='"', 
    multiLine=True,
    mode="PERMISSIVE")

                                                                                

In [3]:
listings.groupby(listings.property_type).count().show(truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+----------------------------------+-----+
|property_type                     |count|
+----------------------------------+-----+
|Private room in lighthouse        |2    |
|Private room in loft              |154  |
|Private room in earthen home      |2    |
|Entire chalet                     |5    |
|Earthen home                      |1    |
|Farm stay                         |4    |
|Entire rental unit                |40799|
|Shared room in hostel             |61   |
|Shared room                       |1    |
|Private room in condo             |3255 |
|Room in boutique hotel            |229  |
|Private room in religious building|4    |
|Room in bed and breakfast         |18   |
|Private room in casa particular   |62   |
|Private room in bungalow          |63   |
|Entire cabin                      |50   |
|Entire guesthouse                 |226  |
|Hut                               |4    |
|Private room in nature lodge      |4    |
|Entire guest suite                |175  |
+----------

                                                                                

In [4]:
# Using .agg
import pyspark.sql.functions as F

listings \
    .groupby(listings.property_type) \
    .agg(
        F.count('property_type').alias('count')
    ) \
    .orderBy('count', ascending=False) \
    .show(truncate=False)

[Stage 5:>                                                          (0 + 1) / 1]

+----------------------------------+-----+
|property_type                     |count|
+----------------------------------+-----+
|Entire rental unit                |40799|
|Private room in rental unit       |14573|
|Private room in home              |11826|
|Entire home                       |8938 |
|Entire condo                      |8438 |
|Private room in condo             |3255 |
|Entire serviced apartment         |1840 |
|Private room in townhouse         |1189 |
|Entire townhouse                  |1069 |
|Room in hotel                     |1042 |
|Private room in bed and breakfast |495  |
|Private room in guesthouse        |371  |
|Entire loft                       |339  |
|Room in boutique hotel            |229  |
|Entire guesthouse                 |226  |
|Entire guest suite                |175  |
|Private room in guest suite       |170  |
|Private room in loft              |154  |
|Private room in serviced apartment|154  |
|Private room                      |103  |
+----------

                                                                                

In [5]:
reviews = spark.read.csv(
    "data/reviews.csv.gz", 
    header=True, 
    inferSchema=True, 
    sep=",", 
    quote='"', 
    escape='"', 
    multiLine=True, 
    mode="PERMISSIVE")

                                                                                

In [6]:
for field in reviews.schema:
    print(field)

StructField('listing_id', LongType(), True)
StructField('id', LongType(), True)
StructField('date', DateType(), True)
StructField('reviewer_id', IntegerType(), True)
StructField('reviewer_name', StringType(), True)
StructField('comments', StringType(), True)


In [7]:
listings_reviews = listings.join(
    reviews, 
    listings.id == reviews.listing_id, 
    how="inner"
)

In [8]:
reviews_per_listing = listings_reviews \
    .groupby(listings.id, listings.name) \
    .agg(
        F.count(reviews.id).alias('num_reviews')
    ) \
    .orderBy('num_reviews', ascending=False) \
    .show(truncate=False)

[Stage 11:>                                                         (0 + 1) / 1]

+--------+--------------------------------------------------+-----------+
|id      |name                                              |num_reviews|
+--------+--------------------------------------------------+-----------+
|47408549|Double Room+ Ensuite                              |1855       |
|30760930|Double Garden View room - London House Hotel***   |1682       |
|43120947|Private double room with en suite facilities      |1615       |
|19670926|Locke Studio Apartment at Leman Locke             |1436       |
|45006692|Budget Double Room In Colliers Hotel.             |1433       |
|1436172 |Cosy Double in Kings Cross Houseshare nr Eurostar |1195       |
|2126708 |London's best transport hub 5 mins walk! Safe too!|1122       |
|1436177 |En-suite Double in Kings Cross Houseshare Eurostar|1005       |
|47438714|KX Basic- Small Double- shared bathroom           |978        |
|3855375 |Double in Kings Cross Houseshare nr Eurostar      |973        |
|46233904|Superior Studio, avg size 23

                                                                                

## Exercise

In [10]:
# Count the number of reviews per listing using the reviews dataset
reviews_per_listing = reviews \
    .groupby(reviews.listing_id) \
    .count() \
    .show(10)

[Stage 17:>                                                         (0 + 1) / 1]

+----------+-----+
|listing_id|count|
+----------+-----+
|     78606|    2|
|    444886|   12|
|    466017|   28|
|   2736493|    4|
|   2557853|   89|
|   3132302|    3|
|   3917692|    1|
|   3734796|    5|
|   3997029|    7|
|   4361078|   70|
+----------+-----+
only showing top 10 rows


                                                                                

In [11]:
# 2. Compute the total number of listings and average review score per host
from pyspark.sql.functions import avg, count

host_stats = listings \
    .filter(listings.review_scores_rating.isNotNull()) \
    .groupBy('host_id') \
    .agg(
        count('id').alias('total_listings'), 
        avg('review_scores_rating').alias('average_review_score')
    ) \
    .show(10)

[Stage 20:>                                                         (0 + 1) / 1]

+--------+--------------+--------------------+
| host_id|total_listings|average_review_score|
+--------+--------------+--------------------+
| 2358441|             1|                4.86|
| 2876123|             2|  4.9399999999999995|
| 2038199|             1|                 5.0|
| 4157822|             2|               4.925|
|  719504|             1|                4.96|
| 7950720|             1|                4.86|
| 6572018|             1|                 5.0|
|12122942|             1|                4.93|
|13851928|             1|                4.97|
|13739634|             2|                4.74|
+--------+--------------+--------------------+
only showing top 10 rows


                                                                                

In [14]:
# 3: Find the top ten listings with the highest number of reviews
reviews \
    .groupBy('listing_id') \
    .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()

[Stage 26:>                                                         (0 + 1) / 1]

+----------+-----+
|listing_id|count|
+----------+-----+
|  47408549| 1855|
|  30760930| 1682|
|  43120947| 1615|
|  19670926| 1436|
|  45006692| 1433|
+----------+-----+



                                                                                

In [15]:
# 5. Get a data frame with the folloing four columns:
# * Listing's ID
# * Listing's name
# * Reviewer's name
# * Review's comment

listings.join(reviews, listings.id == reviews.listing_id, 'inner') \
    .select(listings.id, 'name', 'reviewer_name', 'comments') \
    .show(truncate=False)

                                                                                

+-----+-----------------------------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id   |name                               |reviewer_name|comments   

In [16]:
# 6. Get Top Five Listings with the highest average review comment length.
# Only return listings with at at least 5 reviews

from pyspark.sql.functions import length, avg, count

reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))
reviews_with_comment_length \
    .join(listings, reviews_with_comment_length.listing_id == listings.id, 'inner') \
    .groupBy('listing_id').agg(
        avg(reviews_with_comment_length.comment_length).alias('average_comment_length'), 
        count(reviews_with_comment_length.id).alias('reviews_count')
    ) \
    .filter('reviews_count >= 5') \
    .orderBy('average_comment_length', ascending=False) \
    .show()

[Stage 32:>                                                         (0 + 1) / 1]

+------------------+----------------------+-------------+
|        listing_id|average_comment_length|reviews_count|
+------------------+----------------------+-------------+
|618608352812465378|    1300.1666666666667|            6|
|          28508447|    1089.3333333333333|            6|
|627425975703032358|     951.7777777777778|            9|
|           2197681|                 939.2|            5|
|          13891813|                 905.0|            5|
|            979753|     893.9230769230769|           13|
|630150178279666225|     890.7272727272727|           11|
|           8856894|     890.1666666666666|            6|
|          29469389|                 885.0|            6|
|          22524075|                 885.0|            5|
|           5555679|     878.7169811320755|          106|
|          33385444|                 848.0|            5|
|            565214|     834.0833333333334|           12|
|          53493254|                 831.0|            7|
|          126

                                                                                

In [18]:
# 7. Using "join" operator find listings without reviews

joined_df = listings.join(
    reviews, 
    listings.id == reviews.listing_id, 
    how='left_outer'
)

joined_df \
    .filter(reviews.id.isNull()) \
    .select('name') \
    .show(truncate=False)

[Stage 36:>                                                         (0 + 1) / 1]

+------------------------------------------------+
|name                                            |
+------------------------------------------------+
|ChiqDoube Room in PrivateAppartment             |
|ROOM TO RENT IN THE OLYMPIC PERIOD              |
|4 bed Beautiful west london home                |
|London, Hoxton. Nice, 2 bedroom, 7th floor flat.|
|Bright Dbl/Nr/ Excellnt Transp                  |
|Stunning Shared Penthouse Apartment             |
|The Old Coach House (Olympics)                  |
|Well furnished room (Olympics site)             |
|Studio 20min Walk from Olympic City             |
|Luxury single room                              |
|Contemporary house London E4                    |
|A lovely one bedroom garden flat!!              |
|Coming to London for the Olympics?              |
|Lovely double room + own bathroom               |
|Double Room close to Olympic Park!              |
|Double bedroom near Olympic Park                |
|Gorgeous characterful flat-Oly

                                                                                