In [1]:
# Create (or reuse) the SparkSession that every later cell relies on.
# getOrCreate() hands back an already-running session when one exists; in that
# case only runtime SQL configurations take effect (see the warning in the output).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Spark aggregation functions")
    .getOrCreate()
)

26/01/31 17:47:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the listings export into a DataFrame.
# - Free-text columns (e.g. description, host_about) may contain quotes and
#   embedded newlines, so quotes are escaped by doubling ('"') and multiLine
#   parsing is enabled.
# - inferSchema=True makes Spark scan the data an extra time to guess column types.
# - PERMISSIVE mode keeps malformed records (unparseable fields become null)
#   instead of failing the whole read.
listings = (
    spark.read
    .options(
        header=True,
        inferSchema=True,
        sep=",",
        quote='"',
        escape='"',
        multiLine=True,
        mode="PERMISSIVE",
    )
    .csv("data/listings.csv.gz")
)
listings.printSchema()

                                                                                

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

In [4]:
# Load the reviews export with the same CSV parsing options as the listings:
# review comments may span several lines and contain doubled quotes, so
# multiLine + escape='"' are required. PERMISSIVE mode nulls out bad fields
# rather than aborting the read.
reviews = (
    spark.read
    .options(
        header=True,
        inferSchema=True,
        sep=",",
        quote='"',
        escape='"',
        multiLine=True,
        mode="PERMISSIVE",
    )
    .csv("data/reviews.csv.gz")
)
reviews.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



                                                                                

In [5]:
# Number of reviews per listing.
# Keep the DataFrame itself: DataFrame.show() prints and returns None, so
# assigning the result of .show() would leave reviews_per_listing = None.
reviews_per_listing = (
    reviews
    .groupBy('listing_id')
    .count()
)
reviews_per_listing.show(10)

[Stage 4:>                                                          (0 + 1) / 1]

+----------+-----+
|listing_id|count|
+----------+-----+
|   5584511|    2|
|   3899992|   63|
|   5625937|  254|
|   5752378|   41|
|   5801078|    1|
|   8627034|  112|
|   8910312|    4|
|  13949987|   11|
|  14326586|   45|
|  14764889|    8|
+----------+-----+
only showing top 10 rows


                                                                                

In [9]:
from pyspark.sql.functions import avg, count

# Per-host listing count and mean review score.
# NOTE: because of the isNotNull() filter, `total_listings` counts only the
# host's listings that have a review_scores_rating, not every listing they own.
# The DataFrame is kept (show() returns None) so later cells can reuse it.
host_stats = (
    listings
    .filter(listings.review_scores_rating.isNotNull())
    .groupBy('host_id')
    .agg(
        count('id').alias('total_listings'),
        avg('review_scores_rating').alias('average_review_scores'),
    )
)
host_stats.show(10)

+---------+--------------+---------------------+
|  host_id|total_listings|average_review_scores|
+---------+--------------+---------------------+
|  7561201|             1|                 4.59|
|  2247257|            11|    4.748181818181819|
| 27783798|             1|                 4.87|
| 31401755|             1|                 4.48|
|101699402|             1|                 4.71|
| 26620734|             2|                 4.85|
| 11501613|             2|                4.855|
| 52963821|             1|                 4.97|
| 87666772|             7|    4.714285714285714|
|182690485|             1|                  5.0|
+---------+--------------+---------------------+
only showing top 10 rows


                                                                                

In [14]:
# The ten most-reviewed listings.
# - limit(10) makes the DataFrame actually hold the top ten, instead of
#   binding the name to show()'s return value (None).
# - listing_id is a tie-breaker so listings with equal counts come out in a
#   deterministic order.
top_ten = (
    reviews
    .groupBy('listing_id')
    .count()
    .orderBy(['count', 'listing_id'], ascending=[False, True])
    .limit(10)
)
top_ten.show()

[Stage 10:>                                                         (0 + 1) / 1]

+----------+-----+
|listing_id|count|
+----------+-----+
|    833602| 1395|
|   5098399| 1143|
|   1482235| 1132|
|  15961311| 1123|
|   2701066| 1118|
|   6271080| 1093|
|  20761132| 1065|
|   5257587| 1041|
|  19221782| 1011|
|    768969|  994|
+----------+-----+
only showing top 10 rows


                                                                                

In [18]:
# The five neighbourhoods with the most listings.
# - limit(5) keeps the result as a reusable DataFrame (show() returns None).
# - Ties on count (e.g. two neighbourhoods with the same number of listings)
#   are broken by name so the selected rows are deterministic.
top_five = (
    listings
    .groupBy('neighbourhood_cleansed')
    .count()
    .orderBy(['count', 'neighbourhood_cleansed'], ascending=[False, True])
    .limit(5)
)
top_five.show(truncate=False)

+----------------------+-----+
|neighbourhood_cleansed|count|
+----------------------+-----+
|BUENOS AIRES - VENEZIA|1862 |
|DUOMO                 |1626 |
|CENTRALE              |1004 |
|SARPI                 |1004 |
|LORETO                |924  |
+----------------------+-----+
only showing top 5 rows


In [20]:
# A few individual reviews next to the listing they belong to.
# Both inputs have an `id` column, so listings.id is selected explicitly
# to avoid an ambiguous reference.
(
    listings
    .join(reviews, on=listings.id == reviews.listing_id, how='inner')
    .select(listings.id, 'name', 'reviewer_name', 'comments')
    .show(5, truncate=False)
)

