# PySpark Data Cleaning exercise

**Import Libraries for Python Spark** 

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, DecimalType, LongType
from pyspark.sql.functions import *

**Initialize a Spark session**

In [5]:
spark = SparkSession.\
    builder.\
    getOrCreate()

**Read raw data from .csv files**

In [6]:
raw_listings_df = spark.read.csv("listings.csv",header=True, inferSchema=True,multiLine=True, escape="\"", quote="\"")

In [7]:
raw_reviews_df = spark.read.csv("reviews.csv",header=True, inferSchema=True,multiLine=True, escape="\"", quote="\"")

**Exploratory analysis
    - Understand column names and types
    - Check out a sample of the data**

In [8]:
raw_listings_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

In [9]:
raw_reviews_df.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [11]:
raw_reviews_df.show(3)

+----------+-----+----------+-----------+-------------+--------------------+
|listing_id|   id|      date|reviewer_id|reviewer_name|            comments|
+----------+-----+----------+-----------+-------------+--------------------+
|       958| 5977|2009-07-23|      15695|     Edmund C|Our experience wa...|
|       958| 6660|2009-08-03|      26145|        Simon|Returning to San ...|
|       958|11519|2009-09-27|      25839|        Denis|We were very plea...|
+----------+-----+----------+-----------+-------------+--------------------+
only showing top 3 rows



**Schema-on-write approach
Using schema enforcement or schema-on-write approach: define the schema to attempt write the data using that schema.
It allows to catch Data Quality issues early: data types mismatches, values that are out of bounds.**

In [12]:
listings_schema = StructType([
 StructField("id", LongType(), nullable=False),
 StructField("host_id", IntegerType(), nullable=False),
 StructField("neighbourhood", StringType(), nullable=True),
 StructField("room_type", StringType(), nullable=False),
 StructField("price", DecimalType(10, 2), nullable=True),
 StructField("minimum_nights", IntegerType(), nullable=True),
 StructField("number_of_reviews", IntegerType(), nullable=True),
 StructField("last_review", DateType(), nullable=True)
])

**Select only the fields required for future analysis**

In [13]:
selected_df = raw_listings_df.select("id",\
                                    "host_id",\
                                    "neighbourhood_cleansed",\
                                    "room_type",\
                                    "price",\
                                    "minimum_nights",\
                                    "number_of_reviews",\
                                    "last_review")

In [14]:
selected_df.show(3,truncate=False)

+-------------------+---------+----------------------+---------------+-------+--------------+-----------------+-----------+
|id                 |host_id  |neighbourhood_cleansed|room_type      |price  |minimum_nights|number_of_reviews|last_review|
+-------------------+---------+----------------------+---------------+-------+--------------+-----------------+-----------+
|1158063164891809038|294451422|Inner Sunset          |Private room   |$50.00 |30            |0                |NULL       |
|53241705           |1825803  |Western Addition      |Entire home/apt|$150.00|30            |2                |2022-07-04 |
|115242             |582450   |North Beach           |Entire home/apt|$140.00|30            |21               |2023-08-26 |
+-------------------+---------+----------------------+---------------+-------+--------------+-----------------+-----------+
only showing top 3 rows



**Cast field type and rename columns, if required**

In [15]:
casted_df = (
    selected_df
    .withColumn("price", regexp_replace("price", "[$,]", "").cast(DecimalType(10, 2)))
    .withColumnRenamed("neighbourhood_cleansed", "neighbourhood")
)

In [16]:
casted_df.show(3)

+-------------------+---------+----------------+---------------+------+--------------+-----------------+-----------+
|                 id|  host_id|   neighbourhood|      room_type| price|minimum_nights|number_of_reviews|last_review|
+-------------------+---------+----------------+---------------+------+--------------+-----------------+-----------+
|1158063164891809038|294451422|    Inner Sunset|   Private room| 50.00|            30|                0|       NULL|
|           53241705|  1825803|Western Addition|Entire home/apt|150.00|            30|                2| 2022-07-04|
|             115242|   582450|     North Beach|Entire home/apt|140.00|            30|               21| 2023-08-26|
+-------------------+---------+----------------+---------------+------+--------------+-----------------+-----------+
only showing top 3 rows



