In [1]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType, IntegerType
from pyspark.sql.functions import col, regexp_extract, expr, lower, split, explode, trim, col, regexp_replace, initcap, avg, when, count, udf

# Initialize SparkSession
spark = SparkSession.builder.appName("Preprocess").getOrCreate()

# Suppose your JSON data is stored in a file named 'data.json'
# Ensure that your JSON data is properly formatted and valid
with open('../data/raw/property_metadata.json', 'r') as file:
    data = json.load(file)

# Transform the data into a list of dictionaries
rows = []
for url, details in data.items():
    row = {'url': url}
    row.update(details)
    # Convert 'latitude' and 'longitude' to float or None if not found or 'Not found'
    row['latitude'] = (
        float(row['latitude']) if 'latitude' in row and row['latitude'] != 'Not found' else None
    )
    row['longitude'] = (
        float(row['longitude']) if 'longitude' in row and row['longitude'] != 'Not found' else None
    )
    
    rows.append(row)

# Define the schema with columns in the desired order, including latitude and longitude
schema = StructType([
    StructField('url', StringType(), True),
    StructField('name', StringType(), True),
    StructField('suburb', StringType(), True),
    StructField('cost_text', StringType(), True),
    StructField('rooms', ArrayType(StringType()), True),
    StructField('parking', ArrayType(StringType()), True),
    StructField('desc', StringType(), True),
    StructField('latitude', DoubleType(), True),
    StructField('longitude', DoubleType(), True),
])

# Create the DataFrame with the defined schema
df = spark.createDataFrame(rows, schema=schema)

# Show the DataFrame
df.show(truncate=False)

24/09/23 23:52:34 WARN Utils: Your hostname, Zacs-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.17 instead (on interface en0)
24/09/23 23:52:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/23 23:52:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------------------------------------------------------------------------------------+---------------------------------------------------+------------+-------------------------+-----------------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------+
|url                 

24/09/25 21:15:45 WARN TaskSetManager: Stage 133 contains a task of very large size (2308 KiB). The maximum recommended task size is 1000 KiB.


In [2]:
# Extract 'beds'
df = df.withColumn('beds_str', expr("filter(rooms, x -> x like '%Bed%')[0]"))
df = df.withColumn('beds', regexp_extract(col('beds_str'), '(\\d+)', 1).cast(IntegerType()))

# Extract 'baths'
df = df.withColumn('baths_str', expr("filter(rooms, x -> x like '%Bath%')[0]"))
df = df.withColumn('baths', regexp_extract(col('baths_str'), '(\\d+)', 1).cast(IntegerType()))

# Drop temporary columns
df = df.drop('beds_str', 'baths_str')

# Destructure the parking array to just a number
df = df.withColumn(
    'parking_space',
    when(col('parking').getItem(0) == '-', 0)  # Check for '-'
     .otherwise(regexp_extract(col('parking').getItem(0), r'(\d+)', 1).cast('int'))  # Extract the number
)

df = df.fillna({'parking_space': 0})

df = df.filter(~lower(col('desc')).contains('storage'))

# Filter out rows where both 'beds' and 'baths' are NULL
df = df.filter((col('beds').isNotNull()) | (col('baths').isNotNull()))

In [3]:
import folium
import pandas as pd

# Convert the PySpark DataFrame to Pandas
pandas_df = df.toPandas()

# Drop rows with NaN values in latitude or longitude
pandas_df = pandas_df.dropna(subset=['latitude', 'longitude'])

# Create a map centered around the mean latitude and longitude of the remaining points
map_center = [pandas_df['latitude'].mean(), pandas_df['longitude'].mean()]
my_map = folium.Map(location=map_center, zoom_start=50)

# Add markers for each property
for _, row in pandas_df.iterrows():
    folium.Marker(
        location=[row['latitude'], row['longitude']],
        popup=row['name'],
        icon=folium.Icon(color="blue", icon="info-sign")
    ).add_to(my_map)

# Display the map
# my_map

24/09/23 23:52:42 WARN TaskSetManager: Stage 1 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [4]:
# Define regex pattern to match weekly rent
weekly_rent_pattern = r"(?:\b\$?(?:\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{2})?\s*(?:/)?\s*?(?:(?i)(per week.*|pw.*|weekly.*|p\.w.*|wk.*|p\.w\..*))|^\$?\d{1,3}(?:,\d{3})*(?:\.\d{2})?)$"

# Use regexp_extract to extract only rows with weekly rent information
df_filtered = df.withColumn("cost", regexp_extract(col("cost_text"), weekly_rent_pattern, 0))

# Extract only the numeric part
df_final = df_filtered.withColumn("cost", regexp_extract(col("cost"), r".*?\$?(\d{1,3}(?:,\d{3})*(\.\d+)?)", 1))

# Cast to float
df_final = df_final.withColumn("cost", regexp_replace(col("cost"), ',', ''))

df_final = df_final.withColumn("cost", col("cost").cast('float'))

# Filter out rows where the cost is not empty (valid matches)
df_valid_rent = df_final.filter((col("cost").isNotNull()) & (col("cost") <= 15000))

print(df_valid_rent.filter(col("suburb") == "Dandenong South").show(truncate=False))

24/09/23 23:52:45 WARN TaskSetManager: Stage 3 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.
24/09/23 23:52:46 WARN TaskSetManager: Stage 4 contains a task of very large size (1598 KiB). The maximum recommended task size is 1000 KiB.


