# Transforming Data Using PySpark for AWS Glue

## First Import SparkSession

In [1]:
from pyspark.sql import SparkSession

## Then Create a Spark Session

In [2]:
# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Airbnb_Warehousing")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
24/07/29 12:53:12 WARN Utils: Your hostname, nishal resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/07/29 12:53:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/29 12:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read the Listings CSV File

In [3]:
# Load the listings export as CSV.
# - "multiline" lets quoted fields span several physical lines.
# - "escape" is set exactly once, to the double quote, so a doubled quote
#   inside a quoted field is read as a literal quote. Setting the same option
#   twice keeps only the last value, so a second "escape" entry would be dead.
# - "inferSchema" asks Spark to derive column types from the data.
listing_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("sep", ",")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("multiline", "true")
    .option("inferSchema", "true")
    .load("../data/listings.csv")
)

In [4]:
# Second handle on the freshly loaded frame. This notebook's own cells keep
# using listing_df; the alias is consumed later when the Glue script is tested.
listingsDf = listing_df

# Show the inferred column names and types.
listing_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

### Looks Like the DataFrame was Correctly Read

In [13]:
# Peek at the first two rows to sanity-check the parse.
listing_df.show(n=2)

+------+--------------------+--------------+------------+-----------+--------------------+-----------+---------------------+--------------------+-------+--------------------+-------------+----------+-------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+--------+---------+--------------------+---------------+------------+---------+--------------+--------+----+---------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+----------------+---------------------+-----------------+-----------

# Separating Tables

## Dropping Redundant/Empty Columns

In [14]:
# Columns this notebook treats as redundant or empty.
columns_to_remove = [
    "calendar_last_scraped",
    "description",
    "calendar_updated",
    "bedrooms",
    "bathrooms",
    "neighbourhood_group_cleansed",
    "amenities",
]

# drop() takes the names as separate arguments, hence the unpacking.
listing_df = listing_df.drop(*columns_to_remove)

## Correcting a Misspelled Column Name

In [15]:
# Use the same "neighbourhood" spelling as the other neighbourhood columns.
listing_df = listing_df.withColumnRenamed(
    existing="neighborhood_overview",
    new="neighbourhood_overview",
)

In [16]:
# The schema confirms the unwanted columns are gone and the overview column is renamed
listing_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- neighbourhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_lis

## Transforming Listing Table

In [17]:
# Expose the frame to Spark SQL under the name the query below selects from.
listing_df.createOrReplaceTempView(name="listing_df_view")

In [18]:
from pyspark.sql.functions import udf, col, split
from pyspark.sql.types import StringType, IntegerType

# UDF to transform host_verifications to a 3 character string
# index 0 = email, index 1 = phone, index 2 = work_email. 1 = verified, 0 = not verified
def hqad_df_host_verifications_transform(verifs_list):
    """Encode a host's verification methods as a 3-character flag string.

    Args:
        verifs_list: Either the raw text of the column, e.g.
            "['email', 'phone']", or an iterable of method names. ``None``
            is treated as "nothing verified".

    Returns:
        A string of three "0"/"1" characters in the order
        email, phone, work_email.
    """
    if verifs_list is None:
        return "000"

    if isinstance(verifs_list, str):
        # Split the textual list into individual names. A plain substring test
        # would wrongly count "email" as present whenever "work_email" is.
        verified = {
            item.strip(" '\"") for item in verifs_list.strip("[] ").split(",")
        }
    else:
        verified = set(verifs_list)

    return "".join(
        "1" if method in verified else "0"
        for method in ("email", "phone", "work_email")
    )


# UDF To convert t/f to 1/0 respectively
def t_f_to_1_0(t_f):
    """Return 1 when the flag is exactly "t"; return 0 for any other value."""
    return 1 if t_f == "t" else 0


# Wrap the two Python helpers as Spark UDFs, declaring the type each returns.
hqad_array_transform = udf(
    hqad_df_host_verifications_transform, returnType=StringType()
)
truefalse_to_10 = udf(t_f_to_1_0, returnType=IntegerType())

# Query that reshapes the listing view:
#   * host_response_time is mapped to a short code (H, FH, D, D+); any other
#     value becomes NULL.
#   * host_response_rate / host_acceptance_rate go from "NN%" text to a
#     DECIMAL(3,2) fraction.
#   * The half-bath spellings of bathrooms_text are rewritten to "0.5 ..." so the
#     text always starts with a number that can be split off afterwards.
#     NOTE(review): 'Shared half-bath' is handled on the assumption that it
#     occurs in the data like the other two spellings - confirm.
#   * price loses its "$" and any thousands separator before the cast; without
#     removing the commas a value such as "$1,234.00" would cast to NULL.
# The t/f flag columns are passed through unchanged here.
listing_df_query = """
                    SELECT id,
                        host_id,
                        host_url,
                        host_name,
                        host_since,
                        host_location,
                        host_about,
                        host_thumbnail_url,
                        host_picture_url,
                        host_neighbourhood,
                        CASE host_response_time
                            WHEN 'within an hour' THEN 'H'
                            WHEN 'within a few hours' THEN 'FH'
                            WHEN 'within a day' THEN 'D'
                            WHEN 'a few days or more' THEN 'D+'
                            ELSE NULL
                        END AS host_response_time, 
                        CAST(REPLACE(host_response_rate, '%', '') / 100 AS DECIMAL(3,2)) AS host_response_rate,
                        CAST(REPLACE(host_acceptance_rate, '%', '') / 100 AS DECIMAL(3,2)) AS host_acceptance_rate,
                        host_is_superhost,
                        host_listings_count, 
                        host_total_listings_count, 
                        host_verifications, 
                        host_has_profile_pic,
                        host_identity_verified,
                        calculated_host_listings_count,
                        calculated_host_listings_count_entire_homes,
                        calculated_host_listings_count_private_rooms,
                        calculated_host_listings_count_shared_rooms,
                        latitude,
                        longitude,
                        property_type,
                        room_type,
                        accommodates,
                        CASE
                            WHEN INSTR(bathrooms_text, 'Private half-bath') > 0 THEN '0.5 private bath'
                            WHEN INSTR(bathrooms_text, 'Shared half-bath') > 0 THEN '0.5 shared bath'
                            WHEN INSTR(bathrooms_text, 'Half-bath') > 0 THEN '0.5 bath'
                            ELSE bathrooms_text
                        END as bathrooms,
                        beds,
                        CAST(REPLACE(REPLACE(price, '$', ''), ',', '') AS DECIMAL(10,2)) AS price,
                        number_of_reviews,
                        number_of_reviews_ltm,
                        number_of_reviews_l30d,
                        first_review,
                        last_review,
                        review_scores_rating,
                        review_scores_accuracy,
                        review_scores_cleanliness,
                        review_scores_checkin,
                        review_scores_communication,
                        review_scores_location,
                        review_scores_value,
                        reviews_per_month,
                        scrape_id,
                        last_scraped,
                        source,
                        neighbourhood,
                        neighbourhood_overview,
                        neighbourhood_cleansed,
                        maximum_nights,
                        minimum_nights,
                        minimum_minimum_nights,
                        maximum_minimum_nights,
                        minimum_maximum_nights,
                        maximum_maximum_nights,
                        minimum_nights_avg_ntm,
                        maximum_nights_avg_ntm,
                        has_availability,
                        availability_30,
                        availability_60,
                        availability_90,
                        availability_365,
                        listing_url,
                        name,
                        picture_url,
                        license,
                        instant_bookable
                    FROM listing_df_view
"""

# Run the SQL reshaping against the temp view.
listing_df = spark.sql(listing_df_query)

# "bathrooms" is still text at this point; cut it once at the first space so
# item 0 is the leading count and item 1 is the remaining description.
property_baths_split = split(col("bathrooms"), ' ', limit=2)

# Columns whose values are converted through the t/f -> 1/0 UDF.
flag_columns = [
    "host_is_superhost",
    "host_has_profile_pic",
    "host_identity_verified",
    "has_availability",
    "instant_bookable",
]

listing_df = (
    listing_df
    # The description is taken first, while "bathrooms" still holds the text.
    .withColumn("bathroom_desc", property_baths_split[1])
    .withColumn("bathrooms", property_baths_split[0].cast("decimal(5,1)"))
    .withColumn("host_verifications", hqad_array_transform(col("host_verifications")))
    .withColumns({flag: truefalse_to_10(col(flag)) for flag in flag_columns})
)

listing_df.show(10)

[Stage 30:>                                                         (0 + 1) / 1]

