In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark aggregation functions") \
    .getOrCreate()

25/08/19 13:15:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
listings = spark.read.csv("./data/listings.csv.gz", 
    header=True,
    inferSchema=True,
    sep=",", 
    quote='"',
    escape='"', 
    multiLine=True,
    mode="PERMISSIVE" 
)
listings.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

                                                                                

In [4]:
reviews = spark.read.csv("./data/reviews.csv.gz", 
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE"
)
reviews.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



                                                                                

In [5]:
# 1. For each listing compute string category depending on its price, and add it as a new column.
# A category is defined in the following way:
#
# * price < 50 -> "Budget"
# * 50 <= price < 150 -> "Mid-range"
# * price >= 150 -> "Luxury"
# 
# Only include listings where the price is not null.
# Count the number of listings in each category

from pyspark.sql.functions import regexp_replace

listings = listings.withColumn('price_numeric', regexp_replace('price', '[$,]', '').cast('float'))

# TODO: Implement a UDF
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
def classifyBudget(num: float):
    if num < 50:
        return "Budget"
    elif num >= 50 and num <= 150:
        return "Mid-range"
    else:
        return "Luxury"
            
classifyBudget_udf = F.udf(
            classifyBudget,
            StringType()
            )
# TODO: Apply the UDF to create a new DataFrame
new_df = listings.withColumn(
    "budget_type",
    classifyBudget_udf('price_numeric')
)
# new_df.select("budget_type", "price_numeric").limit(10).show()
new_df\
    .filter(new_df.price.isNotNull())\
    .groupBy("budget_type")\
    .agg(
        F.count("budget_type").alias("num_of_budget_type_category")
    )\
    .show(10)

[Stage 4:>                                                          (0 + 1) / 1]

+-----------+---------------------------+
|budget_type|num_of_budget_type_category|
+-----------+---------------------------+
|  Mid-range|                      28614|
|     Budget|                       6612|
|     Luxury|                      27458|
+-----------+---------------------------+



                                                                                

In [6]:
# 2. In this task you will need to compute a santiment score per review, and then an average sentiment score per listing.
# A santiment score indicates how "positive" or "negative" a review is. The higher the score the more positive it is, and vice-versa.
#
# To compute a sentiment score per review compute the number of positive words in a review and subtract the number of negative
# words in the same review (the list of words is already provided)
#
# To complete this task, compute a DataFrame that contains the following fields:
# * name - the name of a listing
# * average_sentiment - average sentiment of reviews computed using the algorithm described above
from pyspark.sql.types import FloatType

# Lists of positive and negative words
positive_words = {'good', 'great', 'excellent', 'amazing', 'fantastic', 'wonderful', 'pleasant', 'lovely', 'nice', 'enjoyed'}
negative_words = {'bad', 'terrible', 'awful', 'horrible', 'disappointing', 'poor', 'hate', 'unpleasant', 'dirty', 'noisy'}

def remove_punc(s: str) -> str:
    if s is None:
        return ""
    res = []
    for x in s:
        if x.isalpha() or x.isspace():
            res.append(x)
    return "".join(res)
    
# TODO: Implement the UDF
def sentiment_score(comment: str) -> float:
    if comment is None:
        return 0.0
    pos_score = 0
    neg_score = 0
    cleaned_comment = remove_punc(comment.lower())
    for word in cleaned_comment.split():
        if word in positive_words:
            pos_score += 1
        elif word in negative_words:
            neg_score += 1
    return float(pos_score - neg_score)

import pyspark.sql.functions as F
sentiment_score_udf = F.udf(sentiment_score, FloatType())

reviews_with_sentiment = reviews \
  .withColumn(
    'sentiment_score',
    sentiment_score_udf(reviews.comments)
  )
# reviews_with_sentiment.select('comments', 'sentiment_score').show(5, truncate=True)

# TODO: Create a final DataFrame
final_df = listings.join(reviews_with_sentiment, listings.id == reviews_with_sentiment.listing_id, how="inner")
final_df\
    .groupBy("listing_id", "name")\
    .agg(
        F.avg("sentiment_score").alias('sentiment_avg')
    )\
    .orderBy('sentiment_avg', ascending=False)\
    .show(10)

[Stage 8:>                                                          (0 + 1) / 1]

