In [3]:
from pyspark.sql import SparkSession

# Obtain the shared Spark session for this notebook (reuses a running one if present).
spark = (
    SparkSession.builder
    .appName("Spark aggregation functions")
    .getOrCreate()
)

In [4]:
# Reader settings for the listings export: the first row holds the column
# names, column types are inferred, fields are comma-separated and
# double-quoted with '"' also acting as the escape character, records may
# span several physical lines, and the parser runs in PERMISSIVE mode.
listings_csv_options = dict(
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE",
)
listings = spark.read.csv("data/listings.csv", **listings_csv_options)
listings.printSchema()

                                                                                

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

In [5]:
# Reader settings for the reviews export: header row, inferred column types,
# comma-separated double-quoted fields with '"' as the escape character,
# multi-line records allowed, and PERMISSIVE parsing mode.
reviews_csv_options = dict(
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE",
)
reviews = spark.read.csv("data/reviews.csv", **reviews_csv_options)
reviews.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



                                                                                

In [6]:
# 1. For each listing compute string category depending on its price, and add it as a new column.
# A category is defined in the following way:
#
# * price < 50 -> "Budget"
# * 50 <= price < 150 -> "Mid-range"
# * price >= 150 -> "Luxury"
# 
# Only include listings where the price is not null.
# Count the number of listings in each category

from pyspark.sql.functions import col, regexp_replace, udf
from pyspark.sql.types import StringType

# Strip the "$" and "," characters from the textual price and cast what is
# left to a double, exposing the result as a new "price_numeric" column.
price_as_double = regexp_replace(col("price"), "[$,]", "").cast("double")
listings_num = listings.withColumn("price_numeric", price_as_double)

# UDF for price category
def price_category(p):
    """Map a numeric price to its category label.

    Returns None when the price is None, "Budget" for prices below 50,
    "Mid-range" for prices from 50 up to (but not including) 150, and
    "Luxury" for everything else.
    """
    if p is None:
        return None
    # Exclusive upper bounds, checked from the cheapest tier upwards.
    for upper_bound, label in ((50, "Budget"), (150, "Mid-range")):
        if p < upper_bound:
            return label
    return "Luxury"

# Expose the Python function to Spark as a UDF producing a string column.
price_category_udf = udf(price_category, StringType())

# Keep only listings whose price parsed to a number, then label each of them.
priced_listings = listings_num.filter(col("price_numeric").isNotNull())
with_category = priced_listings.withColumn(
    "price_category",
    price_category_udf(col("price_numeric")),
)

# Count number of listings per category (optional order)
from pyspark.sql.functions import when as sf_when

# Sort key that puts the categories in price order (Budget, Mid-range, then
# everything else) rather than alphabetical order.
category_rank = (
    sf_when(col("price_category") == "Budget", 0)
    .when(col("price_category") == "Mid-range", 1)
    .otherwise(2)
)

# Count listings per category; the helper sort column is added only for the
# ordering and removed again before the result is exposed.
listings_per_category = with_category.groupBy("price_category").count()
category_counts = (
    listings_per_category
    .withColumn("order_key", category_rank)
    .orderBy("order_key")
    .drop("order_key")
)

# Sanity check: preview the raw price next to its parsed value and category,
# then display the per-category totals.
preview_columns = ["id", "price", "price_numeric", "price_category"]
with_category.select(*preview_columns).show(10, truncate=False)
category_counts.show(truncate=False)

                                                                                

+------+-------+-------------+--------------+
|id    |price  |price_numeric|price_category|
+------+-------+-------------+--------------+
|264776|$297.00|297.0        |Luxury        |
|264777|$98.00 |98.0         |Mid-range     |
|264778|$148.00|148.0        |Mid-range     |
|264779|$144.00|144.0        |Mid-range     |
|264780|$157.00|157.0        |Luxury        |
|264781|$148.00|148.0        |Mid-range     |
|264782|$120.00|120.0        |Mid-range     |
|264783|$216.00|216.0        |Luxury        |
|264789|$238.00|238.0        |Luxury        |
|266037|$62.00 |62.0         |Mid-range     |
+------+-------+-------------+--------------+
only showing top 10 rows



