# Data cleaning

Source of the data, and of the instructions describing each data element (see the documentation linked from this page):
- https://valuation.property.nsw.gov.au/embed/propertySalesInformation

In [None]:
import pandas as pd
import numpy as np
import glob
import os
from pathlib import Path
from datetime import datetime
import warnings

# NOTE(review): with no category or module argument this silences every warning for
# the rest of the session, not just one known-noisy warning — consider narrowing it
# to the specific warning that needs hiding.
warnings.filterwarnings('ignore')

In [None]:
# Where the raw yearly folders live and where the generated parquet files are written.
DATA_DIR = "../data"
OUTPUT_DIR = "../data/parquet"

# Calendar years handled by this notebook: 2005 through 2025 inclusive.
YEARS = range(2005, 2025 + 1)

# Make sure the output folder is there; an already existing folder is left as it is.
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(
    f"Data directory: {DATA_DIR}",
    f"Output directory: {OUTPUT_DIR}",
    f"Years to process: {list(YEARS)}",
    sep="\n",
)


Data directory: ../data
Output directory: ../data/parquet
Years to process: [2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025]


In [None]:
# Look at the raw layout of one 2024 file (fields are semicolon separated).
sample_file = glob.glob(f"{DATA_DIR}/2024/*.DAT")[0]
print(f"Sample file: {sample_file}\n")

# Keep the first 20 lines of the file ...
with open(sample_file, 'r', encoding='latin-1') as f:
    lines = f.readlines()[:20]

# ... and show them numbered from 1, each clipped to 100 characters.
for line_number, raw_line in enumerate(lines, start=1):
    preview = raw_line.strip()[:100]
    print(f"{line_number}: {preview}")


Sample file: ../data/2024/081_SALES_DATA_NNME_30122024.DAT

1: A;RTSALEDATA;081;20241230 01:07;VALNET;
2: B;081;2940952;1;20241230 01:07;;;48;VALENTI CRES;KELLYVILLE;2155;679;M;20241031;20241219;2202000;R2;
3: C;081;2940952;1;20241230 01:07;106/1041473;
4: D;081;2940952;1;20241230 01:07;P;;;;;;
5: D;081;2940952;1;20241230 01:07;P;;;;;;
6: D;081;2940952;1;20241230 01:07;V;;;;;;
7: D;081;2940952;1;20241230 01:07;V;;;;;;
8: B;081;2987409;2;20241230 01:07;;;82;EDGEWATER DR;BELLA VISTA;2153;640;M;20240627;20241220;2805000;R2
9: C;081;2987409;2;20241230 01:07;1133/1046919;
10: D;081;2987409;2;20241230 01:07;P;;;;;;
11: D;081;2987409;2;20241230 01:07;P;;;;;;
12: D;081;2987409;2;20241230 01:07;V;;;;;;
13: B;081;3014211;3;20241230 01:07;VICTORIA GARDENS;15;11;HARRINGTON AVE;CASTLE HILL;2154;;;20241012;202
14: C;081;3014211;3;20241230 01:07;15/SP70002;
15: D;081;3014211;3;20241230 01:07;P;;;;;;
16: D;081;3014211;3;20241230 01:07;P;;;;;;
17: D;081;3014211;3;20241230 01:07;V;;;;;;
18: D;081;301421

In [None]:

# "B" lines carry the main sale details; list the populated fields of one example
# together with their position in the semicolon-separated line.
sample_b_record = "B;001;2857799;1;20240101 01:07;;;176;LAKE RD;ELRINGTON;2325;25.15;H;20231219;20231222;1330000;RU2;R;RESIDENCE;;AAI;;0;AS334275;"
parts = sample_b_record.strip().split(';')
print("B Record Structure:")
print(f"Total columns: {len(parts)}")
# Empty fields are left out of the listing.
populated_fields = [(position, value) for position, value in enumerate(parts) if value]
for position, value in populated_fields:
    print(f"  Column {position}: {value}")


