In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Data Aggregation") \
    .getOrCreate()

25/11/18 16:47:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [18]:
listings = spark.read.csv("data/listings.csv", 
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE"
)

                                                                                

In [19]:
for field in listings.schema:
    print(field)

StructField('id', LongType(), True)
StructField('listing_url', StringType(), True)
StructField('scrape_id', LongType(), True)
StructField('last_scraped', DateType(), True)
StructField('source', StringType(), True)
StructField('name', StringType(), True)
StructField('description', StringType(), True)
StructField('neighborhood_overview', StringType(), True)
StructField('picture_url', StringType(), True)
StructField('host_id', IntegerType(), True)
StructField('host_url', StringType(), True)
StructField('host_name', StringType(), True)
StructField('host_since', DateType(), True)
StructField('host_location', StringType(), True)
StructField('host_about', StringType(), True)
StructField('host_response_time', StringType(), True)
StructField('host_response_rate', StringType(), True)
StructField('host_acceptance_rate', StringType(), True)
StructField('host_is_superhost', StringType(), True)
StructField('host_thumbnail_url', StringType(), True)
StructField('host_picture_url', StringType(), True)


In [5]:
listings.show(5)

+-----+--------------------+--------------+------------+-----------+--------------------+--------------------+---------------------+--------------------+-------+--------------------+---------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+--------+---------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+----------------+---------------------+-------

25/11/18 16:48:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [3]:
listings \
  .groupby(listings.property_type) \
  .count() \
  .show(truncate=False)

                                                                                

+----------------------------------+-----+
|property_type                     |count|
+----------------------------------+-----+
|Private room in lighthouse        |2    |
|Private room in loft              |171  |
|Private room in earthen home      |3    |
|Entire chalet                     |4    |
|Earthen home                      |2    |
|Shared room in bus                |1    |
|Farm stay                         |4    |
|Entire rental unit                |39372|
|Shared room in hostel             |59   |
|Shared room                       |4    |
|Private room in condo             |3515 |
|Room in boutique hotel            |239  |
|Private room in religious building|4    |
|Room in bed and breakfast         |24   |
|Private room in casa particular   |86   |
|Entire guesthouse                 |228  |
|Private room in bungalow          |64   |
|Entire cabin                      |39   |
|Hut                               |2    |
|Private room in nature lodge      |2    |
+----------

In [27]:
import pyspark.sql.functions as F

listings \
  .groupby(listings.property_type) \
  .agg(
    F.count('property_type').alias('count')
  ) \
  .orderBy('count', ascending=[False]) \
  .show(truncate=False)

+----------------------------------+-----+
|property_type                     |count|
+----------------------------------+-----+
|Entire rental unit                |39372|
|Private room in rental unit       |14837|
|Private room in home              |11835|
|Entire condo                      |8800 |
|Entire home                       |8756 |
|Private room in condo             |3515 |
|Entire serviced apartment         |1852 |
|Private room in townhouse         |1294 |
|Entire townhouse                  |1123 |
|Room in hotel                     |783  |
|Private room in bed and breakfast |486  |
|Entire loft                       |367  |
|Private room in guesthouse        |316  |
|Room in boutique hotel            |239  |
|Entire guesthouse                 |228  |
|Shared room in rental unit        |191  |
|Entire guest suite                |181  |
|Private room in guest suite       |178  |
|Private room in loft              |171  |
|Private room in serviced apartment|144  |
+----------

In [29]:
# Comprehensive property analysis
property_stats = listings \
  .filter(F.col('price').isNotNull()) \
  .groupby('property_type') \
  .agg(
    F.count('*').alias('total_listings'),
    F.avg('price').alias('avg_price'),
    F.min('price').alias('min_price'),
    F.max('price').alias('max_price'),
    F.avg('review_scores_rating').alias('avg_rating'),
    F.countDistinct('host_id').alias('unique_hosts'),
    F.avg('beds').alias('avg_beds')
  ) \
  .orderBy('total_listings', ascending=False)

property_stats.show()

