In [9]:
import json
import os
import re
import sys

import polars as pl
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import display_html
# Make IPython display the result of every top-level expression in a cell,
# not only the last one.
InteractiveShell.ast_node_interactivity = "all"

# Directory that is scanned for the per-property JSON files.
corelogic_dir = '../data/landing/corelogic/'
# Directory that receives the combined Parquet file.
raw_corelogic_dir = '../data/raw/corelogic/'

In [2]:
def print_progress_bar(iteration, total, length=20, caption=None):
    """
    Display terminal progress bar.

    The bar is redrawn in place (carriage return, no newline) until the work
    is complete, at which point a newline is emitted so later output starts
    on a fresh line.

    args:
    iteration   - Required  : current iteration (Int)
    total       - Required  : total iterations (Int)
    length      - Optional  : character length of bar (Int)
    caption     - Optional  : text appended after the counter (Str)
    """
    if total > 0:
        percent = "{0:.1f}".format(100 * (iteration / float(total)))
        complete = iteration >= total
    else:
        # Nothing to process: render a finished bar instead of dividing by zero.
        percent = "100.0"
        complete = True

    if complete:
        # A finished bar is exactly `length` characters wide (no extra arrow).
        bar = '=' * length
    else:
        # Clamp so out-of-range iterations can never widen the bar.
        filled_length = min(max(int(length * iteration // total), 0), length - 1)
        bar = '=' * filled_length + '>' + ' ' * (length - filled_length - 1)

    sys.stdout.write(f'\r[{bar}] {percent}% {iteration}/{total} {caption if caption else ""}')
    sys.stdout.flush()
    if complete:
        print()

In [3]:
# One subscript step of a key path: ["name"], ['name'] or [3] / [-1].
_SUBSCRIPT_STEP = re.compile(
    r"""\s*\[\s*(?:"(?P<dq>[^"\\]*)"|'(?P<sq>[^'\\]*)'|(?P<idx>-?\d+))\s*\]"""
)


def _parse_subscript_path(key):
    '''
    Convert a path such as '["a"]["b"][0]' into the tuple ('a', 'b', 0).
    Raises ValueError if the text is anything other than a chain of subscripts.
    '''
    text = str(key).rstrip()
    steps = []
    position = 0
    while position < len(text):
        match = _SUBSCRIPT_STEP.match(text, position)
        if match is None:
            raise ValueError(f"Unsupported key path: {key!r}")
        if match.group('idx') is not None:
            steps.append(int(match.group('idx')))
        elif match.group('dq') is not None:
            steps.append(match.group('dq'))
        else:
            steps.append(match.group('sq'))
        position = match.end()
    return tuple(steps)


def get_first_valid(data, *keys):
    '''
    Get the first valid value from a list of keys in a dictionary

    Each key is a subscript path written as text, e.g. '["location"]["state"]'
    or '["items"][0]'. Paths are tried in order; the first one that resolves
    to a value other than None wins.

    args:
    data - nested dict/list structure to look into
    keys - one or more subscript paths, tried left to right

    returns:
    The first non-None value found, or None if no path resolves.

    raises:
    ValueError if a key is not a chain of string/integer subscripts. The path
    is parsed and walked directly rather than passed to eval(), so a key can
    never execute code.
    '''
    for key in keys:
        value = data
        try:
            for step in _parse_subscript_path(key):
                value = value[step]
        except (KeyError, IndexError, TypeError):
            # Missing key, list index out of range, or a non-subscriptable
            # intermediate value (e.g. None): fall through to the next path.
            continue
        if value is not None:
            return value
    return None

In [4]:
# Output columns, in the order they appear in every row.
_LISTING_COLUMNS = (
    'unique_id', 'street_name', 'state', 'postcode', 'suburb', 'property_type',
    'address', 'rent', 'num_bedrooms', 'num_bathrooms', 'parking', 'sold_date',
    'latitude', 'longitude', 'land_area', 'floor_area', 'building_area',
    'year_built', 'num_visible_schools', 'num_days_listed',
)


def _make_listing_row(property_fields, rent=None, sold_date=None, num_days_listed=None):
    '''
    Merge the property-level fields with one listing's fields, keeping the column order fixed.
    '''
    merged = dict(property_fields, rent=rent, sold_date=sold_date, num_days_listed=num_days_listed)
    return {column: merged[column] for column in _LISTING_COLUMNS}


def read_json_file(file_path):
    '''
    Read a JSON file and extract relevant information into a list of dictionaries

    One dictionary is produced per entry under "forRent". When the property
    has no rental entries a single dictionary is produced with rent,
    sold_date and num_days_listed set to None. All dictionaries share the
    same keys in the same order (see _LISTING_COLUMNS).

    args:
    file_path - path of the JSON file to read

    returns:
    List of row dictionaries (never empty).
    '''
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # Join only the street parts that are present so a missing name or
    # extension does not leave a stray leading/trailing space.
    street_parts = (
        get_first_valid(data, '["location"]["street"]["name"]'),
        get_first_valid(data, '["location"]["street"]["extension"]'),
    )
    local_schools = get_first_valid(data, '["localSchools"]')

    property_fields = {
        'unique_id': get_first_valid(data, '["propertyId"]'),
        'street_name': ' '.join(str(part) for part in street_parts if part),
        'state': get_first_valid(data, '["location"]["state"]'),
        'postcode': get_first_valid(data, '["location"]["postcode"]["name"]'),
        'suburb': get_first_valid(data, '["location"]["locality"]["name"]'),
        'property_type': get_first_valid(data, '["attrCore"]["propertyType"]'),
        'address': get_first_valid(data, '["location"]["singleLine"]'),
        'num_bedrooms': get_first_valid(data, '["attrCore"]["beds"]'),
        'num_bathrooms': get_first_valid(data, '["attrCore"]["baths"]'),
        'parking': get_first_valid(data, '["attrCore"]["carSpaces"]'),
        'latitude': get_first_valid(data, '["coordinates"]["latitude"]', '["location"]["latitude"]'),
        'longitude': get_first_valid(data, '["coordinates"]["longitude"]', '["location"]["longitude"]'),
        'land_area': get_first_valid(data, '["attrCore"]["landArea"]'),
        'floor_area': get_first_valid(data, '["attrAdditional"]["floorArea"]'),
        'building_area': get_first_valid(data, '["featureDetails"]["buildingArea"]'),
        'year_built': get_first_valid(data, '["yearBuilt"]', '["attrAdditional"]["yearBuilt"]'),
        'num_visible_schools': len(local_schools) if local_schools else 0,
    }

    rental_listings = get_first_valid(data, '["forRent"]')
    if not rental_listings:
        # No rental history: keep the property as a single row without rent data.
        return [_make_listing_row(property_fields)]

    return [
        _make_listing_row(
            property_fields,
            rent=get_first_valid(rental_listing, '["rentPrice"]'),
            sold_date=get_first_valid(rental_listing, '["dateOfListing"]', '["timelineDate"]'),
            num_days_listed=get_first_valid(rental_listing, '["daysOnMarket"]'),
        )
        for rental_listing in rental_listings
    ]
        
        

        
        
def read_json_directory(directory_path, infer_schema_length=200000):
    '''
    Read every .json file in a directory into a single Polars DataFrame

    Files are processed in sorted name order so the row order is the same on
    every run. Progress is reported after each file.

    args:
    directory_path      - directory to scan (not recursive)
    infer_schema_length - number of rows Polars inspects to infer column types

    returns:
    Polars DataFrame with one row per dictionary returned by read_json_file.
    '''
    # Only .json files count towards the total, otherwise the progress bar
    # could never reach 100% when the directory holds other files.
    json_filenames = sorted(
        name for name in os.listdir(directory_path)
        if name.endswith('.json') and os.path.isfile(os.path.join(directory_path, name))
    )
    total = len(json_filenames)

    json_data_list = []
    # Count from 1 so the final update reports total/total and ends the line.
    for num_processed, filename in enumerate(json_filenames, start=1):
        json_data_list.extend(read_json_file(os.path.join(directory_path, filename)))
        print_progress_bar(num_processed, total, caption="properties processed")

    # Create a Polars DataFrame from the list of JSON data
    return pl.DataFrame(json_data_list, infer_schema_length=infer_schema_length)

In [5]:
# Parse every JSON file in the landing directory into a single DataFrame.
if not os.path.isdir(corelogic_dir):
    # Fail here rather than letting a later cell hit an undefined (or stale) `df`.
    raise FileNotFoundError(f"Directory {corelogic_dir} does not exist")

df = read_json_directory(corelogic_dir)



In [10]:
# Write the DataFrame to a .parquet file in the raw directory.
# Create the directory first so the write does not fail when it is missing.
os.makedirs(raw_corelogic_dir, exist_ok=True)
df.write_parquet(os.path.join(raw_corelogic_dir, "corelogic.parquet"))

In [15]:
# Show the column names and the dtypes Polars inferred for them.
# NOTE(review): the recorded output lists postcode, latitude, longitude and
# year_built as String - presumably these need casting downstream; confirm.
df.schema

Schema([('unique_id', Int64),
        ('street_name', String),
        ('state', String),
        ('postcode', String),
        ('suburb', String),
        ('property_type', String),
        ('address', String),
        ('rent', Int64),
        ('num_bedrooms', Int64),
        ('num_bathrooms', Int64),
        ('parking', Int64),
        ('sold_date', String),
        ('latitude', String),
        ('longitude', String),
        ('land_area', Int64),
        ('floor_area', Float64),
        ('building_area', Float64),
        ('year_built', String),
        ('num_visible_schools', Int64),
        ('num_days_listed', Int64)])