B Record Structure:
Total columns: 25
  Column 0: B
  Column 1: 001
  Column 2: 2857799
  Column 3: 1
  Column 4: 20240101 01:07
  Column 7: 176
  Column 8: LAKE RD
  Column 9: ELRINGTON
  Column 10: 2325
  Column 11: 25.15
  Column 12: H
  Column 13: 20231219
  Column 14: 20231222
  Column 15: 1330000
  Column 16: RU2
  Column 17: R
  Column 18: RESIDENCE
  Column 20: AAI
  Column 22: 0
  Column 23: AS334275


In [None]:
def parse_dat_file(filepath):
    """
    Read one .DAT file and return its "B" lines as a list of dicts.

    The file is opened as latin-1 text and every line is stripped. Only lines that
    start with "B" and split into at least 15 semicolon-separated parts are kept.
    The first 24 parts are stored under fixed field names; a field whose position
    lies beyond the end of the line is set to None. Each dict additionally holds
    'source_file' (base name of `filepath`) and 'raw_record' (the stripped line).

    If opening or reading the file raises, the error is printed and the records
    collected up to that point are returned.
    """
    # Field names in the order in which they appear on a "B" line.
    field_names = (
        'record_type',
        'district_code',
        'property_id',
        'sale_counter',
        'download_timestamp',
        'property_name',
        'property_unit_number',
        'property_house_number',
        'property_street_name',
        'property_locality',
        'property_post_code',
        'area',
        'area_type',
        'contract_date',
        'settlement_date',
        'purchase_price',
        'zoning',
        'nature_of_property',
        'primary_purpose',
        'strata_lot_number',
        'component_code',
        'sale_code',
        'percent_interest_of_sale',
        'dealing_number',
    )
    minimum_parts = 15
    records = []

    try:
        with open(filepath, 'r', encoding='latin-1') as handle:
            for raw_line in handle:
                text = raw_line.strip()
                # Blank lines and every other record type (A, C, D, ...) are ignored.
                if not text.startswith('B'):
                    continue

                values = text.split(';')
                if len(values) < minimum_parts:
                    continue

                record = {
                    name: (values[position] if position < len(values) else None)
                    for position, name in enumerate(field_names)
                }
                record['source_file'] = os.path.basename(filepath)
                # The untouched line is kept alongside the fields for debugging.
                record['raw_record'] = text
                records.append(record)
    except Exception as e:
        print(f"Error reading {filepath}: {e}")

    return records


In [None]:
# Smoke-test the parser on a single 2024 file.
test_file = glob.glob(f"{DATA_DIR}/2024/*.DAT")[0]
test_records = parse_dat_file(test_file)
print(f"Parsed {len(test_records)} records from test file")
if test_records:
    print("\nFirst record:")
    # 'raw_record' is left out of the listing; remove the filter to show it when debugging.
    shown_fields = {
        key: value
        for key, value in test_records[0].items()
        if key != 'raw_record'
    }
    for key, value in shown_fields.items():
        print(f"  {key}: {value}")


Parsed 201 records from test file

First record:
  record_type: B
  district_code: 081
  property_id: 2940952
  sale_counter: 1
  download_timestamp: 20241230 01:07
  property_name: 
  property_unit_number: 
  property_house_number: 48
  property_street_name: VALENTI CRES
  property_locality: KELLYVILLE
  property_post_code: 2155
  area: 679
  area_type: M
  contract_date: 20241031
  settlement_date: 20241219
  purchase_price: 2202000
  zoning: R2
  nature_of_property: R
  primary_purpose: RESIDENCE
  strata_lot_number: 
  component_code: AYY
  sale_code: 
  percent_interest_of_sale: 0
  dealing_number: AU705676
  source_file: 081_SALES_DATA_NNME_30122024.DAT


