In [1]:
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Advanced Spark").getOrCreate()

In [3]:
listings_df = spark.read.csv(
    path="./data/airbnb-london-listings.csv.gz",
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE",
)
listings_df.printSchema()

reviews_df = spark.read.csv(
    path="./data/airbnb-london-reviews.csv.gz",
    header=True,
    inferSchema=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE",
)
reviews_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

In [4]:
# 1. For each listing compute string category depending on its price, and add it as a new column.
# A category is defined in the following way:
#
# * price < 50 -> "Budget"
# * 50 <= price < 150 -> "Mid-range"
# * price >= 150 -> "Luxury"
#
# Only include listings where the price is not null.
# Count the number of listings in each category

from pyspark.sql.functions import regexp_replace, udf
from pyspark.sql.types import StringType

listings_df = listings_df.withColumn(
    "price_numeric", regexp_replace("price", "[$,]", "").cast("float")
)

In [5]:
def price_category(price):
    if price < 50:
        return "Budget"
    elif 50 <= price < 150:
        return "Mid-range"
    elif price >= 150:
        return "Luxury"
    elif price is None:
        return "Unknown"
    else:
        return "Unknown"


price_category_udf = udf(price_category, StringType())

In [6]:
listings_with_category = (
    listings_df.filter(listings_df.price_numeric.isNotNull())
    .withColumn("price_category", price_category_udf(listings_df.price_numeric))
    .groupby("price_category")
    .count()
    .show()
)

+--------------+-----+
|price_category|count|
+--------------+-----+
|     Mid-range|28333|
|        Budget| 6114|
|        Luxury|27516|
+--------------+-----+



In [7]:
# 2. In this task you will need to compute a sentiment score per review, and then an average sentiment score per listing.
# A sentiment score indicates how "positive" or "negative" a review is. The higher the score the more positive it is, and vice-versa.
#
# To compute a sentiment score per review compute the number of positive words in a review and subtract the number of negative
# words in the same review (the list of words is already provided)
#
# To complete this task, compute a DataFrame that contains the following fields:
# * name - the name of a listing
# * average_sentiment - average sentiment of reviews computed using the algorithm described above

from pyspark.sql.functions import avg
from pyspark.sql.types import FloatType

# Lists of positive and negative words
positive_words = {
    "good",
    "great",
    "excellent",
    "amazing",
    "fantastic",
    "wonderful",
    "pleasant",
    "lovely",
    "nice",
    "enjoyed",
}
negative_words = {
    "bad",
    "terrible",
    "awful",
    "horrible",
    "disappointing",
    "poor",
    "hate",
    "unpleasant",
    "dirty",
    "noisy",
}


def sentiment_score(comments):
    if comments is None:
        return 0.0

    comments_lower = comments.lower()
    score = 0

    for pos_word, neg_word in zip(positive_words, negative_words):
        if pos_word in comments_lower:
            score += 1

        if neg_word in comments_lower:
            score -= 1

    return float(score)


sentiment_score_udf = udf(sentiment_score, FloatType())

reviews_with_sentiment = reviews_df.withColumn(
    "sentiment_score",
    sentiment_score_udf(reviews_df.comments),
)

listings_df.join(
    other=reviews_with_sentiment,
    on=listings_df.id == reviews_df.listing_id,
    how="inner",
).groupBy(
    listings_df.id,
    listings_df.name,
).agg(
    avg(reviews_with_sentiment.sentiment_score).alias("avg_sentiment_score"),
).orderBy(
    "avg_sentiment_score",
    ascending=False,
).show(
    truncate=False,
)

+-------------------+--------------------------------------------------+-------------------+
|id                 |name                                              |avg_sentiment_score|
+-------------------+--------------------------------------------------+-------------------+
|8630729            |Central and Cozy 2 BR Flat next to Pimlico station|6.0                |
|1475288755040843693|Quiet room 10 mins walk to portobello rd          |6.0                |
|945592509209998667 |Cozy One Bedroom Full Flat                        |5.0                |
|24763465           |Luxury Holiday Let | Prime SW19 Village Location  |5.0                |
|3804150            |London NW3. Feel at home single room              |5.0                |
|1366557021812675836|Fulham Townhouse                                  |5.0                |
|39575614           |Light-filled 1 bed Victorian flat in Canonbury    |5.0                |
|1328328005396938099|Private Room in cosy house                       