+----------------------------------------------------------------------------+------------------------------------------+---------------+------------------+-----------------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------+----+-----+-------------+-----+
|url        

In [5]:
df_valid_rent.select("suburb").distinct().show()

24/09/23 23:52:46 WARN TaskSetManager: Stage 5 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.


+--------------+
|        suburb|
+--------------+
|  Neerim South|
|  Ivanhoe East|
|         Lucas|
|        Eltham|
|   Templestowe|
|        Leneva|
|     Buninyong|
|      Cardigan|
|  Lake Gardens|
|       Rosanna|
|       Morwell|
|  Mirboo North|
|    Beechworth|
|      Beaufort|
|Ballarat North|
|       Macleod|
|     Newington|
|   Mount Helen|
| Cape Woolamai|
|  Sunset Strip|
+--------------+
only showing top 20 rows



In [6]:
# Load the CSV file into a DataFrame
mapping_sal = spark.read.csv('../data/raw/SAL_mapping.csv', header=True, inferSchema=True)

# Drop rows where 'SAL suburbs (gazetted localities)' or 'Rental suburbs' are null
mapping_sal = mapping_sal.na.drop(subset=['SAL suburbs (gazetted localities)', 'Rental suburbs'])

# Normalize delimiters in the 'SAL suburbs (gazetted localities)' column
mapping_sal = mapping_sal.withColumn(
    'SAL suburbs (gazetted localities)',
    regexp_replace(col('SAL suburbs (gazetted localities)'), r'[-–—]', ' - ')
)

# Function to capitalize and flatten the suburb variants
def flatten_suburbs(suburb_variants):
    return [suburb.title().strip() for suburb in suburb_variants.split(' - ')]

flatten_suburbs_udf = udf(flatten_suburbs, ArrayType(StringType()))

# Apply the UDF to create a new column
mapping_sal = mapping_sal.withColumn(
    'Standard_Suburb',
    flatten_suburbs_udf(col('SAL suburbs (gazetted localities)'))
)

# Explode the list into separate rows
mapping_sal = mapping_sal.withColumn('Standard_Suburb', explode(col('Standard_Suburb')))

# Keep only necessary columns (ensure column names match the CSV)
mapping_sal = mapping_sal.select('Rental suburbs', 'Standard_Suburb')

mapping_sal.show(90000, truncate=False)

# Inner join on Rental Suburbs and Standard_Suburb
joined_df = mapping_sal.join(df_valid_rent, 
                    mapping_sal['Standard_Suburb'] == df_valid_rent['suburb'],
                    how="inner")

joined_df = joined_df.drop('suburb')

joined_df.show(2, truncate=False)

print(joined_df.count())

                                                                                

+-------------------------------------+--------------------+
|Rental suburbs                       |Standard_Suburb     |
+-------------------------------------+--------------------+
|Albert Park-Middle Park-West St Kilda|Albert Park         |
|Albert Park-Middle Park-West St Kilda|Middle Park         |
|Albert Park-Middle Park-West St Kilda|St Kilda West       |
|Armadale                             |Armadale            |
|Carlton North                        |Carlton North       |
|Carlton North                        |Princes Hill        |
|Carlton-Parkville                    |Parkville           |
|Carlton-Parkville                    |Carlton             |
|CBD-St Kilda Rd                      |Melbourne Cbd       |
|Collingwood-Abbotsford               |Collingwood         |
|Collingwood-Abbotsford               |Abbotsford          |
|Docklands                            |Docklands           |
|East Melbourne                       |East Melbourne      |
|East St Kilda          

24/09/23 23:52:50 WARN TaskSetManager: Stage 14 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.


10726


In [7]:
aggregated_df = joined_df.groupBy("Standard_Suburb").agg(
    avg("beds").alias("average_beds"),
    avg("baths").alias("average_baths"),
    avg("parking_space").alias("average_parking"),
    avg("cost").alias("average_cost"),
    count("*").alias("counts")
)

# Show the result
aggregated_df.orderBy(aggregated_df['average_cost'].desc()).show(truncate=False)

24/09/23 23:52:52 WARN TaskSetManager: Stage 18 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------------+------------------+------------------+------------------+------------------+------+
|Standard_Suburb|average_beds      |average_baths     |average_parking   |average_cost      |counts|
+---------------+------------------+------------------+------------------+------------------+------+
|Echuca         |2.1666666666666665|1.3333333333333333|1.6666666666666667|108650.0          |6     |
|Warrnambool    |3.0               |1.368421052631579 |1.0526315789473684|38337.89473684211 |19    |
|Docklands      |1.6914285714285715|1.4057142857142857|0.7542857142857143|3276.325714285714 |175   |
|Boneo          |6.0               |5.0               |3.0               |2200.0            |1     |
|Bulla          |6.0               |2.0               |0.0               |1800.0            |1     |
|Pearcedale     |4.0               |2.0               |4.0               |1400.0            |1     |
|Footscray      |1.875968992248062 |1.372093023255814 |0.8372093023255814|1245.031007751938

In [8]:
print(aggregated_df.count())

24/09/23 23:52:53 WARN TaskSetManager: Stage 22 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.


434


In [9]:
aggregated_df.repartition(1).write.csv("../data/raw/2024_rental_suburbs.csv", header=True, mode="overwrite")

24/09/23 23:52:53 WARN TaskSetManager: Stage 29 contains a task of very large size (1485 KiB). The maximum recommended task size is 1000 KiB.