In [None]:
def process_year(year, cache_mismatched=True):
    """
    Parse every .DAT file in one year's folder and return the typed records.

    Parameters
    ----------
    year : int
        Folder name under DATA_DIR, and the settlement year the main frame is
        restricted to.
    cache_mismatched : bool, default True
        When True, rows whose settlement year differs from `year` are split off
        and returned separately instead of staying in the main frame.

    Returns
    -------
    (df, cache_df)
        df: records for the year, sorted by settlement_date, with settlement_date
        and contract_date converted to datetimes and purchase_price to numbers
        (values that cannot be converted become NaT / NaN).
        cache_df: the split-off rows, or None when there are none or when
        cache_mismatched is False. Rows whose settlement date could not be parsed
        (NaT) also end up here, because their year never equals `year`.
        Both are None when the folder is missing, holds no .DAT files, or yields
        no records.
    """
    year_dir = f"{DATA_DIR}/{year}"

    if not os.path.exists(year_dir):
        print(f"  Year {year}: Directory not found")
        return None, None

    # Both spellings of the extension are globbed. Where glob matches
    # case-insensitively (e.g. Windows) the two patterns return the same files, so
    # the combined list is de-duplicated to avoid parsing every file twice; sorting
    # makes the processing order reproducible.
    dat_files = sorted(set(glob.glob(f"{year_dir}/*.DAT") + glob.glob(f"{year_dir}/*.dat")))

    if not dat_files:
        print(f"  Year {year}: No .DAT files found")
        return None, None

    print(f"  Year {year}: Found {len(dat_files)} .DAT file(s)")

    all_records = []
    for dat_file in dat_files:
        all_records.extend(parse_dat_file(dat_file))

    if not all_records:
        print(f"  Year {year}: No records extracted")
        return None, None

    df = pd.DataFrame(all_records)

    # Dates arrive as CCYYMMDD text; anything that does not fit becomes NaT.
    for date_column in ('settlement_date', 'contract_date'):
        if date_column in df.columns:
            df[date_column] = pd.to_datetime(df[date_column], format='%Y%m%d', errors='coerce')

    if 'purchase_price' in df.columns:
        df['purchase_price'] = pd.to_numeric(df['purchase_price'], errors='coerce')

    # Split by settlement year - some files contain records settled in other years.
    cache_df = None
    if cache_mismatched and 'settlement_date' in df.columns:
        in_year = df['settlement_date'].dt.year == year
        df_cache = df[~in_year].copy()  # other years and NaT settlement dates
        df = df[in_year].copy()

        if len(df_cache) > 0:
            cache_df = df_cache
            print(f"  Year {year}: Cached {len(df_cache)} records with settlement dates from other years")
            print(f"  Year {year}: Matched {len(df)} records for {year}, cached {len(df_cache)} for other years")

    has_rows = 'settlement_date' in df.columns and len(df) > 0
    if has_rows:
        df = df.sort_values('settlement_date', na_position='last')

    print(f"  Year {year}: Extracted {len(df)} records")
    if has_rows:
        print(f"  Year {year}: Date range: {df['settlement_date'].min()} to {df['settlement_date'].max()}")

    return df, cache_df


In [None]:
# noticed lots of records getting filtered out - checking what's happening
test_year = 2006
year_dir = f"{DATA_DIR}/{test_year}"
# Both spellings of the extension are globbed; where glob matches case-insensitively
# (e.g. Windows) that lists each file twice, so duplicates are dropped while keeping
# the original order.
dat_files = list(dict.fromkeys(glob.glob(f"{year_dir}/*.DAT") + glob.glob(f"{year_dir}/*.dat")))

all_records = []
for dat_file in dat_files[:10]:  # sample first 10 files
    all_records.extend(parse_dat_file(dat_file))

df_diag = pd.DataFrame(all_records)
df_diag['settlement_date'] = pd.to_datetime(df_diag['settlement_date'], format='%Y%m%d', errors='coerce')

print("Settlement date year distribution (before filtering):")
print(df_diag['settlement_date'].dt.year.value_counts().sort_index())

print(f"\nTotal records: {len(df_diag)}")
print(f"Records with valid settlement_date: {df_diag['settlement_date'].notna().sum()}")
print(f"Records with invalid settlement_date: {df_diag['settlement_date'].isna().sum()}")

print("\nSample records that would be filtered out:")
filtered_out = df_diag[df_diag['settlement_date'].dt.year != test_year].head(5)
print(filtered_out[['settlement_date', 'property_locality', 'purchase_price', 'source_file']])