[Stage 61:>                                                         (0 + 1) / 1]

+--------------------+--------------+---------+---------+---------+------------------+------------+------------------+
|       property_type|total_listings|avg_price|min_price|max_price|        avg_rating|unique_hosts|          avg_beds|
+--------------------+--------------+---------+---------+---------+------------------+------------+------------------+
|  Entire rental unit|         26575|     NULL|$1,000.00|  $999.00| 4.625574589222411|       12729|1.9771391985537812|
|Private room in home|          7810|     NULL|$1,008.00|  $999.00| 4.722693877551016|        4807|1.2273076923076922|
|Private room in r...|          7325|     NULL|$1,000.00|   $99.00|  4.67900228484385|        5153| 1.137316705495409|
|        Entire condo|          6552|     NULL|$1,000.00|  $999.00| 4.725404329004317|        4847| 1.935991445157348|
|         Entire home|          5783|     NULL|$1,000.00|  $999.00| 4.715926470588231|        4343| 3.626904432132964|
|Private room in c...|          2712|     NULL|$

                                                                                

In [30]:
# First, check the extent of null values
print(f"Total rows: {listings.count()}")
print(f"Rows with null price: {listings.filter(F.col('price').isNull()).count()}")
print(f"Rows with non-null price: {listings.filter(F.col('price').isNotNull()).count()}")

# Check nulls by property_type
listings \
  .groupby('property_type') \
  .agg(
    F.count('*').alias('total'),
    F.count('price').alias('non_null_prices'),
    F.sum(F.when(F.col('price').isNull(), 1).otherwise(0)).alias('null_prices')
  ) \
  .show(truncate=False)

Total rows: 96182
Rows with null price: 32977
Rows with non-null price: 63205
+----------------------------------+-----+---------------+-----------+
|property_type                     |total|non_null_prices|null_prices|
+----------------------------------+-----+---------------+-----------+
|Private room in lighthouse        |2    |0              |2          |
|Private room in loft              |171  |84             |87         |
|Private room in earthen home      |3    |1              |2          |
|Entire chalet                     |4    |3              |1          |
|Earthen home                      |2    |2              |0          |
|Shared room in bus                |1    |0              |1          |
|Farm stay                         |4    |4              |0          |
|Entire rental unit                |39372|26575          |12797      |
|Shared room in hostel             |59   |53             |6          |
|Shared room                       |4    |2              |2          |

In [38]:
listings \
        .fillna({'price':0})\
        .groupby('property_type') \
        .agg(F.avg('price').alias('avg_price')) \
        .orderBy('avg_price', ascending=False) \
        .show(truncate=False)
    

+-------------------------+---------+
|property_type            |avg_price|
+-------------------------+---------+
|Private room in home     |0.0      |
|Entire villa             |0.0      |
|Entire place             |0.0      |
|Private room in loft     |0.0      |
|Floor                    |0.0      |
|Entire chalet            |0.0      |
|Camper/RV                |0.0      |
|Shared room in bus       |0.0      |
|Castle                   |0.0      |
|Entire rental unit       |0.0      |
|Private room in boat     |0.0      |
|Shared room              |0.0      |
|Tiny home                |0.0      |
|Room in boutique hotel   |0.0      |
|Entire vacation home     |0.0      |
|Room in bed and breakfast|0.0      |
|Private room in hostel   |0.0      |
|Entire guesthouse        |0.0      |
|Private room in island   |0.0      |
|Entire cabin             |0.0      |
+-------------------------+---------+
only showing top 20 rows



                                                                                

In [39]:
reviews = spark.read.csv("data/reviews.csv", 
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE"
)

                                                                                

In [53]:
for index, field in enumerate(reviews.schema):
    print(f"Item: {index+1}, Field: {field.name}, Type: {field.dataType}")

print(len(reviews.schema))

Item: 1, Field: listing_id, Type: LongType()
Item: 2, Field: id, Type: LongType()
Item: 3, Field: date, Type: DateType()
Item: 4, Field: reviewer_id, Type: IntegerType()
Item: 5, Field: reviewer_name, Type: StringType()
Item: 6, Field: comments, Type: StringType()
6