+-------+-------+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+-----------------+-------------------+-------------------------+------------------+--------------------+----------------------+------------------------------+-------------------------------------------+--------------------------------------------+-------------------------------------------+--------+---------+--------------------+---------------+------------+---------+----+------+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----------------+--------------+------------+-----------+--------------------+----------------------+----------------------+-------------

                                                                                

### Whew! That was One Long Transformation!

In [19]:
# Re-register the view so "listing_df_view" points at the transformed frame.
listing_df.createOrReplaceTempView(name="listing_df_view")

## Creating Host Tables

### Host Table

In [20]:
# Host-level columns carried on every listing row.
host_columns = [
    "host_id",
    "host_url",
    "host_name",
    "host_since",
    "host_location",
    "host_about",
    "host_thumbnail_url",
    "host_picture_url",
    "host_neighbourhood",
    "host_response_time",
    "host_response_rate",
    "host_acceptance_rate",
    "host_is_superhost",
    "host_listings_count",
    "host_total_listings_count",
    "host_verifications",
    "host_has_profile_pic",
    "host_identity_verified",
    "calculated_host_listings_count",
    "calculated_host_listings_count_entire_homes",
    "calculated_host_listings_count_private_rooms",
    "calculated_host_listings_count_shared_rooms",
]

# Keep one row per distinct combination of those columns.
host_df = listing_df.select(*host_columns).dropDuplicates()

print(f"count: {host_df.count()}")

host_df.printSchema()

host_df.show(2)

                                                                                