#! Result: the files for a given year contain many sales that settled in previous years,
#! and those records still belong in our dataset. Approach: while processing a year, cache
#! the records that are filtered out by settlement date; after all years have been processed
#! with the filter in place, go through the cache and merge each record into the .parquet
#! file of the year it belongs to.

Settlement date year distribution (before filtering):
settlement_date
2002      1
2004      1
2005    121
2006    332
Name: count, dtype: int64

Total records: 455
Records with valid settlement_date: 455
Records with invalid settlement_date: 0

Sample records that would be filtered out:
   settlement_date property_locality purchase_price  \
2       2005-10-28             MANLY        1380000   
10      2005-09-01         FAIRLIGHT         385000   
27      2005-12-20             MANLY        1202000   
36      2005-12-23             MANLY         580546   
38      2005-09-23             MANLY          13000   

                         source_file  
2   086_SALES_DATA_NNME_01042006.DAT  
10  086_SALES_DATA_NNME_01042006.DAT  
27  086_SALES_DATA_NNME_01042006.DAT  
36  086_SALES_DATA_NNME_01042006.DAT  
38  086_SALES_DATA_NNME_01042006.DAT  


In [None]:
# Dry run on a single year before processing everything - backup your data before running!
test_year = 2024
print(f"Testing with year {test_year}...")
test_df, test_cache = process_year(test_year, cache_mismatched=True)

# Overview of the records kept for the year.
if test_df is not None:
    print(f"\nDataFrame shape: {test_df.shape}")
    print(f"\nColumns: {list(test_df.columns)}")
    print("\nFirst few rows:", test_df.head(), sep="\n")
    print("\nData types:", test_df.dtypes, sep="\n")
    print("\nMissing values:", test_df.isnull().sum(), sep="\n")

# Overview of the records set aside because they settled in another year.
if test_cache is not None:
    cached_years = test_cache['settlement_date'].dt.year
    print(f"\nCached records shape: {test_cache.shape}")
    print("Cached records settlement year distribution:")
    print(cached_years.value_counts().sort_index())

Testing with year 2024...
  Year 2024: Found 6461 .DAT file(s)
  Year 2024: Cached 3743 records with settlement dates from other years
  Year 2024: Matched 213114 records for 2024, cached 3743 for other years
  Year 2024: Extracted 213114 records
  Year 2024: Date range: 2024-01-02 00:00:00 to 2024-12-24 00:00:00

DataFrame shape: (213114, 26)

Columns: ['record_type', 'district_code', 'property_id', 'sale_counter', 'download_timestamp', 'property_name', 'property_unit_number', 'property_house_number', 'property_street_name', 'property_locality', 'property_post_code', 'area', 'area_type', 'contract_date', 'settlement_date', 'purchase_price', 'zoning', 'nature_of_property', 'primary_purpose', 'strata_lot_number', 'component_code', 'sale_code', 'percent_interest_of_sale', 'dealing_number', 'source_file', 'raw_record']

First few rows:
      record_type district_code property_id sale_counter download_timestamp  \
46066           B           224     4523223           16     20240108 01:07 

In [None]:

# process all years and save to parquet
print("Processing all years...\n")

# Records that settled in a different year than the folder they were found in;
# collected here so they can be merged into the right year file afterwards.
cached_records = []

# Parquet engines tried in order, with the note printed when a fallback was needed.
# 'auto' is pandas' own engine detection and is the last resort.
PARQUET_ENGINES = (
    ('pyarrow', None),
    ('fastparquet', 'fastparquet engine'),
    ('auto', 'auto-detected engine'),
)

for year in YEARS:
    df, cache_df = process_year(year, cache_mismatched=True)

    if cache_df is not None and len(cache_df) > 0:
        cached_records.append(cache_df)

    if df is None:
        print(f"  Year {year}: Skipped (no data)\n")
        continue

    output_file = f"{OUTPUT_DIR}/{year}.parquet"
    for engine, fallback_note in PARQUET_ENGINES:
        try:
            df.to_parquet(output_file, index=False, engine=engine)
        except Exception:
            # Only ordinary errors trigger the fallback; an interrupt still stops the run.
            if engine == PARQUET_ENGINES[-1][0]:
                raise  # nothing left to fall back to
            continue
        if fallback_note:
            print(f"  Year {year}: Saved using {fallback_note}")
        break
    print(f"  Year {year}: Saved to {output_file}\n")