[Stage 5:>                                                          (0 + 1) / 1]

+--------------+-----+
|price_category|count|
+--------------+-----+
|Budget        |6612 |
|Mid-range     |28108|
|Luxury        |27964|
+--------------+-----+



                                                                                

In [7]:
# 2. In this task you will need to compute a sentiment score per review, and then an average sentiment score per listing.
# A sentiment score indicates how "positive" or "negative" a review is. The higher the score, the more positive the review, and vice versa.
#
# To compute a sentiment score per review compute the number of positive words in a review and subtract the number of negative
# words in the same review (the list of words is already provided)
#
# To complete this task, compute a DataFrame that contains the following fields:
# * name - the name of a listing
# * average_sentiment - average sentiment of reviews computed using the algorithm described above

from pyspark.sql.functions import avg
from pyspark.sql.types import FloatType
import re

# Vocabulary used for scoring: words that count for and against a review.
positive_words = {'good', 'great', 'excellent', 'amazing', 'fantastic', 'wonderful', 'pleasant', 'lovely', 'nice', 'enjoyed'}
negative_words = {'bad', 'terrible', 'awful', 'horrible', 'disappointing', 'poor', 'hate', 'unpleasant', 'dirty', 'noisy'}

def sentiment_score(comment):
    """Score a review as (positive word count) minus (negative word count).

    The comment is lower-cased and split into runs of the letters a-z, so
    matching is case-insensitive and punctuation or digits act as separators.
    Returns the score as a float, or None when the comment itself is None.
    """
    if comment is None:
        return None
    words = re.findall(r"[a-z]+", comment.lower())
    # Each word contributes +1 if positive, -1 if negative, 0 otherwise.
    balance = sum((word in positive_words) - (word in negative_words) for word in words)
    return float(balance)

# Register the scorer as a Spark UDF that yields a float column.
sentiment_score_udf = udf(sentiment_score, FloatType())

# Score every review that actually carries comment text.
commented_reviews = reviews.where(col("comments").isNotNull())
reviews_with_sentiment = commented_reviews.withColumn(
    "sentiment_score",
    sentiment_score_udf(col("comments")),
)

# Final DataFrame: one row per listing with its name and the mean sentiment
# score of its reviews.
#
# The aggregation is keyed on listing_id as well as name because listing names
# are free text and need not be unique; grouping by name alone would blend the
# reviews of unrelated listings that happen to share a title. The id is only
# needed as a grouping key, so it is projected away afterwards.
listing_names = listings.select(col("id").alias("listing_id"), "name")

avg_sentiment_per_listing = (
    reviews_with_sentiment
    .join(listing_names, on="listing_id", how="inner")
    .groupBy("listing_id", "name")
    .agg(avg("sentiment_score").alias("average_sentiment"))
    .select("name", "average_sentiment")
)

# Example: display the ten listings with the highest average sentiment score.
most_positive_first = avg_sentiment_per_listing.orderBy(col("average_sentiment").desc())
most_positive_first.show(10, truncate=False)

[Stage 9:>                                                          (0 + 1) / 1]

+--------------------------------------------------+-----------------+
|name                                              |average_sentiment|
+--------------------------------------------------+-----------------+
|Beautiful house perfect for touring central London|9.0              |
|Beautifully Contemporary Three Bedroom House      |9.0              |
|Trent View a holiday home in London               |8.0              |
|Gorgeous family home in Fulham for 6 by the river |8.0              |
|Modern family home, large open kitchen & garden   |8.0              |
|Central and Cozy 2 BR Flat next to Pimlico station|7.0              |
|Gorgeous 4BR family home & garden                 |7.0              |
|Comfortable Family Home                           |7.0              |
|Peaceful cosy Ealing Apartment, private Garden    |7.0              |
|large loft room with king size bed                |7.0              |
+--------------------------------------------------+-----------------+
only s

                                                                                