+-------------------+--------------------+-------------+
|         listing_id|                name|sentiment_avg|
+-------------------+--------------------+-------------+
|           23872786|Beautiful house p...|          9.0|
|           40386257|Beautifully Conte...|          9.0|
|           37015443|Modern family hom...|          8.0|
| 662470859745906578|Trent View a holi...|          8.0|
|           34277415|Gorgeous family h...|          8.0|
|1169478573487073390|Spacious Victoria...|          7.0|
|1380936421100195787|Peaceful cosy Eal...|          7.0|
|           13535637|large loft room w...|          7.0|
|           38199388|Comfortable Famil...|          7.0|
|            8630729|Central and Cozy ...|          7.0|
+-------------------+--------------------+-------------+
only showing top 10 rows


                                                                                

In [7]:
# 3. Rewrite the following code from the previous exercise using SparkSQL:
#
# ```
# from pyspark.sql.functions import length, avg, count
# 
# reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))
# reviews_with_comment_length \
#   .join(listings, reviews_with_comment_length.listing_id == listings.id, 'inner') \
#   .groupBy('listing_id').agg(
#       avg(reviews_with_comment_length.comment_length).alias('average_comment_length'),
#       count(reviews_with_comment_length.id).alias('reviews_count')
#   ) \
#   .filter('reviews_count >= 5') \
#   .orderBy('average_comment_length', ascending=False) \
#   .show()
# ```
# This was a solution for the the task:
#
# "Get top five listings with the highest average review comment length. Only return listings with at least 5 reviews"

reviews.createOrReplaceTempView("reviews")
listings.createOrReplaceTempView("listings")

# Write the SQL query
sql_query = """
SELECT
    listings.id,
    listings.name,
    AVG(LENGTH(reviews.comments)) AS average_comment_length,
    COUNT(reviews.comments) AS num_of_reviews
FROM
    listings
INNER JOIN
    reviews
ON
    listings.id = reviews.listing_id
GROUP BY
    listings.id, listings.name
HAVING
    COUNT(reviews.comments) >= 5
ORDER BY
    average_comment_length DESC  
LIMIT 5
"""

spark \
  .sql(sql_query) \
  .show()

25/08/19 13:15:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 12:>                                                         (0 + 1) / 1]

+------------------+--------------------+----------------------+--------------+
|                id|                name|average_comment_length|num_of_reviews|
+------------------+--------------------+----------------------+--------------+
|618608352812465378|Beautiful Georgia...|    1300.1666666666667|             6|
|          28508447|The warm and cosy...|    1089.3333333333333|             6|
|627425975703032358|Superb loft beaut...|     951.7777777777778|             9|
|           2197681|Luxurious apartme...|                 939.2|             5|
|          13891813|Beautiful 2 Bedro...|                 905.0|             5|
+------------------+--------------------+----------------------+--------------+



                                                                                

In [10]:
# 4. [Optional][Challenge]
# Calculate an average time passed from the first review for each host in the listings dataset. 
# To implmenet a custom aggregation function you would need to use "pandas_udf" function to write a custom aggregation function.
#
# Documentation about "pandas_udf": https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html 
#
# To use "pandas_udf" you would need to install two additional dependencies in the virtual environment you use for PySpark:
# Run these commands:
# ```
# pip install pandas
# pip install pyarrow
# ```

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import PandasUDFType
import pandas as pd

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def average_days_since_first_review_udf(first_review_series) -> float:
    # TODO: Implement the UDF
    today = pd.to_datetime('today')
    listings_ages = (today - pd.to_datetime(first_review_series)).dt.days
    if listings_ages.empty:
        return None
    return listings_ages.mean()

listings \
  .filter(
    listings.first_review.isNotNull()
  ) \
  .groupBy('host_id') \
  .agg(
    average_days_since_first_review_udf(listings.first_review).alias('average_days_since_first_review_days')
  ) \
  .show()

[Stage 21:>                                                         (0 + 1) / 1]

+-------+------------------------------------+
|host_id|average_days_since_first_review_days|
+-------+------------------------------------+
|   6774|                  2025.1666666666667|
|   9089|                               508.0|
|   9323|                              3018.0|
|  10657|                              2185.0|
|  11431|                              3614.0|
|  14596|                              2701.0|
|  19195|                              3665.0|
|  25235|                              3301.0|
|  26258|                                73.0|
|  30577|                               992.0|
|  30780|                              3302.0|
|  32851|                              3535.5|
|  34007|                               370.0|
|  36808|                               772.0|
|  38691|                               999.0|
|  40515|                              2261.0|
|  40944|                   553.0833333333334|
|  41759|                              5391.0|
|  43039|    

Exception ignored in: <_io.BufferedWriter name=5>                               
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/4.0.0/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 200, in manager
BrokenPipeError: [Errno 32] Broken pipe