print("Done!")

# Merge cached records back into the year files they belong to.
# Only settlement years inside YEARS are merged. Cached records that settled outside
# the covered period, or that have no valid settlement date, are counted and skipped,
# so no parquet files are created for years outside the analysis.

if cached_records:
    print(f"\nMerging {len(cached_records)} cached record groups back into appropriate year files...")

    # Combine all cached records
    all_cached = pd.concat(cached_records, ignore_index=True)
    print(f"Total cached records: {len(all_cached)}")

    settlement_years = all_cached['settlement_date'].dt.year
    undated = settlement_years.isna()
    in_scope = settlement_years.isin(list(YEARS))
    out_of_range = ~in_scope & ~undated

    if undated.any():
        print(f"  Skipped {int(undated.sum())} cached records with no valid settlement date")
    if out_of_range.any():
        print(f"  Skipped {int(out_of_range.sum())} cached records settled outside {min(YEARS)}-{max(YEARS)}")

    # Process each in-scope year that has cached records
    for settlement_year in sorted(settlement_years[in_scope].astype(int).unique()):
        settlement_year = int(settlement_year)
        year_records = all_cached[settlement_years == settlement_year]
        parquet_file = f"{OUTPUT_DIR}/{settlement_year}.parquet"

        if os.path.exists(parquet_file):
            # Append to the records already saved for that year
            try:
                existing_df = pd.read_parquet(parquet_file, engine='fastparquet')
            except Exception:
                existing_df = pd.read_parquet(parquet_file)
            combined_df = pd.concat([existing_df, year_records], ignore_index=True)
            outcome = (
                f"  Merged {len(year_records)} cached records into {settlement_year}.parquet "
                f"(now {len(combined_df)} total)"
            )
        else:
            # No file yet for this year (its folder produced no data): start one
            combined_df = year_records
            outcome = f"  Created {settlement_year}.parquet with {len(year_records)} cached records"

        combined_df = combined_df.sort_values('settlement_date', na_position='last')
        try:
            combined_df.to_parquet(parquet_file, index=False, engine='fastparquet')
        except Exception:
            combined_df.to_parquet(parquet_file, index=False)
        print(outcome)

    print("\nCache merging complete!")
else:
    print("\nNo cached records to merge.")


Processing all years...

  Year 2005: Found 3671 .DAT file(s)
  Year 2005: Cached 24713 records with settlement dates from other years
  Year 2005: Matched 159407 records for 2005, cached 24713 for other years
  Year 2005: Extracted 159407 records
  Year 2005: Date range: 2005-01-01 00:00:00 to 2005-12-22 00:00:00
  Year 2005: Saved using fastparquet engine
  Year 2005: Saved to ../data/parquet/2005.parquet

  Year 2006: Found 3648 .DAT file(s)
  Year 2006: Cached 22446 records with settlement dates from other years
  Year 2006: Matched 162764 records for 2006, cached 22446 for other years
  Year 2006: Extracted 162764 records
  Year 2006: Date range: 2006-01-01 00:00:00 to 2006-12-18 00:00:00
  Year 2006: Saved using fastparquet engine
  Year 2006: Saved to ../data/parquet/2006.parquet

  Year 2007: Found 3633 .DAT file(s)
  Year 2007: Cached 21054 records with settlement dates from other years
  Year 2007: Matched 187111 records for 2007, cached 21054 for other years
  Year 2007: Ext

In [None]:
# quick summary stats
print("Summary Statistics:\n")


def _column_stat(frame, column, stat):
    """Return frame[column].<stat>(), or None when the column is not in the frame."""
    if column not in frame.columns:
        return None
    return getattr(frame[column], stat)()