In [9]:
# 3. Rewrite the following code from the previous exercise using SparkSQL:
#
# ```
# from pyspark.sql.functions import length, avg, count
# 
# reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))
# reviews_with_comment_length \
#   .join(listings, reviews_with_comment_length.listing_id == listings.id, 'inner') \
#   .groupBy('listing_id').agg(
#       avg(reviews_with_comment_length.comment_length).alias('average_comment_length'),
#       count(reviews_with_comment_length.id).alias('reviews_count')
#   ) \
#   .filter('reviews_count >= 5') \
#   .orderBy('average_comment_length', ascending=False) \
#   .show()
# ```
# This was a solution for the task:
#
# "Get top five listings with the highest average review comment length. Only return listings with at least 5 reviews"

# Expose both DataFrames to Spark SQL under the table names used in the query.
reviews.createOrReplaceTempView("reviews")
listings.createOrReplaceTempView("listings")

# SQL equivalent of the DataFrame pipeline above, limited to the top five rows
# as the task statement asks.
#
# Reviews with a NULL comment are deliberately NOT filtered out: the DataFrame
# version counts every review id towards reviews_count, and the task asks for
# listings with at least 5 reviews. AVG skips NULL inputs, so rows without a
# comment do not affect average_comment_length either way.
sql_query = """
SELECT
  l.id   AS listing_id,
  l.name AS name,
  AVG(LENGTH(r.comments)) AS average_comment_length,
  COUNT(r.id)             AS reviews_count
FROM reviews r
JOIN listings l
  ON r.listing_id = l.id
GROUP BY l.id, l.name
HAVING COUNT(r.id) >= 5
ORDER BY average_comment_length DESC
LIMIT 5
"""

spark.sql(sql_query).show(truncate=False)

[Stage 13:>                                                         (0 + 1) / 1]

+------------------+-------------------------------------------------+----------------------+-------------+
|listing_id        |name                                             |average_comment_length|reviews_count|
+------------------+-------------------------------------------------+----------------------+-------------+
|618608352812465378|Beautiful Georgian top floor flat- 2 Bed.        |1300.1666666666667    |6            |
|28508447          |The warm and cosy place.                         |1089.3333333333333    |6            |
|627425975703032358|Superb loft beautiful quiet safe area near Center|951.7777777777778     |9            |
|2197681           |Luxurious apartment in great area                |939.2                 |5            |
|13891813          |Beautiful 2 Bedroom Apartment in South Kensington|905.0                 |5            |
+------------------+-------------------------------------------------+----------------------+-------------+



                                                                                

In [None]:
# 4. [Optional][Challenge]
# Calculate the average time elapsed since the first review for each host in the listings dataset.
# To implement a custom aggregation function, use the "pandas_udf" function.
#
# Documentation about "pandas_udf": https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html 
#
# To use "pandas_udf" you would need to install two additional dependencies in the virtual environment you use for PySpark:
# Run these commands:
# ```
# pip install pandas
# pip install pyarrow
# ```

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import PandasUDFType
import pandas as pd

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def average_days_since_first_review_udf(first_review_series: pd.Series) -> float:
    """Return the mean number of whole days between each first review and today.

    Grouped-aggregate pandas UDF: it receives all ``first_review`` values of
    one group as a single pandas Series and reduces them to one float.

    Args:
        first_review_series: first-review dates of the group's listings.
            Presumably ``datetime.date`` objects or date-like strings
            (TODO confirm against the inferred schema); both are handled.

    Returns:
        The average elapsed time in days, or NaN when no value in the group
        can be parsed as a date.
    """
    # Normalise whatever date representation arrives to pandas timestamps;
    # unparseable values become NaT and are skipped by mean() below.
    first_reviews = pd.to_datetime(first_review_series, errors="coerce")
    # Compare against midnight today so the result is in whole days.
    today = pd.Timestamp.now().normalize()
    elapsed_days = (today - first_reviews).dt.days
    return float(elapsed_days.mean())

# Listings that have never been reviewed carry no first_review date, so they
# are excluded before aggregating per host.
reviewed_listings = listings.filter(listings.first_review.isNotNull())

(
    reviewed_listings
    .groupBy('host_id')
    .agg(
        average_days_since_first_review_udf(listings.first_review)
        .alias('average_days_since_first_review_days')
    )
    .show()
)