# FIT5202 2025 S2 Assignment 1 : Analysing Australian Property Market Data

## Table of Contents
* [Part 1 : Working with RDDs](#part-1)  
    - [1.1 Data Preparation and Loading](#1.1)  
    - [1.2 Data Partitioning in RDD](#1.2)  
    - [1.3 Query/Analysis](#1.3)  
* [Part 2 : Working with DataFrames](#2-dataframes)  
    - [2.1 Data Preparation and Loading](#2-dataframes)  
    - [2.2 Query/Analysis](#2.2)  
* [Part 3 : RDDs vs DataFrame vs Spark SQL](#part-3)  

Note: Feel free to add Code/Markdown cells as you need.

# Part 1 : Working with RDDs (30%) <a class="anchor" name="part-1"></a>
## 1.1 Working with RDD
In this section, you will need to create RDDs from the given datasets, perform partitioning in these RDDs and use various RDD operations to answer the queries. 

1.1.1 Data Preparation and Loading <a class="anchor" name="1.1"></a>
1.	Write the code to create a SparkContext object using SparkSession. To create a SparkSession, you first need to build a SparkConf object that contains information about your application. Use Melbourne time as the session timezone. Give your application an appropriate name and run Spark locally with 4 cores on your machine.

In [4]:
# ==== Part 1.1(1): SparkConf + SparkSession (Melbourne timezone, local[4]) ====
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext # Spark

master = "local[4]"  # run locally with 4 cores, as the task requires
app_name = "FIT5202-A1"
spark_conf = SparkConf().setMaster(master).setAppName(app_name).set("spark.sql.session.timeZone", "Australia/Melbourne")
    
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext

print(spark)
print("Timezone:", spark.conf.get("spark.sql.session.timeZone"))

ConnectionRefusedError: [Errno 111] Connection refused

1.1.2 Load the CSV and JSON files into multiple RDDs. 

In [3]:
# Final lightweight & robust replacement for Part 1.1.(2)(3)(4)

import json
from pyspark import StorageLevel
from pyspark.sql import functions as F, types as T

csv_path = 'dataset/nsw_property_price.csv'
council_p = "dataset/council.json"
purpose_p = "dataset/property_purpose.json"
zoning_p  = "dataset/zoning.json"

# ---------- Spark tuning (safe defaults for big-ish CSV) ----------
spark.conf.set("spark.sql.shuffle.partitions", "200")  # adjust to your cluster size
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # 128MB splits

# ---------- Helpers ----------
def to_dict_str(row, cols):
    d = {}
    for c in cols:
        v = row[c]  # dicts and Rows both support lookup by column name
        d[c] = (None if v is None else str(v))
    return d

def print_first_n_as_dicts(df, n, title):
    print(f"\n{title}")
    cols = df.columns
    # .limit(n).collect() is fine here; n is small
    for r in df.limit(n).collect():
        print(to_dict_str(r.asDict(recursive=True), cols))

def read_lookup_records(path):
    """Flatten the odd JSON shapes into a list[dict]."""
    with open(path, "r", encoding="utf-8") as f:
        obj = json.load(f)

    # 1) [ { "<header>": [ {..}, {..} ] } ]
    if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], dict) and len(obj[0]) == 1:
        only_val = next(iter(obj[0].values()))
        if isinstance(only_val, list) and all(isinstance(x, dict) for x in only_val):
            return only_val

    # 2) [ {..}, {..} ]
    if isinstance(obj, list) and all(isinstance(x, dict) for x in obj):
        return obj

    # 3) { "<something>": [ {..}, {..} ] }  OR  {..single record..}
    if isinstance(obj, dict):
        for v in obj.values():
            if isinstance(v, list) and all(isinstance(x, dict) for x in v):
                return v
        return [obj]

    return []

def bad_id(colname):
    # invalid if null OR empty string after trim OR literal "0" after trim
    col_s = F.trim(F.col(colname).cast("string"))
    return col_s.isNull() | (col_s == "") | (col_s == "0")

def safe_count(df):
    """Try df.count(); if the JVM grumbles, do a memory-thrifty RDD scan."""
    try:
        return df.count()
    except Exception:
        # Fallback: count via RDD mapPartitions (avoids some Catalyst overhead paths)
        return df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).sum()

# ---------- Read CSV (robust, typed-but-forgiving) ----------
schema = T.StructType([
    T.StructField("property_id",        T.StringType(), True),
    T.StructField("purchase_price",     T.StringType(), True),  # keep as string to preserve exact text/scale
    T.StructField("address",            T.StringType(), True),
    T.StructField("post_code",          T.StringType(), True),  # keep as string to preserve leading zeros
    T.StructField("property_type",      T.StringType(), True),
    T.StructField("strata_lot_number",  T.StringType(), True),
    T.StructField("property_name",      T.StringType(), True),
    T.StructField("area",               T.StringType(), True),  # keep raw text to match your printed examples
    T.StructField("area_type",          T.StringType(), True),
    T.StructField("iso_contract_date",  T.StringType(), True),  # parse later if needed
    T.StructField("iso_settlement_date",T.StringType(), True),
    T.StructField("nature_of_property", T.StringType(), True),
    T.StructField("legal_description",  T.StringType(), True),
    T.StructField("id",                 T.StringType(), True),
    T.StructField("council_id",         T.StringType(), True),
    T.StructField("purpose_id",         T.StringType(), True),
    T.StructField("zone_id",            T.StringType(), True),
])

df = (
    spark.read
    .option("header", True)
    .option("multiLine", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("mode", "PERMISSIVE")
    .schema(schema)
    .csv(csv_path)
).cache()  # single cache we reuse a couple of times

row_count_raw = safe_count(df)
print("CSV rows (no header):", row_count_raw)
print_first_n_as_dicts(df, 8, "First 8 records (no cleaning):")

# ---------- Read & flatten JSON lookups ----------
council_recs = read_lookup_records(council_p)
purpose_recs = read_lookup_records(purpose_p)
zoning_recs  = read_lookup_records(zoning_p)

council_df = (spark.createDataFrame(council_recs)
              .select(F.col("council_id").cast("string").alias("council_id"),
                      F.col("council_name").cast("string").alias("council_name"))
              .dropDuplicates(["council_id"]))

purpose_df = (spark.createDataFrame(purpose_recs)
              .select(F.col("purpose_id").cast("string").alias("purpose_id"),
                      F.col("primary_purpose").cast("string").alias("primary_purpose"))
              .dropDuplicates(["purpose_id"]))

zoning_df = (spark.createDataFrame(zoning_recs)
             .select(F.col("zoning_id").cast("string").alias("zone_id"),
                     F.col("zoning").cast("string").alias("zoning"))
             .dropDuplicates(["zone_id"]))

print("\nCouncil records:", safe_count(council_df))
print("Purpose records:", safe_count(purpose_df))
print("Zoning records:",  safe_count(zoning_df))

print_first_n_as_dicts(council_df, 8, "First 8 council:")
print_first_n_as_dicts(purpose_df, 8, "First 8 purpose:")
print_first_n_as_dicts(zoning_df,  8, "First 8 zoning:")

# ---------- Clean invalid rows (null/empty/'0' purpose_id or council_id) ----------
# Build the predicate once to avoid Planner rework
valid_pred = ~(bad_id("purpose_id") | bad_id("council_id"))

clean_df = df.filter(valid_pred).cache()

clean_count = safe_count(clean_df)
print("\nCSV rows (after cleaning):", clean_count)
print_first_n_as_dicts(clean_df, 8, "First 8 cleaned records:")

# Free memory pressure early
df.unpersist()
clean_df.unpersist()
# (lookup DFs are small; no need to persist/unpersist them)


ConnectionRefusedError: [Errno 111] Connection refused

In [9]:
import json, os, csv
from pyspark import StorageLevel

csv_path = 'dataset/nsw_property_price.csv'
council_p = "dataset/council.json"
purpose_p = "dataset/property_purpose.json"
zoning_p  = "dataset/zoning.json"

# --- CSV -> RDD (headerless) ---
raw_csv_rdd = sc.textFile(csv_path)
header_line = raw_csv_rdd.first()
header_cols = next(csv.reader([header_line]))
name_to_idx = {c: i for i, c in enumerate(header_cols)}

def parse_csv_part(lines):
    reader = csv.reader(lines)
    for row in reader:
        yield row

csv_rows_no_header = raw_csv_rdd.filter(lambda x: x != header_line).mapPartitions(parse_csv_part)

def row_to_dict(row):
    try:
        return {c: (row[i] if i < len(row) else None) for c, i in name_to_idx.items()}
    except Exception:
        # Return None for a row that cannot be mapped; it is filtered out below
        return None

csv_dict_rdd = csv_rows_no_header.map(row_to_dict).filter(lambda x: x is not None)

# Part 1.1(2)(3): show count + first 8 *after header removal* (before cleaning)
print("CSV rows (no header):", csv_dict_rdd.count())
print("\nFirst 8 records (no cleaning):")
for r in csv_dict_rdd.take(8):
    print(r)

# --- JSON -> RDDs (validate shapes) ---
def load_json_array(path, expect_key=None):
    with open(path, "r", encoding="utf-8") as f:
        obj = json.load(f)
    if isinstance(obj, dict) and expect_key:
        data = obj.get(expect_key, [])
    elif isinstance(obj, list):
        data = obj
    else:
        # fallback: if single dict wrap as list
        data = [obj]
    return sc.parallelize(data)

council_rdd = load_json_array(council_p, expect_key=None).persist()
purpose_rdd = load_json_array(purpose_p, expect_key=None).persist()
zoning_rdd  = load_json_array(zoning_p,  expect_key=None).persist()

print("\nCouncil records:", council_rdd.count())
print("Purpose records:", purpose_rdd.count())
print("Zoning records:",  zoning_rdd.count())

print("\nFirst 8 council:", council_rdd.take(8))
print("\nFirst 8 purpose:", purpose_rdd.take(8))
print("\nFirst 8 zoning:",  zoning_rdd.take(8))

# ==== Part 1.1(4): drop invalid rows (null/empty/'0' purpose_id or council_id), then display ====
def is_bad(v):
    return (v is None) or (str(v).strip() == "") or (str(v).strip() == "0")

def valid_row(d):
    return not (is_bad(d.get("purpose_id")) or is_bad(d.get("council_id")))

clean_csv_rdd = csv_dict_rdd.filter(valid_row).persist(StorageLevel.MEMORY_ONLY)

print("\nCSV rows (after cleaning):", clean_csv_rdd.count())
print("\nFirst 8 cleaned records:")
for r in clean_csv_rdd.take(8):
    print(r)

CSV rows (no header): 4854814

First 8 records (no cleaning):
{'property_id': '4270509', 'purchase_price': '1400000.00', 'address': '8 C NYARI RD, KENTHURST', 'post_code': '2156', 'property_type': 'house', 'strata_lot_number': '', 'property_name': '', 'area': '2.044', 'area_type': 'H', 'iso_contract_date': '2023-12-14', 'iso_settlement_date': '2024-02-14', 'nature_of_property': 'V', 'legal_description': '2/1229857', 'id': '142', 'council_id': '200', 'purpose_id': '9922', 'zone_id': '53'}
{'property_id': '4329326', 'purchase_price': '1105000.00', 'address': '82 CAMARERO ST, BOX HILL', 'post_code': '2765', 'property_type': 'house', 'strata_lot_number': '', 'property_name': '', 'area': '300.2', 'area_type': 'M', 'iso_contract_date': '2024-01-12', 'iso_settlement_date': '2024-02-09', 'nature_of_property': 'R', 'legal_description': '1119/1256791', 'id': '143', 'council_id': '200', 'purpose_id': '7071', 'zone_id': '41'}
{'property_id': '1864112', 'purchase_price': '55000.00', 'address': '321


First 8 zoning: [{'zoning_id, zoning\n': [{'zoning_id': 1, 'zoning': ''}, {'zoning_id': 2, 'zoning': 'A'}, {'zoning_id': 3, 'zoning': 'AGB'}, {'zoning_id': 4, 'zoning': 'B'}, {'zoning_id': 5, 'zoning': 'B1'}, {'zoning_id': 6, 'zoning': 'B2'}, {'zoning_id': 7, 'zoning': 'B3'}, {'zoning_id': 8, 'zoning': 'B4'}, {'zoning_id': 9, 'zoning': 'B5'}, {'zoning_id': 10, 'zoning': 'B6'}, {'zoning_id': 11, 'zoning': 'B7'}, {'zoning_id': 12, 'zoning': 'B8'}, {'zoning_id': 13, 'zoning': 'C'}, {'zoning_id': 14, 'zoning': 'C1'}, {'zoning_id': 15, 'zoning': 'C2'}, {'zoning_id': 16, 'zoning': 'C3'}, {'zoning_id': 17, 'zoning': 'C4'}, {'zoning_id': 18, 'zoning': 'D'}, {'zoning_id': 19, 'zoning': 'E'}, {'zoning_id': 20, 'zoning': 'E1'}, {'zoning_id': 21, 'zoning': 'E2'}, {'zoning_id': 22, 'zoning': 'E3'}, {'zoning_id': 23, 'zoning': 'E4'}, {'zoning_id': 24, 'zoning': 'E5'}, {'zoning_id': 25, 'zoning': 'EM'}, {'zoning_id': 26, 'zoning': 'ENT'}, {'zoning_id': 27, 'zoning': 'ENZ'}, {'zoning_id': 28, 'zoning

1.1.3 For each RDD, remove the header rows and display the total count and the first 8 records.
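
For the JSON lookups, the printed zoning output above suggests that each file wraps its records in a single-key header object (for example `'zoning_id, zoning\n'`), so the RDD holds one wrapper record instead of the individual rows. The following is a minimal sketch of removing that wrapper before parallelising, assuming the shape `[ {"<header>": [ {...}, ... ] } ]`; the helper name `strip_json_header` is hypothetical.

```python
import json

def strip_json_header(obj):
    """Return the list of records, dropping a single-key header wrapper if present."""
    # Shape assumed from the printed output: [ {"<header>": [ {...}, {...} ] } ]
    if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], dict):
        values = list(obj[0].values())
        if len(values) == 1 and isinstance(values[0], list):
            return values[0]
    # Already a flat list of records: nothing to remove
    if isinstance(obj, list):
        return obj
    # A single record: wrap it so callers always get a list
    return [obj]

# Tiny inline sample in the assumed shape (not the real file)
sample = json.loads(
    '[{"zoning_id, zoning\\n": [{"zoning_id": 1, "zoning": ""}, {"zoning_id": 2, "zoning": "A"}]}]'
)
records = strip_json_header(sample)
print(len(records), records[:2])
```

The resulting list could then be passed to `sc.parallelize(records)` so that `count()` and `take(8)` operate on individual lookup rows.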


1.1.4 Drop records with invalid information: purpose_id or council_id is null, empty, or 0.

### 1.2 Data Partitioning in RDD <a class="anchor" name="1.2"></a>
1.2.1 For each RDD, using Spark’s default partitioning, print out the total number of partitions and the number of records in each partition
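
One way to approach this is a small function for `mapPartitionsWithIndex` that emits a single `(partition_index, record_count)` pair per partition. The sketch below defines that function and exercises it on three made-up in-memory "partitions" so it runs without a Spark session; the name `count_partition` is hypothetical.

```python
def count_partition(index, iterator):
    """Yield one (partition_index, record_count) pair for a partition."""
    yield index, sum(1 for _ in iterator)

# With Spark, the same function could be used as:
#   rdd.getNumPartitions()
#   rdd.mapPartitionsWithIndex(count_partition).collect()

# Local stand-in: three made-up "partitions" to show the output shape
fake_partitions = [["a", "b", "c"], ["d"], []]
counts = [pair
          for i, part in enumerate(fake_partitions)
          for pair in count_partition(i, iter(part))]
print("partitions:", len(fake_partitions))
print(counts)
```

Counting inside each partition avoids pulling the records themselves back to the driver, which matters for the large CSV RDD.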

1.2.2 Answer the following questions:   
a) How many partitions do the above RDDs have?  
b) How is the data in these RDDs partitioned by default when no partitioning strategy is specified explicitly? Can you explain why it is split into this number of partitions?   
c) Assuming we are querying the dataset based on <strong>Property Price</strong>, can you think of a better strategy for partitioning the data based on your available hardware resources?  
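
As one possible direction for (c), a range-style partition function maps each price to a bucket, so that queries on a price range touch fewer partitions. The sketch below assumes four partitions (one per local core) and illustrative boundaries that are not derived from the dataset; `PRICE_BOUNDARIES` and `price_partition` are hypothetical names.

```python
from bisect import bisect_right

# Hypothetical boundaries giving 4 ranges, one per local core
PRICE_BOUNDARIES = [500_000, 1_000_000, 2_000_000]

def price_partition(price):
    """Map a price to a partition index in 0..len(PRICE_BOUNDARIES)."""
    try:
        value = float(price)
    except (TypeError, ValueError):
        return 0  # unparseable prices fall into the first partition
    return bisect_right(PRICE_BOUNDARIES, value)

# With Spark, a key-value RDD keyed by price could use:
#   pair_rdd.partitionBy(len(PRICE_BOUNDARIES) + 1, price_partition)

for p in ["55000.00", "1105000.00", "1400000.00", "3500000", None]:
    print(p, "->", price_partition(p))
```

In practice the boundaries would need to follow the actual price distribution (for example, approximate quantiles), otherwise a few partitions may hold most of the records.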

Your answer for a

Your answer for b

Your answer for c

1.2.3 Create a user-defined function (UDF) to transform the date strings from ISO format (YYYY-MM-DD) (e.g. 2025-01-01) to Australian format (DD/Mon/YYYY) (e.g. 01/Jan/2025), then call the UDF to transform two date columns (iso_contract_date and iso_settlement_date) to contract_date and settlement_date.
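
A minimal sketch of such a conversion function follows. It uses an explicit month table rather than a locale-dependent format code, and returns `None` for missing or malformed dates; the names `iso_to_au` and `add_au_dates` are hypothetical, and the records are assumed to be dicts keyed by column name.

```python
from datetime import datetime

MONTH_ABBR = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

def iso_to_au(date_str):
    """Convert 'YYYY-MM-DD' to 'DD/Mon/YYYY'; return None if it cannot be parsed."""
    try:
        d = datetime.strptime(date_str.strip(), "%Y-%m-%d")
    except (AttributeError, ValueError):
        # AttributeError covers None, ValueError covers empty or malformed text
        return None
    return f"{d.day:02d}/{MONTH_ABBR[d.month - 1]}/{d.year:04d}"

def add_au_dates(record):
    """Return a copy of a record dict with contract_date and settlement_date added."""
    out = dict(record)
    out["contract_date"] = iso_to_au(record.get("iso_contract_date"))
    out["settlement_date"] = iso_to_au(record.get("iso_settlement_date"))
    return out

# With an RDD of dicts this could be applied as: rdd.map(add_au_dates)
print(iso_to_au("2025-01-01"))
print(add_au_dates({"iso_contract_date": "2023-12-14", "iso_settlement_date": "2024-02-14"}))
```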

### 1.3 Query/Analysis <a class="anchor" name="1.3"></a>
For this part, write relevant RDD operations to answer the following queries.

1.3.1 Extract the Month (Jan-Dec) information and print the total number of sales by contract date for each Month. (5%)
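
The query follows the usual map then reduce-by-key pattern: map each record to `(month, 1)` and sum per key. The sketch below shows the mapping function and uses a local `Counter` as a stand-in for `reduceByKey`; `month_key` is a hypothetical name, and the sample holds only the two contract dates visible in the output above plus one empty value.

```python
from collections import Counter

MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

def month_key(record):
    """Return (month_abbr, 1) for a record, or None when the contract date is unusable."""
    parts = (record.get("iso_contract_date") or "").split("-")
    if len(parts) != 3 or not parts[1].isdigit() or not 1 <= int(parts[1]) <= 12:
        return None
    return MONTHS[int(parts[1]) - 1], 1

# With Spark:
#   rdd.map(month_key).filter(lambda kv: kv is not None) \
#      .reduceByKey(lambda a, b: a + b)

sample = [{"iso_contract_date": "2023-12-14"},
          {"iso_contract_date": "2024-01-12"},
          {"iso_contract_date": ""}]
counts = Counter()
for kv in map(month_key, sample):
    if kv is not None:
        counts[kv[0]] += kv[1]
for m in MONTHS:
    print(m, counts.get(m, 0))
```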

1.3.2 Which 5 councils have the largest number of houses? Show their names and the total number of houses. (Note: A house may appear multiple times if it has been sold more than once; count each house only once.) (5%)
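
The key step is deduplicating before counting. The sketch below illustrates the logic in plain Python under two assumptions: `property_id` identifies a house across resales, and houses are marked with `property_type == 'house'` (as in the sample records). The function name, the sample rows, and the council names are made up for illustration.

```python
from collections import Counter

def top_councils(sales, council_names, n=5):
    """Count distinct houses per council and return the n largest as (name, count)."""
    # Distinct (council_id, property_id) pairs, so resales are counted once
    houses = {(s["council_id"], s["property_id"])
              for s in sales if s.get("property_type") == "house"}
    per_council = Counter(cid for cid, _ in houses)
    ranked = sorted(per_council.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
    return [(council_names.get(cid, cid), count) for cid, count in ranked]

# Rough RDD equivalent:
#   rdd.filter(is_house).map(lambda d: (d["council_id"], d["property_id"]))
#      .distinct().map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)
#      .takeOrdered(5, key=lambda kv: -kv[1])

sales = [
    {"property_id": "1", "council_id": "200", "property_type": "house"},
    {"property_id": "1", "council_id": "200", "property_type": "house"},  # resale
    {"property_id": "2", "council_id": "200", "property_type": "house"},
    {"property_id": "3", "council_id": "201", "property_type": "house"},
    {"property_id": "4", "council_id": "201", "property_type": "unit"},
]
names = {"200": "Council A", "201": "Council B"}  # made-up names
print(top_councils(sales, names))
```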

## Part 2. Working with DataFrames (45%) <a class="anchor" name="2-dataframes"></a>
In this section, you need to load the given datasets into PySpark DataFrames and use DataFrame functions to answer the queries.
### 2.1 Data Preparation and Loading

2.1.1. Load the CSV/JSON files into separate dataframes. When you create your dataframes, please refer to the metadata file and think about the appropriate data type for each column.

2.1.2 Display the schema of the dataframes.

When the dataset is large, do you need all columns? How can you optimize memory usage? Do you need a customized data partitioning strategy? (Note: Think about these questions, but you don’t need to answer them.)

### 2.2 Query/Analysis <a class="anchor" name="2.2"></a>
Implement the following queries using dataframes. You need to be able to perform operations like transforming, filtering, sorting, joining and group by using the functions provided by the DataFrame API. For each task, display the first 5 results where no output is specified.

2.2.1. The area column uses three unit types (H, A and M): 1 H is one hectare = 10000 sqm, 1 A is one acre = 4000 sqm, and 1 M is one sqm. Unify the unit to sqm and create a new column called area_sqm. 
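
The conversion is a lookup of a multiplier by unit. The sketch below uses the factors stated in the task (including the rounded 4000 sqm per acre) and returns `None` when the area or unit is missing or unrecognised; `to_sqm` is a hypothetical name, and treating unknown units as `None` is an assumption.

```python
SQM_PER_UNIT = {"H": 10000.0, "A": 4000.0, "M": 1.0}  # factors as given in the task

def to_sqm(area, area_type):
    """Convert an area to square metres, or return None if it cannot be converted."""
    try:
        value = float(area)
    except (TypeError, ValueError):
        return None
    factor = SQM_PER_UNIT.get((area_type or "").strip().upper())
    return None if factor is None else value * factor

# DataFrame equivalent (sketch), with F = pyspark.sql.functions:
#   area = F.col("area").cast("double")
#   df.withColumn("area_sqm",
#       F.when(F.col("area_type") == "H", area * 10000)
#        .when(F.col("area_type") == "A", area * 4000)
#        .when(F.col("area_type") == "M", area))

print(to_sqm("300.2", "M"))
print(to_sqm("1", "A"))
print(to_sqm("", "M"))
```

In the DataFrame version, rows that match no `when` branch come out as null, which mirrors the `None` returned here.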

2.2.2. <pre>The top five property types are: Residence, Vacant Land, Commercial, Farm and Industrial.
However, for historical reasons, they may appear as different strings in the database. Please update the primary_purpose with the following rules:
a)	Any purpose that has “HOME”, “HOUSE”, “UNIT” is classified as “Residence”;
b)	“Warehouse”, “Factory”,  “INDUST” should be changed to “Industrial”;
c)	Anything that contains “FARM” (e.g. FARMING) should be changed to “Farm”;
d)	“Vacant”, “Land” should be “Vacant Land”;
e)	Anything that has “COMM”, “Retail”, “Shop” or “Office” is “Commercial”.
f)	All remaining properties, including null and empty purposes, are classified as “Others”.
Show the count of each type in a table.
(note: Some properties are multi-purpose, e.g. “House & Farm”, it’s fine to count them multiple times.)
</pre>
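
The rules can be expressed as an ordered table of labels and keywords, matched case-insensitively as substrings. The sketch below returns every matching label so that multi-purpose properties are counted once per category; the labels follow the list of five types in the first sentence, and the names `RULES` and `classify_purpose` are hypothetical.

```python
RULES = [
    ("Residence",   ("HOME", "HOUSE", "UNIT")),
    ("Industrial",  ("WAREHOUSE", "FACTORY", "INDUST")),
    ("Farm",        ("FARM",)),
    ("Vacant Land", ("VACANT", "LAND")),
    ("Commercial",  ("COMM", "RETAIL", "SHOP", "OFFICE")),
]

def classify_purpose(primary_purpose):
    """Return all matching category labels, or ['Others'] when nothing matches."""
    text = (primary_purpose or "").upper()
    matched = [label for label, keywords in RULES
               if any(k in text for k in keywords)]
    return matched or ["Others"]

for p in ["House & Farm", "VACANT LAND", "Warehouse", "", None]:
    print(repr(p), "->", classify_purpose(p))
```

Note that plain substring matching makes "Warehouse" match "HOUSE" as well as "WAREHOUSE", so it lands in both Residence and Industrial; whether that is acceptable depends on how the rules are meant to be read. With DataFrames, the function could be wrapped with `F.udf` returning an `ArrayType(StringType())` column and expanded with `F.explode` before grouping.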

2.2.3 Find the top 20 properties that made the largest value gain, and show their address, suburb, and value increase. To have a value gain, a property must have been sold multiple times; the “value increase” is the last sold price minus the first sold price, regardless of the transactions in between. Print all 20 records.
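
The logic can be sketched in plain Python before translating it to DataFrame operations. The sketch assumes that `property_id` identifies the same property across sales, that the contract date orders the sales, and that the suburb is the text after the last comma in the address (as in `'8 C NYARI RD, KENTHURST'`); the function names and the sample rows are made up.

```python
def suburb_of(address):
    """Assumed convention: the suburb is the text after the last comma."""
    return address.rsplit(",", 1)[-1].strip()

def top_value_gains(sales, top_n=20):
    """Return (address, suburb, gain) for properties sold more than once."""
    by_property = {}
    for s in sales:
        by_property.setdefault(s["property_id"], []).append(s)
    gains = []
    for rows in by_property.values():
        if len(rows) < 2:
            continue  # sold only once: no gain can be computed
        # ISO date strings sort chronologically
        ordered = sorted(rows, key=lambda r: r["iso_contract_date"])
        first, last = ordered[0], ordered[-1]
        gain = float(last["purchase_price"]) - float(first["purchase_price"])
        gains.append((last["address"], suburb_of(last["address"]), gain))
    return sorted(gains, key=lambda g: g[2], reverse=True)[:top_n]

sales = [  # made-up rows
    {"property_id": "A", "address": "1 EXAMPLE ST, SUBURBIA",
     "iso_contract_date": "2015-01-01", "purchase_price": "450000"},
    {"property_id": "A", "address": "1 EXAMPLE ST, SUBURBIA",
     "iso_contract_date": "2010-05-01", "purchase_price": "300000"},
    {"property_id": "A", "address": "1 EXAMPLE ST, SUBURBIA",
     "iso_contract_date": "2022-03-10", "purchase_price": "900000"},
    {"property_id": "B", "address": "2 SAMPLE RD, TESTVILLE",
     "iso_contract_date": "2018-07-07", "purchase_price": "500000"},
]
print(top_value_gains(sales))
```

In the DataFrame API, the same idea could be expressed with a window partitioned by `property_id` and ordered by contract date, or with a group-by that picks the prices at the minimum and maximum dates.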

2.2.4 For each season, plot the median house price trend over the years. Seasons in Australia are defined as: (Spring: Sep-Nov, Summer: Dec-Feb, Autumn: Mar-May, Winter: Jun-Aug). 
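
A sketch of the grouping step follows: map each month to its season, group prices by `(year, season)`, and take the median. It groups by calendar year, so December lands in a different year from the following January and February; whether a summer should instead be kept together is a modelling choice the task leaves open. The names are hypothetical and the third sample row is made up.

```python
from collections import defaultdict
from statistics import median

SEASONS = {12: "Summer", 1: "Summer", 2: "Summer",
           3: "Autumn", 4: "Autumn", 5: "Autumn",
           6: "Winter", 7: "Winter", 8: "Winter",
           9: "Spring", 10: "Spring", 11: "Spring"}

def season_of(month):
    """Map a month number (1-12) to its Australian season."""
    return SEASONS[month]

def median_by_year_season(sales):
    """sales: iterable of (iso_date, price) pairs -> {(year, season): median price}."""
    groups = defaultdict(list)
    for iso_date, price in sales:
        year, month, _ = iso_date.split("-")
        groups[(int(year), season_of(int(month)))].append(float(price))
    return {key: median(prices) for key, prices in sorted(groups.items())}

sample = [("2023-12-14", "1400000.00"),
          ("2024-01-12", "1105000.00"),
          ("2024-02-01", "900000.00")]  # last row is made up
print(median_by_year_season(sample))
```

With DataFrames, an approximate median per group is available through `F.percentile_approx` in recent Spark versions; the resulting small table can be converted to pandas for plotting one line per season.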

2.2.5 (Open Question) Explore the dataset freely and plot one diagram of your choice. Which columns (at least 2) are highly correlated to the sales price? Discuss the steps of your exploration and the results. (No word limit, please keep concise.) 

Write your discussion here.

### Part 3 RDDs vs DataFrame vs Spark SQL (25%) <a class="anchor" name="part-3"></a>
Implement the following complex query using two of the three approaches (RDD, DataFrame, Spark SQL), each separately. Log the time taken for each query in each approach using the “%%time” built-in magic command in Jupyter Notebook and discuss the performance difference between the two approaches of your choice.
(Note: You can write a multi-step query or a single complex query; the choice is yours. You can reuse the DataFrames from Part 2.)

#### Complex Query:
<pre>
A property investor wants to understand whether the property price and the settlement date are correlated. Here are the conditions:
1)	The investor is only interested in the last 2 years of the dataset.
2)	The investor is looking at houses under $2 million.
3)	Perform a bucketing of the settlement period (settlement date – contract date) into ranges (15, 30, 45, 60, 90 days).
4)	Perform a bucketing of property prices in $500K steps (e.g. 0-$500K, $500K-$1M, $1M-$1.5M, $1.5M-$2M).
5)	Count the number of transactions in each combination and print the result in the following format
(Note: It’s fine to count the same property multiple times in this task, it’s based on sales transactions).
(Note: You shall show the full table with 40 rows: 2 years * 4 price buckets * 5 settlement buckets; a count of 0 should be displayed as 0, not omitted.)
</pre>
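
The bucketing and the zero-filled 40-row grid can be sketched in plain Python before choosing an implementation. Several details are assumptions, because the task does not fix them: a settlement bucket `N` means "up to and including N days", periods over 90 days are dropped, price buckets are half-open `[lower, upper)`, and the year is taken from the contract date. The house filter is omitted for brevity, and all names are hypothetical.

```python
from bisect import bisect_left
from collections import Counter
from datetime import date
from itertools import product

SETTLEMENT_EDGES = [15, 30, 45, 60, 90]
PRICE_LABELS = ["0-$500K", "$500K-$1M", "$1M-$1.5M", "$1.5M-$2M"]

def settlement_bucket(contract_iso, settlement_iso):
    """Smallest edge >= days between the dates, or None if negative or over 90."""
    days = (date.fromisoformat(settlement_iso) - date.fromisoformat(contract_iso)).days
    idx = bisect_left(SETTLEMENT_EDGES, days)
    return SETTLEMENT_EDGES[idx] if days >= 0 and idx < len(SETTLEMENT_EDGES) else None

def price_bucket(price):
    """Label of the $500K-wide bucket, or None for prices outside [0, $2M)."""
    value = float(price)
    if not 0 <= value < 2_000_000:
        return None
    return PRICE_LABELS[int(value // 500_000)]

def bucket_counts(sales, years):
    counts = Counter()
    for s in sales:
        key = (int(s["iso_contract_date"][:4]),
               price_bucket(s["purchase_price"]),
               settlement_bucket(s["iso_contract_date"], s["iso_settlement_date"]))
        if key[0] in years and None not in key:
            counts[key] += 1
    # Emit every combination so that empty cells show 0 instead of being omitted
    return [(y, p, d, counts.get((y, p, d), 0))
            for y, p, d in product(years, PRICE_LABELS, SETTLEMENT_EDGES)]

sample = [  # the two complete records shown in the output above
    {"purchase_price": "1400000.00", "iso_contract_date": "2023-12-14",
     "iso_settlement_date": "2024-02-14"},
    {"purchase_price": "1105000.00", "iso_contract_date": "2024-01-12",
     "iso_settlement_date": "2024-02-09"},
]
table = bucket_counts(sample, [2023, 2024])
print(len(table))
for row in table:
    if row[3]:
        print(row)
```

In Spark, the zero-filled grid is typically obtained by cross-joining small DataFrames of the years and bucket labels and left-joining the aggregated counts onto it, then replacing nulls with 0.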

### a)	Implement the above query using two approaches of your choice separately and print the results. (Note: Outputs from both approaches are required, and the results should be the same.)

#### 3.1. Implementation 1

#### 3.2. Implementation 2

### b)	Which one is easier to implement, in your opinion? Log the time taken for each query and compare the execution times: between DataFrame and Spark SQL, which is faster and why? Please include proper references. (Maximum 500 words.) 

### Some ideas on the comparison

Armbrust, M., Huai, Y., Liang, C., Xin, R., & Zaharia, M. (2015). Deep Dive into Spark SQL’s Catalyst Optimizer. Retrieved September 30, 2017, from https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

Damji, J. (2016). A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets. Retrieved September 28, 2017, from https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Data Flair (2017a). Apache Spark RDD vs DataFrame vs DataSet. Retrieved September 28, 2017, from http://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset

Prakash, C. (2016). Apache Spark: RDD vs Dataframe vs Dataset. Retrieved September 28, 2017, from http://why-not-learn-something.blogspot.com.au/2016/07/apache-spark-rdd-vs-dataframe-vs-dataset.html

Xin, R., & Rosen, J. (2015). Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Retrieved September 30, 2017, from https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html