count: 977
root
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: decimal(3,2) (nullable = true)
 |-- host_acceptance_rate: decimal(3,2) (nullable = true)
 |-- host_is_superhost: integer (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_verifications: string (nullable = true)
 |-- host_has_profile_pic: integer (nullable = true)
 |-- host_identity_verified: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- calculated_host_listings_count_entire_

[Stage 37:>                                                         (0 + 1) / 1]

+---------+--------------------+---------+----------+-------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+-----------------+-------------------+-------------------------+------------------+--------------------+----------------------+------------------------------+-------------------------------------------+--------------------------------------------+-------------------------------------------+
|  host_id|            host_url|host_name|host_since|host_location|          host_about|  host_thumbnail_url|    host_picture_url|host_neighbourhood|host_response_time|host_response_rate|host_acceptance_rate|host_is_superhost|host_listings_count|host_total_listings_count|host_verifications|host_has_profile_pic|host_identity_verified|calculated_host_listings_count|calculated_host_listings_count_entire_homes|calculated_host_listings_count_private_rooms|calculated_host_listings_count_shared_rooms|


                                                                                

### Host Dimension Tables

In [21]:
from pyspark.sql.functions import monotonically_increasing_id
# Host Qualifications and Diagnostics (HQAD): the distinct combinations of the
# host response/verification columns, each tagged with a generated hqad_id.
# NOTE(review): Spark documents monotonically_increasing_id() as depending on
# partition ids - confirm the ids stay stable if this frame is recomputed.
hqad_columns = [
    "host_response_time",
    "host_response_rate",
    "host_acceptance_rate",
    "host_is_superhost",
    "host_listings_count",
    "host_total_listings_count",
    "host_verifications",
    "host_has_profile_pic",
    "host_identity_verified",
]

hqad_df = (
    host_df
    .select(*hqad_columns)
    .dropDuplicates()
    .withColumn("hqad_id", monotonically_increasing_id())
)

print(f"count: {hqad_df.count()}")
hqad_df.printSchema()
hqad_df.show(2)

                                                                                

count: 607
root
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: decimal(3,2) (nullable = true)
 |-- host_acceptance_rate: decimal(3,2) (nullable = true)
 |-- host_is_superhost: integer (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_verifications: string (nullable = true)
 |-- host_has_profile_pic: integer (nullable = true)
 |-- host_identity_verified: integer (nullable = true)
 |-- hqad_id: long (nullable = false)



[Stage 46:>                                                         (0 + 1) / 1]

+------------------+------------------+--------------------+-----------------+-------------------+-------------------------+------------------+--------------------+----------------------+-------+
|host_response_time|host_response_rate|host_acceptance_rate|host_is_superhost|host_listings_count|host_total_listings_count|host_verifications|host_has_profile_pic|host_identity_verified|hqad_id|
+------------------+------------------+--------------------+-----------------+-------------------+-------------------------+------------------+--------------------+----------------------+-------+
|                 H|              1.00|                1.00|                1|                  2|                        4|               110|                   1|                     1|      0|
|                 H|              1.00|                1.00|                1|                  8|                       11|               111|                   1|                     1|      1|
+------------------+

                                                                                

In [22]:
# The Host Listings Diagnostics (HLD): distinct combinations of the calculated
# listing-count columns, each tagged with a generated hld_id.
hld_distinct = host_df.select(
    "calculated_host_listings_count",
    "calculated_host_listings_count_entire_homes",
    "calculated_host_listings_count_private_rooms",
    "calculated_host_listings_count_shared_rooms",
).dropDuplicates()

hld_df = hld_distinct.withColumn("hld_id", monotonically_increasing_id())

print(f"count: {hld_df.count()}")

hld_df.printSchema()
hld_df.show(2)

count: 69
root
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- calculated_host_listings_count_entire_homes: integer (nullable = true)
 |-- calculated_host_listings_count_private_rooms: integer (nullable = true)
 |-- calculated_host_listings_count_shared_rooms: integer (nullable = true)
 |-- hld_id: long (nullable = false)

+------------------------------+-------------------------------------------+--------------------------------------------+-------------------------------------------+------+
|calculated_host_listings_count|calculated_host_listings_count_entire_homes|calculated_host_listings_count_private_rooms|calculated_host_listings_count_shared_rooms|hld_id|
+------------------------------+-------------------------------------------+--------------------------------------------+-------------------------------------------+------+
|                             2|                                          1|                                           1|               

## Next, the Property Dimension Table

In [24]:
# Property dimension: distinct location/type/capacity/price combinations,
# each tagged with a generated property_id.
property_columns = [
    "latitude",
    "longitude",
    "property_type",
    "room_type",
    "accommodates",
    "bathrooms",
    "bathroom_desc",
    "beds",
    "price",
]

property_df = (
    listing_df
    .select(*property_columns)
    .dropDuplicates()
    .withColumn("property_id", monotonically_increasing_id())
)

print(f"count: {property_df.count()}")

property_df.printSchema()
property_df.show(6)

                                                                                

count: 2649
root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bathrooms: decimal(5,1) (nullable = true)
 |-- bathroom_desc: string (nullable = true)
 |-- beds: integer (nullable = true)
 |-- price: decimal(10,2) (nullable = true)
 |-- property_id: long (nullable = false)



[Stage 73:>                                                         (0 + 1) / 1]

+--------+---------+--------------------+---------------+------------+---------+-------------+----+------+-----------+
|latitude|longitude|       property_type|      room_type|accommodates|bathrooms|bathroom_desc|beds| price|property_id|
+--------+---------+--------------------+---------------+------------+---------+-------------+----+------+-----------+
|40.01257|-83.00159|Private room in h...|   Private room|           2|      3.0| shared baths|   1| 87.00|          0|
|39.95178|-82.98712|        Entire condo|Entire home/apt|           4|      1.0|         bath|   2| 80.00|          1|
|39.93729|-82.99499|         Entire home|Entire home/apt|           6|      1.5|        baths|   3|120.00|          2|
|39.99624|-83.00542|  Entire rental unit|Entire home/apt|           2|      1.0|         bath|   1| 58.00|          3|
|39.98581|-83.00603|  Entire rental unit|Entire home/apt|           2|      1.0|         bath|   1| 90.00|          4|
|39.94777|-82.95199|Private room in home|   Priv

                                                                                

## Now the Reviews Diagnostics Table

In [31]:
# Reviews diagnostics dimension: distinct combinations of the review counts,
# dates and scores, each tagged with a generated rev_diag_id.
review_columns = [
    "number_of_reviews",
    "number_of_reviews_ltm",
    "number_of_reviews_l30d",
    "first_review",
    "last_review",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value",
    "reviews_per_month",
]

reviews_distinct = listing_df.select(*review_columns).dropDuplicates()
reviews_diagnostics_df = reviews_distinct.withColumn(
    "rev_diag_id", monotonically_increasing_id()
)

print(f"count: {reviews_diagnostics_df.count()}")

reviews_diagnostics_df.printSchema()
reviews_diagnostics_df.show(5, truncate=False)

count: 2272
root
 |-- number_of_reviews: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- number_of_reviews_l30d: integer (nullable = true)
 |-- first_review: date (nullable = true)
 |-- last_review: date (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true)
 |-- review_scores_location: double (nullable = true)
 |-- review_scores_value: double (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- rev_diag_id: long (nullable = false)

+-----------------+---------------------+----------------------+------------+-----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----------------

## Scrapings Dimension Table

In [32]:
# Scrapings dimension: distinct (scrape_id, last_scraped, source) triples,
# each tagged with a generated scraping_id.
scrapings_df = (
    listing_df
    .select("scrape_id", "last_scraped", "source")
    .dropDuplicates()
    .withColumn("scraping_id", monotonically_increasing_id())
)

print(f"count: {scrapings_df.count()}")

scrapings_df.printSchema()
scrapings_df.show(5, truncate=False)

count: 3
root
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- scraping_id: long (nullable = false)

+--------------+------------+---------------+-----------+
|scrape_id     |last_scraped|source         |scraping_id|
+--------------+------------+---------------+-----------+
|20231225202549|2023-12-26  |previous scrape|0          |
|20231225202549|2023-12-26  |city scrape    |1          |
|20231225202549|2023-12-25  |city scrape    |2          |
+--------------+------------+---------------+-----------+



## Neighbourhood Dimension Table


In [33]:
# Neighbourhood dimension: distinct combinations of the three neighbourhood
# columns, each tagged with a generated neighbourhood_id.
neighbourhood_columns = [
    "neighbourhood",
    "neighbourhood_overview",
    "neighbourhood_cleansed",
]

neighbourhood_distinct = listing_df.select(*neighbourhood_columns).dropDuplicates()
neighbourhood_df = neighbourhood_distinct.withColumn(
    "neighbourhood_id", monotonically_increasing_id()
)

print(f"count: {neighbourhood_df.count()}")

neighbourhood_df.printSchema()
neighbourhood_df.show(5)

count: 1299
root
 |-- neighbourhood: string (nullable = true)
 |-- neighbourhood_overview: string (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- neighbourhood_id: long (nullable = false)

+--------------------+----------------------+----------------------+----------------+
|       neighbourhood|neighbourhood_overview|neighbourhood_cleansed|neighbourhood_id|
+--------------------+----------------------+----------------------+----------------+
|Hilliard, Ohio, U...|  We're about a 15 ...|              Far West|               0|
|Columbus, Ohio, U...|  Situated in a his...|             Near East|               1|
|Columbus, Ohio, U...|  The Old North nei...|  Near North/Univer...|               2|
|Columbus, Ohio, U...|  The neighborhood ...|          North Linden|               3|
|Columbus, Ohio, U...|  You are in the Cr...|        West Olentangy|               4|
+--------------------+----------------------+----------------------+----------------+
only sho

## MinMax Insights Dimension Table

In [34]:
# MinMax insights dimension: distinct combinations of the minimum/maximum
# night columns, each tagged with a generated minmax_insights_id.
minmax_columns = [
    "maximum_nights",
    "minimum_nights",
    "minimum_minimum_nights",
    "maximum_minimum_nights",
    "minimum_maximum_nights",
    "maximum_maximum_nights",
    "minimum_nights_avg_ntm",
    "maximum_nights_avg_ntm",
]

minmax_insights_df = (
    listing_df
    .select(*minmax_columns)
    .dropDuplicates()
    .withColumn("minmax_insights_id", monotonically_increasing_id())
)

print(f"count: {minmax_insights_df.count()}")

minmax_insights_df.printSchema()
minmax_insights_df.show(5, truncate=False)

count: 795
root
 |-- maximum_nights: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- minimum_minimum_nights: integer (nullable = true)
 |-- maximum_minimum_nights: integer (nullable = true)
 |-- minimum_maximum_nights: integer (nullable = true)
 |-- maximum_maximum_nights: integer (nullable = true)
 |-- minimum_nights_avg_ntm: double (nullable = true)
 |-- maximum_nights_avg_ntm: double (nullable = true)
 |-- minmax_insights_id: long (nullable = false)

+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+------------------+
|maximum_nights|minimum_nights|minimum_minimum_nights|maximum_minimum_nights|minimum_maximum_nights|maximum_maximum_nights|minimum_nights_avg_ntm|maximum_nights_avg_ntm|minmax_insights_id|
+--------------+--------------+----------------------+----------------------+----------------------+----------------------+-----------

## Availability Dimension Table

In [35]:
# Availability dimension: distinct combinations of the availability flag and
# the 30/60/90/365 counters, each tagged with a generated avail_id.
availability_distinct = listing_df.select(
    "has_availability",
    "availability_30",
    "availability_60",
    "availability_90",
    "availability_365",
).dropDuplicates()

availibility_df = availability_distinct.withColumn(
    "avail_id", monotonically_increasing_id()
)

print(f"count: {availibility_df.count()}")

availibility_df.printSchema()
availibility_df.show(5, truncate=False)

count: 1336
root
 |-- has_availability: integer (nullable = true)
 |-- availability_30: integer (nullable = true)
 |-- availability_60: integer (nullable = true)
 |-- availability_90: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- avail_id: long (nullable = false)

+----------------+---------------+---------------+---------------+----------------+--------+
|has_availability|availability_30|availability_60|availability_90|availability_365|avail_id|
+----------------+---------------+---------------+---------------+----------------+--------+
|1               |1              |31             |61             |151             |0       |
|1               |0              |0              |0              |18              |1       |
|1               |22             |52             |82             |357             |2       |
|1               |23             |49             |73             |345             |3       |
|1               |0              |0              |

# Joining Tables Together

### Host Tables

In [36]:
# Columns that identify a row of the HQAD dimension (the same set it was
# de-duplicated on).
hqad_host_conditions = [
    "host_response_time",
    "host_response_rate",
    "host_acceptance_rate",
    "host_is_superhost",
    "host_listings_count",
    "host_total_listings_count",
    "host_verifications",
    "host_has_profile_pic",
    "host_identity_verified"
]

# Columns that identify a row of the HLD dimension.
hld_host_conditions = [
    "calculated_host_listings_count",
    "calculated_host_listings_count_entire_homes",
    "calculated_host_listings_count_private_rooms",
    "calculated_host_listings_count_shared_rooms"
]

# Attach each dimension's generated id to its host row.
# The comparison is null-safe (<=>): a plain equality join never matches NULL
# keys, so hosts whose response time or rate is NULL would end up with a NULL
# hqad_id even though the dimension holds a row for exactly that combination.
host_with_dim_ids = host_df
for dim_df, key_cols, id_col in (
    (hqad_df, hqad_host_conditions, "hqad_id"),
    (hld_df, hld_host_conditions, "hld_id"),
):
    # Prefix the dimension's key columns so every name in the join is unique
    # and can be referenced unambiguously.
    dim_lookup = dim_df.select(
        *[col(key).alias("_dim_" + key) for key in key_cols], id_col
    )

    keys_match = None
    for key in key_cols:
        same_value = col(key).eqNullSafe(col("_dim_" + key))
        keys_match = same_value if keys_match is None else keys_match & same_value

    host_with_dim_ids = (
        host_with_dim_ids
        .join(dim_lookup, on=keys_match, how="left")
        .drop(*["_dim_" + key for key in key_cols])
    )

# Keep the descriptive host columns plus the two dimension ids.
host_df = host_with_dim_ids.select(
    "host_id",
    "host_url",
    "host_name",
    "host_since",
    "host_location",
    "host_about",
    "host_thumbnail_url",
    "host_picture_url",
    "host_neighbourhood",
    "hld_id",
    "hqad_id"
)

In [37]:
# The row count should be unchanged by the joins above.
print(f"count: {host_df.count()}")

host_df.show(n=5)

count: 977
+---------+--------------------+---------+----------+-------------+--------------------+--------------------+--------------------+--------------------+------+-------+
|  host_id|            host_url|host_name|host_since|host_location|          host_about|  host_thumbnail_url|    host_picture_url|  host_neighbourhood|hld_id|hqad_id|
+---------+--------------------+---------+----------+-------------+--------------------+--------------------+--------------------+--------------------+------+-------+
|182105923|https://www.airbn...|    Holly|2018-04-02| Columbus, OH|Food, nature, art...|https://a0.muscac...|https://a0.muscac...|      Dennison Place|     2|    409|
| 56664331|https://www.airbn...|   Wesley|2016-01-29| Columbus, OH|Young traveler lo...|https://a0.muscac...|https://a0.muscac...|      Near East Side|     7|    467|
|  8325110|https://www.airbn...|     Tara|2013-08-22| Columbus, OH|Hello, worldwide ...|https://a0.muscac...|https://a0.muscac...|           Northland|   

## Final Join

In [38]:
# Key columns for each dimension. Every list names the full set of columns the
# dimension was de-duplicated on; joining on fewer columns can match one
# listing to several dimension rows and duplicate it.
property_join_conditions = [
    "latitude",
    "longitude",
    "property_type",
    "room_type",
    "accommodates",
    "bathrooms",
    "bathroom_desc",
    "beds",
    "price"
]

reviews_join_conditions = [
    "number_of_reviews",
    "number_of_reviews_ltm",
    "number_of_reviews_l30d",
    "first_review",
    "last_review",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value",
    "reviews_per_month"
]

minmax_join_conditions = [
    "minimum_nights",
    "maximum_nights",
    "minimum_minimum_nights",
    "maximum_minimum_nights",
    "minimum_maximum_nights",
    "maximum_maximum_nights",
    "minimum_nights_avg_ntm",
    "maximum_nights_avg_ntm"
]

scrapings_join_conditions = ["scrape_id", "last_scraped", "source"]

neighbourhood_join_conditions = [
    "neighbourhood",
    "neighbourhood_overview",
    "neighbourhood_cleansed"
]

availability_join_conditions = [
    "has_availability",
    "availability_30",
    "availability_60",
    "availability_90",
    "availability_365"
]

# (dimension frame, key columns, generated id column) for every lookup.
# host_df is not joined: host_id already sits on each listing row and no other
# host column is selected below, so that join could only multiply rows.
dimension_lookups = [
    (property_df, property_join_conditions, "property_id"),
    (reviews_diagnostics_df, reviews_join_conditions, "rev_diag_id"),
    (scrapings_df, scrapings_join_conditions, "scraping_id"),
    (neighbourhood_df, neighbourhood_join_conditions, "neighbourhood_id"),
    (minmax_insights_df, minmax_join_conditions, "minmax_insights_id"),
    (availibility_df, availability_join_conditions, "avail_id"),
]

listing_with_dim_ids = listing_df
for dim_df, key_cols, id_col in dimension_lookups:
    # Prefix the dimension's key columns so every name in the join is unique.
    dim_lookup = dim_df.select(
        *[col(key).alias("_dim_" + key) for key in key_cols], id_col
    )

    # Null-safe comparison (<=>): a plain equality join never matches NULL keys,
    # which would leave e.g. listings without reviews with a NULL rev_diag_id.
    keys_match = None
    for key in key_cols:
        same_value = col(key).eqNullSafe(col("_dim_" + key))
        keys_match = same_value if keys_match is None else keys_match & same_value

    listing_with_dim_ids = (
        listing_with_dim_ids
        .join(dim_lookup, on=keys_match, how="left")
        .drop(*["_dim_" + key for key in key_cols])
    )

# Fact table: the listing's own attributes plus one id per dimension.
final_df = listing_with_dim_ids.select(
    "id",
    "scraping_id",
    "host_id",
    "neighbourhood_id",
    "property_id",
    "minmax_insights_id",
    "avail_id",
    "rev_diag_id",
    "listing_url",
    "name",
    "picture_url",
    "license",
    "instant_bookable"
)

In [39]:
# Print the column names and types of the joined fact table.
final_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- scraping_id: long (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- neighbourhood_id: long (nullable = true)
 |-- property_id: long (nullable = true)
 |-- minmax_insights_id: long (nullable = true)
 |-- avail_id: long (nullable = true)
 |-- rev_diag_id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- name: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- license: string (nullable = true)
 |-- instant_bookable: integer (nullable = true)



In [40]:
# Pin host_id to the integer type explicitly.
# NOTE(review): the schemas printed earlier already show host_id as integer,
# so this cast looks like a no-op here - confirm whether it is still needed.
host_id_as_int = col("host_id").cast(IntegerType())
host_df = host_df.withColumn("host_id", host_id_as_int)

host_df.printSchema()

root
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- hld_id: long (nullable = true)
 |-- hqad_id: long (nullable = true)



### Testing Glue Transformation Script

In [7]:
# Importing libraries
import sys
from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col, split, monotonically_increasing_id
from pyspark.sql.types import (
    StringType,
    IntegerType,
    DecimalType,
    LongType,
    DoubleType,
    DateType,
)

# from pyspark.errors import AnalysisException # Most common error for our script, but AWS Glue does not support it
from pyspark.sql.types import StructField, StructType

# UDF to transform host_verifications to a 3 character string
# index 0 = email, index 1 = phone, index 2 = work_email. 1 = verified, 0 = not verified
def hostQualifications_df_host_verifications_transform(verifs_list):
    """Encode a host's verification methods as a 3-character flag string.

    Position 0 is ``email``, position 1 is ``phone`` and position 2 is
    ``work_email``; each position is ``"1"`` when verified, ``"0"`` otherwise.

    Args:
        verifs_list: Either the raw CSV text (for example
            ``"['email', 'phone']"``), an iterable of method names, or
            ``None`` when the source value is missing.

    Returns:
        A string such as ``"110"``. A missing value yields ``"000"``.
    """
    verifs = ("email", "phone", "work_email")

    # A NULL source value means nothing is known to be verified; a bare
    # `in` test on None would raise TypeError inside the UDF.
    if verifs_list is None:
        return "000"

    if isinstance(verifs_list, str):
        # Tokenise the list-like text. A plain substring test would report
        # "email" as verified whenever only "work_email" is present.
        inner = verifs_list.strip().strip("[]")
        present = {token.strip(" '\"") for token in inner.split(",")}
    else:
        present = set(verifs_list)

    return "".join("1" if verif in present else "0" for verif in verifs)


# UDF To convert t/f to 1/0 respectively
def t_f_to_1_0(t_f):
    """Return 1 when ``t_f`` is exactly the string "t"; return 0 for anything else."""
    return 1 if t_f == "t" else 0

print("Transforming Spark DataFrame...")

# ---- Listings table: prune, rename, register ----

# Redundant or empty source columns. "bathrooms" is removed here because the
# SQL projection further down rebuilds it from bathrooms_text.
columns_to_remove = [
    "calendar_last_scraped", "description", "calendar_updated",
    "bedrooms", "bathrooms", "neighbourhood_group_cleansed", "amenities",
]

print("Dropping Redundant/Empty Columns...")
listingsDf = listingsDf.drop(*columns_to_remove)
print("Dropped Columns")

print("Renaming Column...")
# Use the British spelling so the column matches the other neighbourhood_* fields
listingsDf = listingsDf.withColumnRenamed(
    existing="neighborhood_overview", new="neighbourhood_overview"
)
print("Renamed Column")

# Expose the cleaned frame to Spark SQL for the projection query
listingsDf.createOrReplaceTempView("listings_df_view")

# Wrap the plain Python helpers as Spark UDFs with explicit return types
hostQualifications_array_transform = udf(
    f=hostQualifications_df_host_verifications_transform, returnType=StringType()
)
truefalse_to_10 = udf(f=t_f_to_1_0, returnType=IntegerType())

# Projection over listings_df_view that renames, recodes and cleans the raw
# listing columns before they are cast and split into dimension tables.
#   * host_response_time is recoded to H / FH / D / D+ (anything else -> NULL).
#   * The percentage columns are turned into fractions (e.g. '95%' -> 0.95).
#   * The half-bath variants of bathrooms_text are normalised to
#     "0.5 <description>" so the value can later be split at the first space.
#     NOTE(review): the 'Shared half-bath' branch assumes the export contains
#     that variant; it is a no-op if it does not.
#   * price loses both its '$' sign and its ',' thousands separators; without
#     the second REPLACE a value such as '$1,200.00' cannot be cast to a decimal.
listingDfQuery = """
                 SELECT id as listing_id,
                        host_id,
                        host_url,
                        host_name,
                        host_since,
                        host_location,
                        host_about,
                        host_thumbnail_url,
                        host_picture_url,
                        host_neighbourhood,
                        CASE host_response_time
                            WHEN 'within an hour' THEN 'H'
                            WHEN 'within a few hours' THEN 'FH'
                            WHEN 'within a day' THEN 'D'
                            WHEN 'a few days or more' THEN 'D+'
                            ELSE NULL
                        END AS host_response_time,
                        REPLACE(host_response_rate, '%', '') / 100 AS host_response_rate,
                        REPLACE(host_acceptance_rate, '%', '') / 100 AS host_acceptance_rate,
                        host_is_superhost,
                        host_listings_count,
                        host_total_listings_count,
                        host_verifications,
                        host_has_profile_pic,
                        host_identity_verified,
                        calculated_host_listings_count,
                        calculated_host_listings_count_entire_homes,
                        calculated_host_listings_count_private_rooms,
                        calculated_host_listings_count_shared_rooms,
                        latitude,
                        longitude,
                        property_type,
                        room_type,
                        accommodates,
                        CASE
                            WHEN INSTR(bathrooms_text, 'Private half-bath') > 0 THEN '0.5 private bath'
                            WHEN INSTR(bathrooms_text, 'Shared half-bath') > 0 THEN '0.5 shared bath'
                            WHEN INSTR(bathrooms_text, 'Half-bath') > 0 THEN '0.5 bath'
                            ELSE bathrooms_text
                        END as bathrooms,
                        beds,
                        REPLACE(REPLACE(price, '$', ''), ',', '') as daily_price,
                        number_of_reviews,
                        number_of_reviews_ltm,
                        number_of_reviews_l30d,
                        first_review,
                        last_review,
                        review_scores_rating,
                        review_scores_accuracy,
                        review_scores_cleanliness,
                        review_scores_checkin,
                        review_scores_communication,
                        review_scores_location,
                        review_scores_value,
                        reviews_per_month,
                        scrape_id,
                        last_scraped,
                        source,
                        neighbourhood,
                        neighbourhood_overview,
                        neighbourhood_cleansed,
                        maximum_nights,
                        minimum_nights,
                        minimum_minimum_nights,
                        maximum_minimum_nights,
                        minimum_maximum_nights,
                        maximum_maximum_nights,
                        minimum_nights_avg_ntm,
                        maximum_nights_avg_ntm,
                        has_availability,
                        availability_30,
                        availability_60,
                        availability_90,
                        availability_365,
                        listing_url,
                        name,
                        picture_url,
                        license,
                        instant_bookable
                    FROM listings_df_view
"""

# Run the projection; any failure is reported and then re-raised unchanged.
try:
    listingDf = spark.sql(listingDfQuery)
except Exception as sql_error:
    print(f"Error transforming listings table with SQL: {sql_error}")
    # The SparkContext is left running on purpose: Glue manages its lifecycle
    raise sql_error

print("Transformed Listings Table With SQL")

# Splitting the bathroom column into two at the first space, e.g.
# "0.5 private bath" -> count "0.5" and description "private bath".
property_baths_split = split(col("bathrooms"), " ", limit=2)

# Adding the two new columns. bathroom_desc is derived first because the
# second statement overwrites the "bathrooms" column the split reads from.
listingDf = listingDf.withColumn("bathroom_desc", property_baths_split.getItem(1))
# Cast with an explicit scale: a bare "decimal" is decimal(10, 0), which rounds
# half-baths to whole numbers (0.5 -> 1, 1.5 -> 2) before any later cast.
listingDf = listingDf.withColumn(
    "bathrooms", property_baths_split.getItem(0).cast(DecimalType(5, 1))
)

print("Casting columns to correct data types...")


# Casting columns to correct data types.
#
# The five 't'/'f' flag columns (host_is_superhost, host_has_profile_pic,
# host_identity_verified, has_availability, instant_bookable) are kept as
# strings here. Casting the text 't'/'f' to IntegerType yields NULL (or fails
# under ANSI mode), and the t/f -> 1/0 UDF that runs after this step would then
# turn every row into 0. The UDF itself produces the final integer column.
listingDf = listingDf.withColumns(
    {
        "listing_id": col("listing_id").cast(LongType()),
        "host_id": col("host_id").cast(LongType()),
        "host_url": col("host_url").cast(StringType()),
        "host_name": col("host_name").cast(StringType()),
        "host_since": col("host_since").cast(DateType()),
        "host_location": col("host_location").cast(StringType()),
        "host_about": col("host_about").cast(StringType()),
        "host_thumbnail_url": col("host_thumbnail_url").cast(StringType()),
        "host_picture_url": col("host_picture_url").cast(StringType()),
        "host_neighbourhood": col("host_neighbourhood").cast(StringType()),
        "host_response_time": col("host_response_time").cast(StringType()),
        "host_response_rate": col("host_response_rate").cast(DecimalType(3, 2)),
        "host_acceptance_rate": col("host_acceptance_rate").cast(DecimalType(3, 2)),
        "host_is_superhost": col("host_is_superhost").cast(StringType()),
        "host_listings_count": col("host_listings_count").cast(IntegerType()),
        "host_total_listings_count": col("host_total_listings_count").cast(
            IntegerType()
        ),
        "host_verifications": col("host_verifications").cast(StringType()),
        "host_has_profile_pic": col("host_has_profile_pic").cast(StringType()),
        "host_identity_verified": col("host_identity_verified").cast(StringType()),
        "calculated_host_listings_count": col("calculated_host_listings_count").cast(
            IntegerType()
        ),
        "calculated_host_listings_count_entire_homes": col(
            "calculated_host_listings_count_entire_homes"
        ).cast(IntegerType()),
        "calculated_host_listings_count_private_rooms": col(
            "calculated_host_listings_count_private_rooms"
        ).cast(IntegerType()),
        "calculated_host_listings_count_shared_rooms": col(
            "calculated_host_listings_count_shared_rooms"
        ).cast(IntegerType()),
        "latitude": col("latitude").cast(DecimalType(18, 15)),
        "longitude": col("longitude").cast(DecimalType(18, 15)),
        "property_type": col("property_type").cast(StringType()),
        "room_type": col("room_type").cast(StringType()),
        "accommodates": col("accommodates").cast(IntegerType()),
        "bathrooms": col("bathrooms").cast(DecimalType(5, 1)),
        # Key must be the existing column name; "bathrooms_desc" would add a
        # second, unused copy of the column instead of casting this one.
        "bathroom_desc": col("bathroom_desc").cast(StringType()),
        "beds": col("beds").cast(IntegerType()),
        "daily_price": col("daily_price").cast(DecimalType(10, 2)),
        "number_of_reviews": col("number_of_reviews").cast(IntegerType()),
        "number_of_reviews_ltm": col("number_of_reviews_ltm").cast(IntegerType()),
        "number_of_reviews_l30d": col("number_of_reviews_l30d").cast(IntegerType()),
        "first_review": col("first_review").cast(DateType()),
        "last_review": col("last_review").cast(DateType()),
        "review_scores_rating": col("review_scores_rating").cast(DecimalType(3, 2)),
        "review_scores_accuracy": col("review_scores_accuracy").cast(DecimalType(3, 2)),
        "review_scores_cleanliness": col("review_scores_cleanliness").cast(
            DecimalType(3, 2)
        ),
        "review_scores_checkin": col("review_scores_checkin").cast(DecimalType(3, 2)),
        "review_scores_communication": col("review_scores_communication").cast(
            DecimalType(3, 2)
        ),
        "review_scores_location": col("review_scores_location").cast(DecimalType(3, 2)),
        "review_scores_value": col("review_scores_value").cast(DecimalType(3, 2)),
        # NOTE(review): decimal(3, 2) tops out at 9.99; a listing with 10 or more
        # reviews per month would not fit. Confirm against the target table.
        "reviews_per_month": col("reviews_per_month").cast(DecimalType(3, 2)),
        "scrape_id": col("scrape_id").cast(LongType()),
        "last_scraped": col("last_scraped").cast(DateType()),
        "source": col("source").cast(StringType()),
        "neighbourhood": col("neighbourhood").cast(StringType()),
        "neighbourhood_overview": col("neighbourhood_overview").cast(StringType()),
        "neighbourhood_cleansed": col("neighbourhood_cleansed").cast(StringType()),
        "maximum_nights": col("maximum_nights").cast(IntegerType()),
        "minimum_nights": col("minimum_nights").cast(IntegerType()),
        "minimum_minimum_nights": col("minimum_minimum_nights").cast(IntegerType()),
        "maximum_minimum_nights": col("maximum_minimum_nights").cast(IntegerType()),
        "minimum_maximum_nights": col("minimum_maximum_nights").cast(IntegerType()),
        "maximum_maximum_nights": col("maximum_maximum_nights").cast(IntegerType()),
        # NOTE(review): the two *_avg_ntm columns are averages; the integer cast
        # drops any fractional part. Confirm this is intended.
        "minimum_nights_avg_ntm": col("minimum_nights_avg_ntm").cast(IntegerType()),
        "maximum_nights_avg_ntm": col("maximum_nights_avg_ntm").cast(IntegerType()),
        "has_availability": col("has_availability").cast(StringType()),
        "availability_30": col("availability_30").cast(IntegerType()),
        "availability_60": col("availability_60").cast(IntegerType()),
        "availability_90": col("availability_90").cast(IntegerType()),
        "availability_365": col("availability_365").cast(IntegerType()),
        "listing_url": col("listing_url").cast(StringType()),
        "name": col("name").cast(StringType()),
        "picture_url": col("picture_url").cast(StringType()),
        "license": col("license").cast(StringType()),
        "instant_bookable": col("instant_bookable").cast(StringType()),
    }
)

print("Casted columns to correct data types")

print("Transforming Host Verifications...")
# Replace the verification list with its 3-character flag string
verification_flags = hostQualifications_array_transform(col("host_verifications"))
listingDf = listingDf.withColumn("host_verifications", verification_flags)
print("Transformed Host Verifications")

print("Transforming True/False to 1/0...")
# Every column that goes through the t/f -> 1/0 UDF
t_f_columns = (
    "host_is_superhost",
    "host_has_profile_pic",
    "host_identity_verified",
    "has_availability",
    "instant_bookable",
)
listingDf = listingDf.withColumns(
    {flag_name: truefalse_to_10(col(flag_name)) for flag_name in t_f_columns}
)
print("Transformed True/False to 1/0")

# Register the transformed listings as the temp view "listing_df_view"
listingDf.createOrReplaceTempView("listing_df_view")

# ---- Dimension tables ----

# Host table: every distinct combination of the host-level attributes
host_columns = [
    "host_id",
    "host_url",
    "host_name",
    "host_since",
    "host_location",
    "host_about",
    "host_thumbnail_url",
    "host_picture_url",
    "host_neighbourhood",
    "host_response_time",
    "host_response_rate",
    "host_acceptance_rate",
    "host_is_superhost",
    "host_listings_count",
    "host_total_listings_count",
    "host_verifications",
    "host_has_profile_pic",
    "host_identity_verified",
    "calculated_host_listings_count",
    "calculated_host_listings_count_entire_homes",
    "calculated_host_listings_count_private_rooms",
    "calculated_host_listings_count_shared_rooms",
]

hostDf = listingDf.select(*host_columns).dropDuplicates()

# Host sub-dimensions, each built as: project -> de-duplicate -> surrogate key.
# NOTE(review): monotonically_increasing_id depends on partition ids, so the
# keys are only stable while these frames are evaluated with the same plan;
# consider caching before they are both joined and written out.

# hostQualificationsDf: responsiveness and verification attributes
hostQualificationsDf = (
    hostDf.select(
        "host_response_time",
        "host_response_rate",
        "host_acceptance_rate",
        "host_is_superhost",
        "host_listings_count",
        "host_total_listings_count",
        "host_verifications",
        "host_has_profile_pic",
        "host_identity_verified",
    )
    .dropDuplicates()
    .withColumn("host_quals_diags_id", monotonically_increasing_id())
)

# hostListingsDiagsDf: the calculated listing counts per host
hostListingsDiagsDf = (
    hostDf.select(
        "calculated_host_listings_count",
        "calculated_host_listings_count_entire_homes",
        "calculated_host_listings_count_private_rooms",
        "calculated_host_listings_count_shared_rooms",
    )
    .dropDuplicates()
    .withColumn("host_listings_diags_id", monotonically_increasing_id())
)

# Property dimension: location, type, capacity and price, plus a surrogate key
propertyDf = (
    listingDf.select(
        "latitude",
        "longitude",
        "property_type",
        "room_type",
        "accommodates",
        "bathroom_desc",
        "bathrooms",
        "beds",
        "daily_price",
    )
    .dropDuplicates()
    .withColumn("property_id", monotonically_increasing_id())
)

# Reviews dimension: review counts, dates and scores, plus a surrogate key
reviewsDiagnosticsDf = (
    listingDf.select(
        "number_of_reviews",
        "number_of_reviews_ltm",
        "number_of_reviews_l30d",
        "first_review",
        "last_review",
        "review_scores_rating",
        "review_scores_accuracy",
        "review_scores_cleanliness",
        "review_scores_checkin",
        "review_scores_communication",
        "review_scores_location",
        "review_scores_value",
        "reviews_per_month",
    )
    .dropDuplicates()
    .withColumn("rev_diag_id", monotonically_increasing_id())
)

# Scrapings dimension: when and how the row was scraped
scrapingsDf = (
    listingDf.select("scrape_id", "last_scraped", "source")
    .dropDuplicates()
    .withColumn("scraping_id", monotonically_increasing_id())
)

# Neighbourhood dimension: the three neighbourhood text fields
neighbourhoodDf = (
    listingDf.select(
        "neighbourhood", "neighbourhood_overview", "neighbourhood_cleansed"
    )
    .dropDuplicates()
    .withColumn("neighbourhood_id", monotonically_increasing_id())
)

# Min/max insights dimension: the minimum/maximum-nights family of columns
minMaxInsightsDf = (
    listingDf.select(
        "minimum_nights",
        "maximum_nights",
        "minimum_minimum_nights",
        "maximum_minimum_nights",
        "minimum_maximum_nights",
        "maximum_maximum_nights",
        "minimum_nights_avg_ntm",
        "maximum_nights_avg_ntm",
    )
    .dropDuplicates()
    .withColumn("minmax_insights_id", monotonically_increasing_id())
)

# Availability dimension: the availability flag and the 30/60/90/365-day counts
availabilityDf = (
    listingDf.select(
        "has_availability",
        "availability_30",
        "availability_60",
        "availability_90",
        "availability_365",
    )
    .dropDuplicates()
    .withColumn("avail_id", monotonically_increasing_id())
)

# ---- Join keys for the host tables ----

# Columns shared by hostDf and hostQualificationsDf
hostQualifications_host_join_conditions = [
    "host_response_time", "host_response_rate", "host_acceptance_rate",
    "host_is_superhost", "host_listings_count", "host_total_listings_count",
    "host_verifications", "host_has_profile_pic", "host_identity_verified",
]

# Columns shared by hostDf and hostListingsDiagsDf
hostListingsDiags_host_join_conditions = [
    "calculated_host_listings_count",
] + [
    f"calculated_host_listings_count_{suffix}"
    for suffix in ("entire_homes", "private_rooms", "shared_rooms")
]

print("Validating Host Table Schemas...")

# Schema Validation for Host Tables
#
# Expected column names and types of the two host sub-dimensions as produced by
# the cast step and the UDFs: surrogate keys come from
# monotonically_increasing_id (LongType), the rate columns are decimal(3, 2)
# and the t/f flags are integers.

hostQualificationsSchema = StructType(
    [
        StructField("host_quals_diags_id", LongType(), False),
        StructField("host_response_time", StringType(), True),
        StructField("host_response_rate", DecimalType(3, 2), True),
        StructField("host_acceptance_rate", DecimalType(3, 2), True),
        StructField("host_is_superhost", IntegerType(), True),
        StructField("host_listings_count", IntegerType(), True),
        StructField("host_total_listings_count", IntegerType(), True),
        StructField("host_verifications", StringType(), True),
        StructField("host_has_profile_pic", IntegerType(), True),
        StructField("host_identity_verified", IntegerType(), True),
    ]
)

hostListingsDiagsSchema = StructType(
    [
        StructField("host_listings_diags_id", LongType(), False),
        StructField("calculated_host_listings_count", IntegerType(), True),
        StructField("calculated_host_listings_count_entire_homes", IntegerType(), True),
        StructField(
            "calculated_host_listings_count_private_rooms", IntegerType(), True
        ),
        StructField("calculated_host_listings_count_shared_rooms", IntegerType(), True),
    ]
)

# Schema Validation
# A bare `schema == expected` expression only computes a boolean and throws it
# away, so a mismatch has to be raised explicitly. Names and data types are
# compared; column order and nullability are ignored because the surrogate key
# is appended last and Spark decides nullability on its own.
try:
    for tableName, tableDf, expectedSchema in (
        ("hostQualificationsDf", hostQualificationsDf, hostQualificationsSchema),
        ("hostListingsDiagsDf", hostListingsDiagsDf, hostListingsDiagsSchema),
    ):
        actualTypes = {field.name: field.dataType for field in tableDf.schema.fields}
        expectedTypes = {field.name: field.dataType for field in expectedSchema.fields}
        if actualTypes != expectedTypes:
            raise ValueError(
                f"{tableName} schema mismatch: expected {expectedTypes}, got {actualTypes}"
            )

except Exception as e:
    print(f"Error validating host schema: {e}")
    raise e

print("Host Schemas Validated")

print("Joining Host Tables...")

# Attach the two surrogate keys to every host row.
# The keys are compared with eqNullSafe (SQL <=>) rather than plain equality:
# several key columns are nullable (host_response_time is NULL for any
# unmapped response time, for example) and NULL = NULL never matches, which
# would leave those hosts without a dimension id. Both sides are aliased so the
# condition can tell apart columns that share a name and a lineage.
try:
    hostsWithQuals = (
        hostDf.alias("h")
        .join(
            hostQualificationsDf.alias("q"),
            on=[
                col(f"h.{name}").eqNullSafe(col(f"q.{name}"))
                for name in hostQualifications_host_join_conditions
            ],
            how="left",
        )
        .select("h.*", "q.host_quals_diags_id")
    )

    hostDf = (
        hostsWithQuals.alias("h")
        .join(
            hostListingsDiagsDf.alias("d"),
            on=[
                col(f"h.{name}").eqNullSafe(col(f"d.{name}"))
                for name in hostListingsDiags_host_join_conditions
            ],
            how="left",
        )
        .select(
            "h.host_id",
            "h.host_quals_diags_id",
            "d.host_listings_diags_id",
            "h.host_url",
            "h.host_name",
            "h.host_since",
            "h.host_location",
            "h.host_about",
            "h.host_thumbnail_url",
            "h.host_picture_url",
            "h.host_neighbourhood",
        )
    )
except Exception as e:  # Chance of error is unlikely, just in case I've put this
    print(f"Error: {e}")
    raise e

print("Host Tables Joined")

print("Validating Host Table Schema After Join...")
# Validating Host Table Schema After Join
# Every column is nullable: host_id comes straight from the source data and the
# two surrogate keys arrive through left joins. The keys are LongType because
# they were generated by monotonically_increasing_id.
hostSchema = StructType(
    [
        StructField("host_id", LongType(), True),
        StructField("host_quals_diags_id", LongType(), True),
        StructField("host_listings_diags_id", LongType(), True),
        StructField("host_url", StringType(), True),
        StructField("host_name", StringType(), True),
        StructField("host_since", DateType(), True),
        StructField("host_location", StringType(), True),
        StructField("host_about", StringType(), True),
        StructField("host_thumbnail_url", StringType(), True),
        StructField("host_picture_url", StringType(), True),
        StructField("host_neighbourhood", StringType(), True),
    ]
)

# A bare `hostDf.schema == hostSchema` expression discards its result, so the
# mismatch is raised explicitly. Only names and data types are compared.
try:
    actualHostTypes = {field.name: field.dataType for field in hostDf.schema.fields}
    expectedHostTypes = {field.name: field.dataType for field in hostSchema.fields}
    if actualHostTypes != expectedHostTypes:
        raise ValueError(
            f"hostDf schema mismatch: expected {expectedHostTypes}, got {actualHostTypes}"
        )
except Exception as e:
    print(f"Error Validating Host Schema: {e}")
    raise e

print("Host Table Schema After Join Validated")

# Final Join
# Each list is the full set of attribute columns of the matching dimension
# table. Joining on a subset can match one listing to several dimension rows
# and duplicate it, so property_join_conditions lists all nine property
# columns (property_type, room_type and bathrooms included).
property_join_conditions = [
    "latitude",
    "longitude",
    "property_type",
    "room_type",
    "accommodates",
    "bathroom_desc",
    "bathrooms",
    "beds",
    "daily_price",
]

reviews_join_conditions = [
    "number_of_reviews",
    "number_of_reviews_ltm",
    "number_of_reviews_l30d",
    "first_review",
    "last_review",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value",
    "reviews_per_month",
]

minmax_join_conditions = [
    "minimum_nights",
    "maximum_nights",
    "minimum_minimum_nights",
    "maximum_minimum_nights",
    "minimum_maximum_nights",
    "maximum_maximum_nights",
    "minimum_nights_avg_ntm",
    "maximum_nights_avg_ntm",
]

# Expected column names and types of each dimension table, matching what the
# cast step produces. Surrogate keys are LongType and non-nullable because
# they come from monotonically_increasing_id.

propertyDfSchema = StructType(
    [
        StructField("property_id", LongType(), False),
        StructField("latitude", DecimalType(18, 15), True),
        StructField("longitude", DecimalType(18, 15), True),
        StructField("property_type", StringType(), True),
        StructField("room_type", StringType(), True),
        StructField("accommodates", IntegerType(), True),
        StructField("bathroom_desc", StringType(), True),
        StructField("bathrooms", DecimalType(5, 1), True),
        StructField("beds", IntegerType(), True),
        StructField("daily_price", DecimalType(10, 2), True),
    ]
)

reviewsDiagnosticsDfSchema = StructType(
    [
        StructField("rev_diag_id", LongType(), False),
        StructField("number_of_reviews", IntegerType(), True),
        StructField("number_of_reviews_ltm", IntegerType(), True),
        StructField("number_of_reviews_l30d", IntegerType(), True),
        StructField("first_review", DateType(), True),
        StructField("last_review", DateType(), True),
        StructField("review_scores_rating", DecimalType(3, 2), True),
        StructField("review_scores_accuracy", DecimalType(3, 2), True),
        StructField("review_scores_cleanliness", DecimalType(3, 2), True),
        StructField("review_scores_checkin", DecimalType(3, 2), True),
        StructField("review_scores_communication", DecimalType(3, 2), True),
        StructField("review_scores_location", DecimalType(3, 2), True),
        StructField("review_scores_value", DecimalType(3, 2), True),
        StructField("reviews_per_month", DecimalType(3, 2), True),
    ]
)

scrapingsDfSchema = StructType(
    [
        StructField("scraping_id", LongType(), False),
        StructField("scrape_id", LongType(), True),
        StructField("last_scraped", DateType(), True),
        StructField("source", StringType(), True),
    ]
)

neighbourhoodDfSchema = StructType(
    [
        StructField("neighbourhood_id", LongType(), False),
        StructField("neighbourhood", StringType(), True),
        StructField("neighbourhood_overview", StringType(), True),
        StructField("neighbourhood_cleansed", StringType(), True),
    ]
)

minMaxInsightsDfSchema = StructType(
    [
        StructField("minmax_insights_id", LongType(), False),
        StructField("minimum_nights", IntegerType(), True),
        StructField("maximum_nights", IntegerType(), True),
        StructField("minimum_minimum_nights", IntegerType(), True),
        StructField("maximum_minimum_nights", IntegerType(), True),
        StructField("minimum_maximum_nights", IntegerType(), True),
        StructField("maximum_maximum_nights", IntegerType(), True),
        StructField("minimum_nights_avg_ntm", IntegerType(), True),
        StructField("maximum_nights_avg_ntm", IntegerType(), True),
    ]
)

availabilityDfSchema = StructType(
    [
        StructField("avail_id", LongType(), False),
        StructField("has_availability", IntegerType(), True),
        StructField("availability_30", IntegerType(), True),
        StructField("availability_60", IntegerType(), True),
        StructField("availability_90", IntegerType(), True),
        StructField("availability_365", IntegerType(), True),
    ]
)

print("Validating Schemas Before Final Join...")

# A bare `schema == expected` expression discards its result, so a mismatch is
# raised explicitly. Names and data types are compared; column order and
# nullability are ignored.
try:
    for tableName, tableDf, expectedSchema in (
        ("propertyDf", propertyDf, propertyDfSchema),
        ("reviewsDiagnosticsDf", reviewsDiagnosticsDf, reviewsDiagnosticsDfSchema),
        ("scrapingsDf", scrapingsDf, scrapingsDfSchema),
        ("neighbourhoodDf", neighbourhoodDf, neighbourhoodDfSchema),
        ("minMaxInsightsDf", minMaxInsightsDf, minMaxInsightsDfSchema),
        ("availabilityDf", availabilityDf, availabilityDfSchema),
    ):
        actualTypes = {field.name: field.dataType for field in tableDf.schema.fields}
        expectedTypes = {field.name: field.dataType for field in expectedSchema.fields}
        if actualTypes != expectedTypes:
            raise ValueError(
                f"{tableName} schema mismatch: expected {expectedTypes}, got {actualTypes}"
            )
except Exception as e:
    print(f"Error Validating Schemas Before Final Join: {e}")
    raise e

print("Schemas Validated Before Final Join")

print("Joining Final Table...")


def _left_join_null_safe(factDf, dimDf, keys):
    """Left-join ``dimDf`` onto ``factDf`` with NULL-safe key comparison.

    Args:
        factDf: The DataFrame whose rows must all be preserved.
        dimDf: The dimension DataFrame to look up.
        keys: Column names present on both sides that identify a dimension row.

    Returns:
        ``factDf`` with the columns of ``dimDf`` that it did not already have
        (in practice the surrogate ids) appended.
    """
    fact = factDf.alias("fact")
    dim = dimDf.alias("dim")
    # eqNullSafe is SQL's <=>: NULL matches NULL. Plain equality would leave
    # every listing that has a NULL in any key column without a dimension id.
    condition = [col(f"fact.{key}").eqNullSafe(col(f"dim.{key}")) for key in keys]
    # Bring across only the new columns so the result keeps unique column names
    new_columns = [
        col(f"dim.{name}") for name in dimDf.columns if name not in factDf.columns
    ]
    return fact.join(dim, on=condition, how="left").select("fact.*", *new_columns)


try:
    joinedDf = _left_join_null_safe(listingDf, hostDf, ["host_id"])

    # Each dimension is matched on ALL of its attribute columns (everything
    # except its surrogate id). Matching on a subset - e.g. neighbourhood
    # without neighbourhood_overview - pairs a listing with several dimension
    # rows and multiplies the listing.
    for dimDf, idColumn in (
        (propertyDf, "property_id"),
        (reviewsDiagnosticsDf, "rev_diag_id"),
        (scrapingsDf, "scraping_id"),
        (neighbourhoodDf, "neighbourhood_id"),
        (minMaxInsightsDf, "minmax_insights_id"),
        (availabilityDf, "avail_id"),
    ):
        naturalKey = [name for name in dimDf.columns if name != idColumn]
        joinedDf = _left_join_null_safe(joinedDf, dimDf, naturalKey)

    # Keep only the keys and the listing-level attributes of the fact table
    listingDf = joinedDf.select(
        "listing_id",
        "scraping_id",
        "host_id",
        "neighbourhood_id",
        "property_id",
        "minmax_insights_id",
        "avail_id",
        "rev_diag_id",
        "listing_url",
        "name",
        "picture_url",
        "license",
        "instant_bookable",
    )
except Exception as e:  # Chance of error is unlikely, just in case I've put this
    print(f"Error {e}")
    raise e

print("Final Table Joined")

print("Validating Listings DataFrame After Join...")

# Expected shape of the fact table. Every column is nullable: listing_id is
# read from the source data rather than generated, and the dimension ids
# arrive through left joins.
listingDfSchema = StructType(
    [
        StructField("listing_id", LongType(), True),
        StructField("scraping_id", LongType(), True),
        StructField("host_id", LongType(), True),
        StructField("neighbourhood_id", LongType(), True),
        StructField("property_id", LongType(), True),
        StructField("minmax_insights_id", LongType(), True),
        StructField("avail_id", LongType(), True),
        StructField("rev_diag_id", LongType(), True),
        StructField("listing_url", StringType(), True),
        StructField("name", StringType(), True),
        StructField("picture_url", StringType(), True),
        StructField("license", StringType(), True),
        StructField("instant_bookable", IntegerType(), True),
    ]
)

# A bare `listingDf.schema == listingDfSchema` expression discards its result,
# so the mismatch is raised explicitly. Only names and data types are compared.
try:
    actualListingTypes = {
        field.name: field.dataType for field in listingDf.schema.fields
    }
    expectedListingTypes = {
        field.name: field.dataType for field in listingDfSchema.fields
    }
    if actualListingTypes != expectedListingTypes:
        raise ValueError(
            f"listingDf schema mismatch: expected {expectedListingTypes}, "
            f"got {actualListingTypes}"
        )
except Exception as e:
    print(f"Error Validating Listings Schema: {e}")
    raise e

print("Listings Schema Validated After Join")

print("Spark Transformations Complete")

Transforming Spark DataFrame...
Dropping Redundant/Empty Columns...
Dropped Columns
Renaming Column...
Renamed Column
Transformed Listings Table With SQL
Casting columns to correct data types...
Casted columns to correct data types
Transforming Host Verifications...
Transformed Host Verifications
Transforming True/False to 1/0...
Transformed True/False to 1/0
Validating Host Table Schemas...
Host Schemas Validated
Joining Host Tables...
Host Tables Joined
Validating Host Table Schema After Join...
Host Table Schema After Join Validated
Validating Schemas Before Final Join...
Schemas Validated Before Final Join
Joining Final Table...
Final Table Joined
Validating Listings DataFrame After Join...
Listings Schema Validated After Join
Spark Transformations Complete


In [7]:
# Show the schema and the first five rows (untruncated) of the joined listings table
listingDf.printSchema()
listingDf.show(n=5, truncate=False)

root
 |-- listing_id: long (nullable = true)
 |-- scraping_id: long (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- neighbourhood_id: long (nullable = true)
 |-- property_id: long (nullable = true)
 |-- minmax_insights_id: long (nullable = true)
 |-- avail_id: long (nullable = true)
 |-- rev_diag_id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- name: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- license: string (nullable = true)
 |-- instant_bookable: integer (nullable = true)



                                                                                

+----------+-----------+-------+----------------+-----------+------------------+--------+-----------+----------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+---------+----------------+
|listing_id|scraping_id|host_id|neighbourhood_id|property_id|minmax_insights_id|avail_id|rev_diag_id|listing_url                       |name                                                    |picture_url                                                              |license  |instant_bookable|
+----------+-----------+-------+----------------+-----------+------------------+--------+-----------+----------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+---------+----------------+
|90676     |1          |483306 |1273            |249        |750               |1149    |238        |https://www.ai

In [8]:
# Show the schema and up to five rows (untruncated) of the scrapings dimension
scrapingsDf.printSchema()
scrapingsDf.show(n=5, truncate=False)

+--------------+------------+---------------+-----------+
|scrape_id     |last_scraped|source         |scraping_id|
+--------------+------------+---------------+-----------+
|20231225202549|2023-12-26  |previous scrape|0          |
|20231225202549|2023-12-26  |city scrape    |1          |
|20231225202549|2023-12-25  |city scrape    |2          |
+--------------+------------+---------------+-----------+



In [28]:
from typing import Optional

# NOTE: this import rebinds the name `max` to Spark's aggregate function for
# the rest of the notebook, shadowing Python's builtin max().
from pyspark.sql.functions import max


def get_length_of_int(lat_long: Optional[float]) -> Optional[int]:
    """Return the number of characters in the string form of a coordinate.

    Despite the name, the argument is a float (a latitude/longitude value),
    so the count includes the digits, the decimal point and any sign that
    appear in ``str(lat_long)``.

    Args:
        lat_long: Coordinate value, or None when the source cell is null.

    Returns:
        ``len(str(lat_long))``, or None for a null input so that a missing
        coordinate is not reported as the 4-character string "None".
    """
    if lat_long is None:
        return None
    return len(str(lat_long))


# Wrap the helper as a Spark UDF that produces an integer column.
length_lat_long = udf(get_length_of_int, IntegerType())

# Attach the per-row string length of `latitude` as a diagnostic column.
propertyDf = propertyDf.withColumn("latitude_length", length_lat_long(col("latitude")))

propertyDf.printSchema()
# Longest latitude string representation across all rows.
propertyDf.select(max(col("latitude_length"))).show()

root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bathroom_desc: string (nullable = true)
 |-- bathrooms: decimal(10,0) (nullable = true)
 |-- beds: integer (nullable = true)
 |-- daily_price: decimal(10,2) (nullable = true)
 |-- property_id: long (nullable = false)
 |-- latitude_length: integer (nullable = true)



[Stage 82:>                                                         (0 + 1) / 1]

+--------------------+
|max(latitude_length)|
+--------------------+
|                  18|
+--------------------+



                                                                                

In [30]:
# 18 is the max(latitude_length) reported by the previous cell; list the
# latitude values whose string form reaches that length.
(
    propertyDf
    .select("latitude", "latitude_length")
    .filter(col("latitude_length") == 18)
    .show()
)

[Stage 88:>                                                         (0 + 1) / 1]

+------------------+---------------+
|          latitude|latitude_length|
+------------------+---------------+
|39.973736679176035|             18|
|40.046764331669294|             18|
|39.948557402504136|             18|
|40.010394033061985|             18|
|39.940874264422106|             18|
|40.010227770717734|             18|
|39.988508956667054|             18|
|40.016180983197664|             18|
|39.954006053316405|             18|
|39.959799629509206|             18|
|40.021149788900324|             18|
|39.998215713209184|             18|
|39.985989173181075|             18|
|39.937669018611366|             18|
|39.936669149683226|             18|
|40.015793207281384|             18|
|40.015899658203125|             18|
|40.015937884834415|             18|
|39.968088942786764|             18|
|39.960582115566474|             18|
+------------------+---------------+
only showing top 20 rows



                                                                                

In [31]:
# Print the column names, types and nullability of hostListingsDiagsDf.
hostListingsDiagsDf.printSchema()

root
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- calculated_host_listings_count_entire_homes: integer (nullable = true)
 |-- calculated_host_listings_count_private_rooms: integer (nullable = true)
 |-- calculated_host_listings_count_shared_rooms: integer (nullable = true)
 |-- hostListingsDiags_id: long (nullable = false)