summary_data = []
for year in YEARS:
    parquet_file = f"{OUTPUT_DIR}/{year}.parquet"
    if not os.path.exists(parquet_file):
        continue

    # Prefer fastparquet, fall back to pandas' default engine on ordinary errors only
    # (an interrupt should stop the cell rather than trigger a second read).
    try:
        df = pd.read_parquet(parquet_file, engine='fastparquet')
    except Exception:
        df = pd.read_parquet(parquet_file)

    summary_data.append({
        'year': year,
        'total_records': len(df),
        'unique_suburbs': _column_stat(df, 'property_locality', 'nunique'),
        'min_settlement_date': _column_stat(df, 'settlement_date', 'min'),
        'max_settlement_date': _column_stat(df, 'settlement_date', 'max'),
        'min_price': _column_stat(df, 'purchase_price', 'min'),
        'max_price': _column_stat(df, 'purchase_price', 'max'),
        'median_price': _column_stat(df, 'purchase_price', 'median'),
    })

summary_df = pd.DataFrame(summary_data)
print(summary_df.to_string(index=False))

#! Remember that this covers the whole of NSW. We want to filter it down to just Sydney, either with a suburb / district code list or, preferably, with a range (if Sydney falls between, say, 1000 and 3000).

#! TODO: check out COVID (2019) - so few records; investigate why.


Summary Statistics:

 year  total_records  unique_suburbs min_settlement_date max_settlement_date  min_price  max_price  median_price
 2005         181502            3470          2005-01-01          2005-12-31        100  710000000      360000.0
 2006         182983            3508          2006-01-01          2006-12-31        100  400000000      356500.0
 2007         209360            3499          2007-01-01          2007-12-31        100  411000000      370000.0
 2008         191223            3520          2008-01-01          2008-12-31        100  770000000      370000.0
 2009         200013            3523          2009-01-01          2009-12-31        100  137000000      370000.0
 2010         181513            3653          2010-01-01          2010-12-31        100  174000000      415000.0
 2011         172995            3649          2011-01-01          2011-12-31        100  395000000      418000.0
 2012         172434            3692          2012-01-01          2012-12-3

In [None]:
# checking for missing data
print("Data Quality Checks:\n")


def _count_missing(frame, column):
    """
    Count the entries of `column` that are null or, for text columns, blank.

    Returns 0 when the column is not in the frame. Blank text is counted because
    isna() does not treat an empty string as missing.
    """
    if column not in frame.columns:
        return 0
    values = frame[column]
    missing = values.isna()
    is_text = not (
        pd.api.types.is_numeric_dtype(values)
        or pd.api.types.is_datetime64_any_dtype(values)
    )
    if is_text:
        missing = missing | values.astype(str).str.strip().eq('')
    return int(missing.sum())


for year in YEARS:
    parquet_file = f"{OUTPUT_DIR}/{year}.parquet"
    if not os.path.exists(parquet_file):
        continue

    # Prefer fastparquet, fall back to pandas' default engine on ordinary errors only.
    try:
        df = pd.read_parquet(parquet_file, engine='fastparquet')
    except Exception:
        df = pd.read_parquet(parquet_file)

    missing_counts = {
        column: _count_missing(df, column)
        for column in ('settlement_date', 'purchase_price', 'property_locality')
    }

    if not any(missing_counts.values()):
        print(f"Year {year}: No missing values")
        continue

    print(f"Year {year}:")
    for column, count in missing_counts.items():
        if count > 0:
            print(f"  Missing {column}: {count} ({count/len(df)*100:.1f}%)")
    print()


Data Quality Checks:

Year 2005: No missing values
Year 2006: No missing values
Year 2007: No missing values
Year 2008: No missing values
Year 2009: No missing values
Year 2010: No missing values
Year 2011: No missing values
Year 2012: No missing values
Year 2013: No missing values
Year 2014: No missing values
Year 2015: No missing values
Year 2016: No missing values
Year 2017: No missing values
Year 2018: No missing values
Year 2019: No missing values
Year 2020: No missing values
Year 2021: No missing values
Year 2022: No missing values
Year 2023: No missing values
Year 2024: No missing values
Year 2025: No missing values