+-------+----------------------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id     |name                        |reviewer_name|comments                                                                                                                                                                                                                                                                                                                                                                                                                                  

In [22]:
# The five hosts with the most listings.
# NOTE: this rebinds `top_five`, replacing the earlier neighbourhood result.
# - limit(5) keeps a reusable DataFrame (show() returns None).
# - host_id breaks ties on count so the selection is deterministic.
top_five = (
    listings
    .groupBy('host_id')
    .count()
    .orderBy(['count', 'host_id'], ascending=[False, True])
    .limit(5)
)
top_five.show(truncate=False)

+--------+-----+
|host_id |count|
+--------+-----+
|27693585|481  |
|9025189 |360  |
|16366171|330  |
|2504885 |231  |
|54050937|155  |
+--------+-----+
only showing top 5 rows


In [23]:
from pyspark.sql.functions import length, avg, count

# Listings with fewer reviews than this are skipped: their average is too noisy.
MIN_REVIEWS = 5

# Character length of each review comment. length() of a null comment is null,
# so such reviews count toward reviews_count but are ignored by avg().
reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))

# Average comment length per listing, restricted to reviews whose listing is
# present in `listings`. A left-semi join only tests for existence, so:
# - a review row is never duplicated. An inner join would multiply it if its
#   listing id appeared more than once in `listings`, inflating reviews_count.
# - none of the listings columns are carried through the shuffle.
(
    reviews_with_comment_length
    .join(listings, reviews_with_comment_length.listing_id == listings.id, 'left_semi')
    .groupBy('listing_id')
    .agg(
        avg(reviews_with_comment_length.comment_length).alias('average_comment_length'),
        count(reviews_with_comment_length.id).alias('reviews_count'),
    )
    .filter(f'reviews_count >= {MIN_REVIEWS}')
    .orderBy('average_comment_length', ascending=False)
    .show()
)

[Stage 27:>                                                         (0 + 1) / 1]

+-------------------+----------------------+-------------+
|         listing_id|average_comment_length|reviews_count|
+-------------------+----------------------+-------------+
|           39085815|     861.0769230769231|           13|
|           35367956|                 794.4|            5|
|1135330651527883109|                 715.4|            5|
|            2475886|     669.1428571428571|            7|
|            2588594|                 637.8|            5|
|           52399037|     636.1428571428571|            7|
| 869564018040207870|                 635.8|            5|
| 647799685772080521|     634.0909090909091|           11|
| 904514358972558604|                 611.0|            5|
|            6023631|                 608.2|            5|
|             303469|                 585.0|           18|
|1087889188768301827|     584.1428571428571|            7|
|           12230260|     574.2857142857143|            7|
| 974396287862479838|                 571.8|            

                                                                                

In [24]:
# Listings without any review, found through a left outer join. A listing with
# no matching review gets nulls in every reviews column, so a null
# reviews.listing_id marks it.
joined_df = listings.join(
    reviews,
    on=listings.id == reviews.listing_id,
    how='left_outer',
)

(
    joined_df
    .where(reviews.listing_id.isNull())
    .select('name')
    .show(truncate=False)
)

[Stage 31:>                                                         (0 + 1) / 1]

+--------------------------------------------------+
|name                                              |
+--------------------------------------------------+
|Camera doppia per studentesse Bocconi             |
|CHARMING APT CLOSE TO BOCCONI-IED-IEO-NABA DUOMO  |
|Brera Design District Flat - One Bedroom          |
|BRIGHT&WIDE APARTMENT ON THE CANAL                |
|Appartamento Milano Sarca 322                     |
|Camera singola con giardino                       |
|Rent cute loft in Milan                           |
|Cozy Loft for rent Milano-Isola                   |
|Double Apt in Moscova/Brera -Historic Center Milan|
|Vivi nel cuore di Zona Tortona!!!                 |
|Camera singola Milano.Chiedere prima disponibilità|
|Brand new apartment in the center of the Navigli  |
|incantevole mansardina in centro mi               |
|20 days furnished apartment with terrace          |
|Navigli Silent cozy pvt room - charming apt       |
|Charming flat navigli/solari area            

                                                                                

In [25]:
# Names of listings that have no reviews, via a left-anti join: it keeps only
# the listings rows with no matching reviews row, and no reviews columns.
# Keep the DataFrame itself: show() returns None, so assigning its result
# would leave listings_without_reviews = None.
listings_without_reviews = (
    listings
    .join(reviews, listings.id == reviews.listing_id, how='left_anti')
    .select('name')
)
listings_without_reviews.show(truncate=False)

[Stage 36:>                                                         (0 + 1) / 1]

+--------------------------------------------------+
|name                                              |
+--------------------------------------------------+
|Camera doppia per studentesse Bocconi             |
|CHARMING APT CLOSE TO BOCCONI-IED-IEO-NABA DUOMO  |
|Brera Design District Flat - One Bedroom          |
|BRIGHT&WIDE APARTMENT ON THE CANAL                |
|Appartamento Milano Sarca 322                     |
|Camera singola con giardino                       |
|Rent cute loft in Milan                           |
|Cozy Loft for rent Milano-Isola                   |
|Double Apt in Moscova/Brera -Historic Center Milan|
|Vivi nel cuore di Zona Tortona!!!                 |
|Camera singola Milano.Chiedere prima disponibilità|
|Brand new apartment in the center of the Navigli  |
|incantevole mansardina in centro mi               |
|20 days furnished apartment with terrace          |
|Navigli Silent cozy pvt room - charming apt       |
|Charming flat navigli/solari area            

                                                                                