**Write data to your dataframe applying predefined schema and analyse errors**

In [17]:
listings_df = spark.createDataFrame(casted_df.rdd, schema=listings_schema)

In [18]:
listings_df.printSchema()

root
 |-- id: long (nullable = false)
 |-- host_id: integer (nullable = false)
 |-- neighbourhood: string (nullable = true)
 |-- room_type: string (nullable = false)
 |-- price: decimal(10,2) (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)



In [19]:
listings_df.show(3)

+-------------------+---------+----------------+---------------+------+--------------+-----------------+-----------+
|                 id|  host_id|   neighbourhood|      room_type| price|minimum_nights|number_of_reviews|last_review|
+-------------------+---------+----------------+---------------+------+--------------+-----------------+-----------+
|1158063164891809038|294451422|    Inner Sunset|   Private room| 50.00|            30|                0|       NULL|
|           53241705|  1825803|Western Addition|Entire home/apt|150.00|            30|                2| 2022-07-04|
|             115242|   582450|     North Beach|Entire home/apt|140.00|            30|               21| 2023-08-26|
+-------------------+---------+----------------+---------------+------+--------------+-----------------+-----------+
only showing top 3 rows



**Replacing all occurrences of "" (empty strings) with null in all columns**

In [20]:
listings_df = listings_df.replace("", None)

**Remove leading/trailing whitespace from all string columns**

In [21]:
listings_df = listings_df.select([
    trim(col(c)).alias(c) if dtype == StringType() else col(c)
    for c, dtype in listings_df.dtypes
])

**Capitalize Strings using UDF**

In [22]:
def to_title_case(s):
    if s is None:
        return None
    return ' '.join(word.capitalize() for word in s.strip().split())

title_case_udf = udf(to_title_case, StringType())

listings_df = listings_df.select([
    title_case_udf(col(c)).alias(c) if dtype == "string" else col(c)
    for c, dtype in listings_df.dtypes
])

**Check id field for duplicates**

In [23]:
listings_df.groupBy("id") \
  .count() \
  .filter("count > 1") \
  .show()

+---+-----+
| id|count|
+---+-----+
+---+-----+



**Check for nulls in the number_of_reviews column and replace them with 0**

In [24]:
listings_df = listings_df.fillna({"number_of_reviews": 0})

**Check/explore data**

In [25]:
listings_df.select("neighbourhood").distinct().show()

+-------------------+
|      neighbourhood|
+-------------------+
|       Inner Sunset|
|     Haight Ashbury|
| Financial District|
|        North Beach|
|   Western Addition|
|     Bernal Heights|
|          Lakeshore|
|       Outer Sunset|
|  Visitacion Valley|
|     Inner Richmond|
|           Nob Hill|
|     Outer Richmond|
|     Crocker Amazon|
|Castro/upper Market|
|            Bayview|
|          Chinatown|
|            Mission|
|          Excelsior|
|         Twin Peaks|
|           Seacliff|
+-------------------+
only showing top 20 rows



In [26]:
listings_df.select("room_type").distinct().show()

+---------------+
|      room_type|
+---------------+
|Entire Home/apt|
|    Shared Room|
|     Hotel Room|
|   Private Room|
+---------------+



**Check that room_type column contains only the expected values**

In [27]:
expected_room_types = [
    "Shared room",
    "Hotel room",
    "Entire home/apt",
    "Private room"
]
unexpected_values = listings_df.filter(~col("room_type").isin(expected_room_types))
unexpected_values.select("room_type").distinct().show()

+---------------+
|      room_type|
+---------------+
|Entire Home/apt|
|    Shared Room|
|     Hotel Room|
|   Private Room|
+---------------+



**Filter the recent data (past year)**

In [28]:
recent_reviews_df = raw_reviews_df.filter(col("date") >= date_sub(current_date(), 365))

**Joining two datasets applying broadcast join for optimization.
Using a broadcast join while joining recent_reviews_df with listings_df, where listings_df is much smaller,
allows us to avoid shuffling.**

In [29]:
from pyspark.sql.functions import broadcast

listings_with_reviews = recent_reviews_df.join(
    broadcast(listings_df),  
    recent_reviews_df.listing_id == listings_df.id,
    how="inner"
)