## Import Library

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, unix_timestamp
import matplotlib.pyplot as plt
import pandas as pd
import os

## Create Spark

In [3]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.driver.memory", "16g")
    .config('spark.executor.memory', '16g')
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

## Read Rent Data

In [91]:
# Read the Json file using multiLine option
df = spark.read.option("multiLine", True).json("../rent_info.json")

# Display the schema of the data
df.printSchema()

# Show the first few rows of the data without truncating the values
df.show(truncate=False)

# Count the number of rows
df.count()

root
 |-- addressParts: struct (nullable = true)
 |    |-- displayAddress: string (nullable = true)
 |    |-- displayType: string (nullable = true)
 |    |-- postcode: string (nullable = true)
 |    |-- stateAbbreviation: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- streetNumber: string (nullable = true)
 |    |-- suburb: string (nullable = true)
 |    |-- unitNumber: string (nullable = true)
 |-- advertiserIdentifiers: struct (nullable = true)
 |    |-- advertiserId: long (nullable = true)
 |    |-- advertiserType: string (nullable = true)
 |    |-- agentIds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- contactIds: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- apmIdentifiers: struct (nullable = true)
 |    |-- suburbId: long (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- buildingAreaSqm: double (nullable = true)
 |-- carspace

3277

In [92]:
# Check the number of null values for each column
null_counts = df.select([col(c).isNull().alias(c) for c in df.columns]) \
                .agg(*[sum(col(c).cast("int")).alias(c) for c in df.columns])

# Show the null counts for each column
null_counts.show()

+------------+---------------------+--------------+---------+--------+---------------+---------+-------+-------------+----------+-----------+-----------+----------------------+--------+-----------+--------+---+-----------------+----------------+-----------+-----+---------+------------+----------+-------------+-------------+--------+------+------+--------------+
|addressParts|advertiserIdentifiers|apmIdentifiers|bathrooms|bedrooms|buildingAreaSqm|carspaces|channel|dateAvailable|dateListed|dateUpdated|description|energyEfficiencyRating|features|geoLocation|headline| id|inspectionDetails|isNewDevelopment|landAreaSqm|media|objective|priceDetails|propertyId|propertyTypes|rentalDetails|saleMode|seoUrl|status|virtualTourUrl|
+------------+---------------------+--------------+---------+--------+---------------+---------+-------+-------------+----------+-----------+-----------+----------------------+--------+-----------+--------+---+-----------------+----------------+-----------+-----+---------

## Selected Features Needed

In [93]:
from pyspark.sql.functions import to_timestamp, year, month

# Convert the date columns using the format strings
df = df.withColumn("dateAvailable_timestamp", to_timestamp("dateAvailable", "yyyy-MM-dd'T'HH:mm:ssXXX"))
df = df.withColumn("dateListed_timestamp", to_timestamp("dateListed", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
df = df.withColumn("dateUpdated_timestamp", to_timestamp("dateUpdated", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))

# Extract the year and month from the timestamps
df = df.withColumn("Available_year", year("dateAvailable_timestamp")) \
       .withColumn("Available_month", month("dateAvailable_timestamp")) \
       .withColumn("Listed_year", year("dateListed_timestamp")) \
       .withColumn("Listed_month", month("dateListed_timestamp")) \
       .withColumn("Updated_year", year("dateUpdated_timestamp")) \
       .withColumn("Updated_month", month("dateUpdated_timestamp"))

# Display the results
df.show()

+--------------------+---------------------+--------------+---------+--------+---------------+---------+-----------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------+--------------------+----------------+-----------+--------------------+---------+--------------------+----------+-------------------+--------------------+--------+--------------------+------+--------------+-----------------------+--------------------+---------------------+--------------+---------------+-----------+------------+------------+-------------+
|        addressParts|advertiserIdentifiers|apmIdentifiers|bathrooms|bedrooms|buildingAreaSqm|carspaces|    channel|       dateAvailable|          dateListed|         dateUpdated|         description|energyEfficiencyRating|            features|         geoLocation|            headline|      id|   inspectionDetails|isNewDevelopment|landAreaSqm|   

In [94]:
# Select the fields to retain
df_selected = df.select(
    "addressParts.suburb",              # Retain location information - suburb
    "addressParts.postcode",            # Retain location information - postcode
    "geoLocation.latitude",             # Retain location information - latitude
    "geoLocation.longitude",            # Retain location information - longitude
    "isNewDevelopment",                 # Retain whether it's a new development
    "bathrooms",                        # Retain property details - bathrooms
    "bedrooms",                         # Retain property details - bedrooms
    "carspaces",                        # Retain property details - carspaces
    "Available_year",                   # Retain date - Available_year
    "Available_month",                  # Retain date - Available_month
    "Listed_year",                      # Retain date - Listed_year
    "Listed_month",                     # Retain date - Listed_month
    "Updated_year",                     # Retain date - Updated_year
    "Updated_month",                    # Retain date - Updated_month
    "priceDetails.displayPrice",        # Retain the full price field - displayPrice
    "propertyTypes",                    # Retain property type
    "status",                           # Retain property status
    "channel"                           # Retain channel information
)

# Check the schema
df_selected.printSchema()

# Display the first few rows
df_selected.show(truncate=False)

# Count the number of rows
df.count()

root
 |-- suburb: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- isNewDevelopment: boolean (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- carspaces: long (nullable = true)
 |-- Available_year: integer (nullable = true)
 |-- Available_month: integer (nullable = true)
 |-- Listed_year: integer (nullable = true)
 |-- Listed_month: integer (nullable = true)
 |-- Updated_year: integer (nullable = true)
 |-- Updated_month: integer (nullable = true)
 |-- displayPrice: string (nullable = true)
 |-- propertyTypes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- status: string (nullable = true)
 |-- channel: string (nullable = true)

+----------------+--------+------------------+-----------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+---------

In [60]:
# show all price values
df_selected.select("displayPrice").show(truncate=False)

+-------------------+
|displayPrice       |
+-------------------+
|$1600pw            |
|$780.00            |
|$585 per week      |
|$475 pw            |
|$590 Per Week      |
|$480 Per Week      |
|$460               |
|$520 pw / $2260 pcm|
|$320.00            |
|$575 per week      |
|$620 per week      |
|$500 per week      |
|$670 weekly        |
|$470.00 per week   |
|$700 per week      |
|$370 per week      |
|$600.00            |
|$650               |
|$360               |
|$600               |
+-------------------+
only showing top 20 rows



In [95]:
from pyspark.sql.functions import regexp_extract, col, when
# Converting price to a unified weekly price
df_selected = df_selected.withColumn(
    "price_per_week",
    when(col("displayPrice").rlike(r"(\d+)\s*pw|(\d+)\s*per week"),  # If it's a weekly price, handle cases with "pw" or "per week"
         regexp_extract(col("displayPrice"), r"(\d+)", 1).cast("double"))  # Extract the weekly price part
    .when(col("displayPrice").rlike(r"(\d+)\s*pcm"),  # If it's a monthly price, handle cases with "pcm" and convert to weekly price
         regexp_extract(col("displayPrice"), r"(\d+)", 1).cast("double") / 4)
    # If no clear unit is present, assume it's a weekly price and extract the numeric part
    .otherwise( 
         regexp_extract(col("displayPrice"), r"(\d+)", 1).cast("double"))
)

# Display the cleaned price column
df_selected.select("displayPrice", "price_per_week").show(truncate=False)

# Count the number of rows
df_selected.count()

+-------------------+--------------+
|displayPrice       |price_per_week|
+-------------------+--------------+
|$1600pw            |1600.0        |
|$780.00            |780.0         |
|$585 per week      |585.0         |
|$475 pw            |475.0         |
|$590 Per Week      |590.0         |
|$480 Per Week      |480.0         |
|$460               |460.0         |
|$520 pw / $2260 pcm|520.0         |
|$320.00            |320.0         |
|$575 per week      |575.0         |
|$620 per week      |620.0         |
|$500 per week      |500.0         |
|$670 weekly        |670.0         |
|$470.00 per week   |470.0         |
|$700 per week      |700.0         |
|$370 per week      |370.0         |
|$600.00            |600.0         |
|$650               |650.0         |
|$360               |360.0         |
|$600               |600.0         |
+-------------------+--------------+
only showing top 20 rows



3277

In [96]:
# Delete displayPrice Column
df_selected = df_selected.drop("displayPrice")

df_selected.show(truncate=False)

df_selected.count()

+----------------+--------+------------------+-----------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------------+------+-----------+--------------+
|suburb          |postcode|latitude          |longitude  |isNewDevelopment|bathrooms|bedrooms|carspaces|Available_year|Available_month|Listed_year|Listed_month|Updated_year|Updated_month|propertyTypes      |status|channel    |price_per_week|
+----------------+--------+------------------+-----------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------------+------+-----------+--------------+
|Mount Martha    |3934    |-38.268818        |145.014034 |false           |3        |3       |2        |2024          |8              |2024       |8           |2024        |8            |[house]            |live  |residential|1600.0        |
|Bentleigh East  |3165    |-37.9

3277

In [97]:
from pyspark.sql.functions import col, sum

# Check the number of null values for each column
null_counts = df_selected.select([col(c).isNull().alias(c) for c in df_selected.columns]).agg(
    *[sum(col(c).cast("int")).alias(c) for c in df_selected.columns]
)

# Display the count of null values in each column
null_counts.show()

+------+--------+--------+---------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------+------+-------+--------------+
|suburb|postcode|latitude|longitude|isNewDevelopment|bathrooms|bedrooms|carspaces|Available_year|Available_month|Listed_year|Listed_month|Updated_year|Updated_month|propertyTypes|status|channel|price_per_week|
+------+--------+--------+---------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------+------+-------+--------------+
|     0|       0|       9|        9|               0|        0|       0|        0|            22|             22|          0|           0|          13|           13|            0|     0|      0|            88|
+------+--------+--------+---------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+---------

In [98]:
# Remove rows where latitude, longitude, or price_per_week are null
df_filtered = df_selected.filter(
    col("geoLocation.latitude").isNotNull() & 
    col("geoLocation.longitude").isNotNull() & 
    col("Available_year").isNotNull() &
    col("Available_month").isNotNull() &
    col("Updated_year").isNotNull() &
    col("Updated_month").isNotNull() &
    col("price_per_week").isNotNull()
)

# Display the processed DataFrame
df_filtered.show(truncate=False)

# Count the number of rows
df_filtered.count()

+----------------+--------+------------------+-----------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------------+------+-----------+--------------+
|suburb          |postcode|latitude          |longitude  |isNewDevelopment|bathrooms|bedrooms|carspaces|Available_year|Available_month|Listed_year|Listed_month|Updated_year|Updated_month|propertyTypes      |status|channel    |price_per_week|
+----------------+--------+------------------+-----------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------------+------+-----------+--------------+
|Mount Martha    |3934    |-38.268818        |145.014034 |false           |3        |3       |2        |2024          |8              |2024       |8           |2024        |8            |[house]            |live  |residential|1600.0        |
|Bentleigh East  |3165    |-37.9

3150

In [99]:
from pyspark.sql.functions import col, sum

# Check the number of null values for each column
null_counts = df_filtered.select([col(c).isNull().alias(c) for c in df_selected.columns]).agg(
    *[sum(col(c).cast("int")).alias(c) for c in df_selected.columns]
)
null_counts.show()

+------+--------+--------+---------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------+------+-------+--------------+
|suburb|postcode|latitude|longitude|isNewDevelopment|bathrooms|bedrooms|carspaces|Available_year|Available_month|Listed_year|Listed_month|Updated_year|Updated_month|propertyTypes|status|channel|price_per_week|
+------+--------+--------+---------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+-------------+-------------+------+-------+--------------+
|     0|       0|       0|        0|               0|        0|       0|        0|             0|              0|          0|           0|           0|            0|            0|     0|      0|             0|
+------+--------+--------+---------+----------------+---------+--------+---------+--------------+---------------+-----------+------------+------------+---------

In [101]:
# View descriptive statistics for all columns
df_selected.describe().show()

+-------+----------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+-------------------+------------------+------+-----------+-----------------+
|summary|    suburb|          postcode|           latitude|         longitude|         bathrooms|          bedrooms|         carspaces|    Available_year|   Available_month|        Listed_year|     Listed_month|       Updated_year|     Updated_month|status|    channel|   price_per_week|
+-------+----------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+-------------------+------------------+------+-----------+-----------------+
|  count|      3277|              3277|               3268|              3268|              3277|              3277|              3277| 

In [102]:
# View descriptive statistics for the price_per_week column
df_selected.select("price_per_week").describe().show()

+-------+-----------------+
|summary|   price_per_week|
+-------+-----------------+
|  count|             3189|
|   mean|567.2202884916902|
| stddev|272.7248956836144|
|    min|              1.0|
|    max|           2999.0|
+-------+-----------------+



In [103]:
# Filter the price_per_week column to remove unreasonable values
df_filtered = df_filtered.filter((col("price_per_week") >= 200) & (col("price_per_week") <= 1800))

# View the descriptive statistics after filtering
df_filtered.select("price_per_week").describe().show()

# Count the number of rows after filtering
df_filtered.count()


+-------+-----------------+
|summary|   price_per_week|
+-------+-----------------+
|  count|             2913|
|   mean|599.5291795399932|
| stddev|192.2183749833224|
|    min|            200.0|
|    max|           1800.0|
+-------+-----------------+



2913