In [8]:
# 3. Rewrite the following code from the previous exercise using SparkSQL:
#
# ```
# from pyspark.sql.functions import length, avg, count
#
# reviews_with_comment_length = reviews.withColumn('comment_length', length('comments'))
# reviews_with_comment_length \
#   .join(listings, reviews_with_comment_length.listing_id == listings.id, 'inner') \
#   .groupBy('listing_id').agg(
#       avg(reviews_with_comment_length.comment_length).alias('average_comment_length'),
#       count(reviews_with_comment_length.id).alias('reviews_count')
#   ) \
#   .filter('reviews_count >= 5') \
#   .orderBy('average_comment_length', ascending=False) \
#   .show()
# ```
# This was a solution for the the task:
#
# "Get top five listings with the highest average review comment length. Only return listings with at least 5 reviews"

listings_df.createOrReplaceTempView("listings")
reviews_df.createOrReplaceTempView("reviews")

query = """
    SELECT
        r.listing_id,
        AVG(LENGTH(r.comments)) AS avg_comments_length,
        COUNT(r.id) AS reviews_count
    FROM 
        listings AS l
    INNER JOIN 
        reviews AS r
        ON l.id = r.listing_id
    GROUP BY
        1
    HAVING
        COUNT(r.id) >= 5
    ORDER BY
        2 DESC
"""

spark.sql(sqlQuery=query).show(truncate=False)

+------------------+-------------------+-------------+
|listing_id        |avg_comments_length|reviews_count|
+------------------+-------------------+-------------+
|618608352812465378|1300.1666666666667 |6            |
|28508447          |1089.3333333333333 |6            |
|22661311          |1035.857142857143  |7            |
|53145228          |1006.6666666666666 |6            |
|627425975703032358|951.7777777777778  |9            |
|2197681           |939.2              |5            |
|13891813          |905.0              |5            |
|979753            |893.9230769230769  |13           |
|630150178279666225|890.7272727272727  |11           |
|8856894           |890.1666666666666  |6            |
|33310686          |885.8333333333334  |6            |
|22524075          |885.0              |5            |
|29469389          |885.0              |6            |
|5555679           |878.7169811320755  |106          |
|6594477           |863.6              |5            |
|33385444 

In [9]:
# 4. [Optional][Challenge]
# Calculate an average time passed from the first review for each listing in the listings dataset.
# To implmenet a custom aggregation function you would need to use "pandas_udf" function to write a custom aggregation function.
#
# Documentation about "pandas_udf": https://spark.apache.org/docs/3.4.2/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html
#
# To use "pandas_udf" you would need to install two additional dependencies in the virtual environment you use for PySpark:
# Run these commands:
# ```
# pip install pandas
# pip install pyarrow
# ```

import pandas as pd
from pyspark.sql.functions import PandasUDFType, pandas_udf
from pyspark.sql.types import DateType, DoubleType


@pandas_udf(returnType=DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def avg_days_from_first_review_udf(first_review_series: DateType):
    today_date = pd.to_datetime("today")
    avg_listing_ages = (today_date - pd.to_datetime(first_review_series)).mean().days

    if pd.isna(avg_listing_ages):
        return None

    return float(avg_listing_ages)


listings_df.filter(
    listings_df.first_review.isNotNull(),
).groupBy(
    "host_id",
).agg(
    avg_days_from_first_review_udf(
        listings_df.first_review,
    ).alias("avg_days_from_first_review"),
).orderBy(
    "avg_days_from_first_review",
    ascending=False,
).show()



+-------+--------------------------+
|host_id|avg_days_from_first_review|
+-------+--------------------------+
|  60302|                    5870.0|
| 199549|                    5604.0|
| 211291|                    5602.0|
| 157884|                    5599.0|
| 212734|                    5592.0|
| 155938|                    5562.0|
|  41759|                    5541.0|
|  54730|                    5450.0|
| 379315|                    5420.0|
|  67564|                    5415.0|
| 466353|                    5387.0|
| 428600|                    5379.0|
| 502496|                    5359.0|
| 536025|                    5357.0|
| 264345|                    5345.0|
| 432648|                    5332.0|
| 181028|                    5329.0|
| 182993|                    5318.0|
| 134938|                    5311.0|
| 554519|                    5310.0|
+-------+--------------------------+
only showing top 20 rows


In [10]:
spark.stop()