In [42]:
print(reviews.schema)

StructType([StructField('listing_id', LongType(), True), StructField('id', LongType(), True), StructField('date', DateType(), True), StructField('reviewer_id', IntegerType(), True), StructField('reviewer_name', StringType(), True), StructField('comments', StringType(), True)])


In [54]:
for index, field in enumerate(listings.schema):
    print(f"Item: {index+1}, Field: {field.name}, Type: {field.dataType}")

print(len(listings.schema))

Item: 1, Field: id, Type: LongType()
Item: 2, Field: listing_url, Type: StringType()
Item: 3, Field: scrape_id, Type: LongType()
Item: 4, Field: last_scraped, Type: DateType()
Item: 5, Field: source, Type: StringType()
Item: 6, Field: name, Type: StringType()
Item: 7, Field: description, Type: StringType()
Item: 8, Field: neighborhood_overview, Type: StringType()
Item: 9, Field: picture_url, Type: StringType()
Item: 10, Field: host_id, Type: IntegerType()
Item: 11, Field: host_url, Type: StringType()
Item: 12, Field: host_name, Type: StringType()
Item: 13, Field: host_since, Type: DateType()
Item: 14, Field: host_location, Type: StringType()
Item: 15, Field: host_about, Type: StringType()
Item: 16, Field: host_response_time, Type: StringType()
Item: 17, Field: host_response_rate, Type: StringType()
Item: 18, Field: host_acceptance_rate, Type: StringType()
Item: 19, Field: host_is_superhost, Type: StringType()
Item: 20, Field: host_thumbnail_url, Type: StringType()
Item: 21, Field: host

In [55]:
listings_reviews = listings.join(
            reviews, listings.id == reviews.listing_id, how = 'inner'
)

In [70]:
listings_reviews = listings.join(
    reviews, 
    listings['id'] == reviews['listing_id']
)

listings_reviews \
  .groupBy(listings['id']) \
  .agg(
    F.count(listings['id']).alias('num_reviews')
  ) \
  .show()

[Stage 89:>                                                         (0 + 1) / 1]

+-------+-----------+
|     id|num_reviews|
+-------+-----------+
|  78606|          2|
| 444886|         12|
| 466017|         28|
|2162431|          1|
|2557853|         80|
|3132302|          3|
|2736493|          4|
|3734796|          5|
|3917692|          1|
|3997029|          7|
|4361078|         65|
|5520243|          5|
|5355817|         18|
|5921026|         40|
|6311069|          3|
|6552071|          1|
|6606418|        235|
|6651481|         48|
|7188835|        207|
|7709953|         33|
+-------+-----------+
only showing top 20 rows



                                                                                

In [71]:
reviews_per_listing = listings_reviews \
  .groupBy(listings.id, listings.name) \
  .agg(
    F.count(reviews.id).alias('num_reviews')
  ) \
  .orderBy('num_reviews', ascending=False) \
  .show(truncate=False)

[Stage 93:>                                                         (0 + 1) / 1]

+--------+--------------------------------------------------+-----------+
|id      |name                                              |num_reviews|
+--------+--------------------------------------------------+-----------+
|30760930|Double Garden View room - London House Hotel***   |1682       |
|47408549|Double Room+ Ensuite                              |1650       |
|43120947|Private double room with en suite facilities      |1553       |
|19670926|Designer Studio Apartment in Central London       |1382       |
|1436172 |Cosy Double in Kings Cross Houseshare nr Eurostar |1134       |
|45006692|Budget Double Room In Colliers Hotel.             |1132       |
|2126708 |London's best transport hub 5 mins walk! Safe too!|1071       |
|1436177 |En-suite Double in Kings Cross Houseshare Eurostar|943        |
|3855375 |Double in Kings Cross Houseshare nr Eurostar      |935        |
|2659707 |Large Room + Private Bathroom, E3.                |893        |
|42081759|Micro Studio at Locke at Bro

                                                                                