In [None]:
# Census Data from:
# https://datapacks.censusdata.abs.gov.au/datapacks/
# 2016 Census Datapacks
# General Community Profile
# State Suburbs
# Aust

import os
from itertools import chain
from pyspark.sql.functions import array, col, create_map, explode, expr, length, lit, regexp_replace, row_number, struct, substring
from pyspark.sql.window import Window

# Azure Blob Storage connection settings, read from the environment.
# os.getenv returns None for unset variables, which would silently build a
# 'wasbs://None@None...' path and fail later with an opaque error, so check
# all of them up front and report every missing one at once.
_required_storage_env = ('STORAGE_ACCOUNT_NAME', 'STORAGE_CONTAINER_NAME', 'STORAGE_SAS_TOKEN')
_missing_storage_env = [name for name in _required_storage_env if not os.getenv(name)]
if _missing_storage_env:
    raise RuntimeError('Missing required environment variable(s): ' + ', '.join(_missing_storage_env))

storage_account_name = os.getenv('STORAGE_ACCOUNT_NAME')
storage_container_name = os.getenv('STORAGE_CONTAINER_NAME')
storage_sas_token = os.getenv('STORAGE_SAS_TOKEN')

# Base URI that every CSV read below is prefixed with; the SAS token is
# registered with Spark for this container/account pair.
wasbs_path = f'wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/'
spark.conf.set(f'fs.azure.sas.{storage_container_name}.{storage_account_name}.blob.core.windows.net', storage_sas_token)

# Columns identifying a suburb; each output table keeps these as its leading columns.
special_columns = ['suburb', 'state']

def col_to_row(dataframe, columns, key_column='category', value_column='number'):
    """Unpivot every column not listed in ``columns`` into (key, value) rows.

    Each input row produces one output row per unpivoted column. The kept
    ``columns`` are repeated, the source column name goes into ``key_column``
    and the source value goes into ``value_column``.

    Args:
        dataframe: Spark DataFrame to unpivot.
        columns: Names of the columns to keep as-is (list or tuple).
        key_column: Output column holding the unpivoted column's name.
        value_column: Output column holding the unpivoted column's value.

    Returns:
        A DataFrame with columns ``list(columns) + [key_column, value_column]``.

    Raises:
        ValueError: If no columns are left to unpivot, or if the columns to
            unpivot do not all share a single data type.
    """
    kept_columns = list(columns)
    # (name, Spark type string) for every column that will be unpivoted.
    value_dtypes = [(c, t) for (c, t) in dataframe.dtypes if c not in kept_columns]
    if not value_dtypes:
        raise ValueError('No columns left to unpivot after excluding: ' + ', '.join(kept_columns))
    # Every unpivoted value lands in one output column, so the inputs must share a type.
    # Explicit raise (not assert) so the check is not stripped under `python -O`.
    distinct_types = {t for _, t in value_dtypes}
    if len(distinct_types) != 1:
        raise ValueError(f'All columns must be of the same data type, found: {sorted(distinct_types)}')
    # Build an array of (name, value) structs per row and explode it into separate rows.
    keys_values = explode(array([
        struct(lit(c).alias(key_column), col(c).alias(value_column))
        for c, _ in value_dtypes
    ])).alias('keys_values')
    return dataframe \
        .select(kept_columns + [keys_values]) \
        .select(kept_columns + [f'keys_values.{key_column}', f'keys_values.{value_column}'])


In [None]:
# State and Suburbs Data from:
# https://www.abs.gov.au/AUSSTATS/abs@.nsf/DetailsPage/1270.0.55.003July%202016?OpenDocument
# State Suburbs ASGS Edition 2016 in .csv Format
# File:
# SSC_2016_AUST.csv

# Load the State Suburb (SSC) allocation file; it is reduced below to distinct
# (ssc, suburb, state) rows.
ssc_df = spark.read.csv(f'{wasbs_path}SSC_2016_AUST.csv', header=True, inferSchema=True)

# Full state/territory name -> abbreviation stored in the output 'state' column.
state_dict = {
    'New South Wales': 'NSW',
    'Victoria': 'VIC',
    'Queensland': 'QLD',
    'South Australia': 'SA',
    'Western Australia': 'WA',
    'Tasmania': 'TAS',
    'Northern Territory': 'NT',
    'Australian Capital Territory': 'ACT',
    'Other Territories': 'OT'
}

# create_map takes a flat key, value, key, value, ... sequence of literals.
map_expr = create_map([lit(item) for pair in state_dict.items() for item in pair])

col_renamed_ssc_df = (
    ssc_df
    .withColumnRenamed('SSC_CODE_2016', 'ssc')
    .withColumnRenamed('SSC_NAME_2016', 'suburb_state')
    .withColumnRenamed('STATE_NAME_2016', 'state_fullname')
)

# Keep only the three columns of interest and collapse duplicate rows.
select_distinct_sort_ssc_df = (
    col_renamed_ssc_df
    .select('ssc', 'suburb_state', 'state_fullname')
    .distinct()
    .sort('ssc')
)

# Remove any whitespace-preceded parenthesised text from the name (presumably the
# state qualifier, given the column name suburb_state) and abbreviate the state.
# State names missing from state_dict map to null.
transformed_ssc_df = (
    select_distinct_sort_ssc_df
    .withColumn('suburb', regexp_replace('suburb_state', r'\s\(([^\)]+)\)', ''))
    .withColumn('state', map_expr[col('state_fullname')])
)

# Exclude suburb names that still contain ' - ' after the qualifier is removed.
final_ssc_df = (
    transformed_ssc_df
    .select('ssc', 'suburb', 'state')
    .where(~col('suburb').like('% - %'))
)

final_ssc_df.write.saveAsTable('ssc', mode='overwrite')

display(final_ssc_df)
ssc_shape = (final_ssc_df.count(), len(final_ssc_df.columns))
print(ssc_shape)


In [None]:
# G02: Selected Medians and Averages
# File:
# 2016Census_G02_AUS_SSC.csv

g02_df = spark.read.csv(f'{wasbs_path}2016Census_G02_AUS_SSC.csv', header=True, inferSchema=True)

# Drop the first three characters of SSC_CODE_2016 (presumably an 'SSC' prefix)
# so the code matches final_ssc_df.ssc for the join.
transformed_g02_df = g02_df.withColumn('ssc', expr('substring(SSC_CODE_2016, 4, length(SSC_CODE_2016))'))

joined_g02_ssc_df = transformed_g02_df.join(final_ssc_df, on='ssc')

# suburb/state first, then every G02 measure. Exclude key columns by name rather
# than by position so the result does not depend on the column order of the join.
g02_key_columns = {'ssc', 'SSC_CODE_2016', *special_columns}
selected_columns = special_columns.copy()
selected_columns.extend(c for c in joined_g02_ssc_df.columns if c not in g02_key_columns)

# Sort on the selected key columns: 'ssc' is not part of the projection, and Spark
# cannot resolve a sort column that is missing from the output of distinct().
final_g02_df = joined_g02_ssc_df \
    .select(selected_columns) \
    .distinct() \
    .sort(special_columns)

final_g02_df.write.saveAsTable('g02', mode='overwrite')

display(final_g02_df)
print((final_g02_df.count(), len(final_g02_df.columns)))


In [None]:
# G17: Total Personal Income (Weekly) by Age by Sex for Persons aged 15 years and over
# Files:
# 2016Census_G17A_AUS_SSC.csv
# 2016Census_G17B_AUS_SSC.csv
# 2016Census_G17C_AUS_SSC.csv

g17_dict = {}

# G17 is split across three files (A, B, C); load each under its own key.
for i in ['a', 'b', 'c']:
    g17_dict[f'g17{i}_df'] = spark.read.csv(f'{wasbs_path}2016Census_G17{i.upper()}_AUS_SSC.csv', header=True, inferSchema=True)

combined_g17_df = g17_dict['g17a_df'] \
    .join(g17_dict['g17b_df'], on='SSC_CODE_2016') \
    .join(g17_dict['g17c_df'], on='SSC_CODE_2016')

# Drop the first three characters of the code so it matches final_ssc_df.ssc.
# NOTE(review): P_Neg_Nil_income_15_19_yrs is cast to long, presumably because
# inferSchema gives it a different type than the other counts; col_to_row below
# requires all unpivoted columns to share one type.
prepared_g17_df = combined_g17_df \
    .withColumn('ssc', expr('substring(SSC_CODE_2016, 4, length(SSC_CODE_2016))')) \
    .withColumn('P_Neg_Nil_income_15_19_yrs', col('P_Neg_Nil_income_15_19_yrs').cast('long'))

joined_g17_ssc_df = prepared_g17_df.join(final_ssc_df, on='ssc')

# Sum every numeric column per (suburb, state); Spark names the results 'sum(<column>)'.
grouped_g17_ssc_df = joined_g17_ssc_df \
    .groupBy(special_columns) \
    .sum()

# Keep 'P_' columns (presumably Persons totals rather than male/female breakdowns),
# drop '_Tot' totals, and rename 'sum(X)' back to 'X'.
# NOTE(review): [2:-2] skips the two grouping columns and also the last two sum
# columns -- confirm this is intended and not copied from the G02-style cells.
selected_columns = special_columns.copy()
included_columns = [i for i in grouped_g17_ssc_df.columns[2:-2] if '_Tot' not in i and 'P_' in i]
selected_columns.extend([col(i).alias(i[4:-1]) for i in included_columns if 'sum(' in i])

# groupBy already yields one row per (suburb, state), so no distinct() is needed.
selected_g17_df = grouped_g17_ssc_df \
    .select(selected_columns) \
    .sort(special_columns)

# Unpivot to long format: one row per (suburb, state, category) with its number.
# final_g17_df must be defined here for the display below to run on a fresh kernel.
final_g17_df = col_to_row(selected_g17_df, special_columns)

# TODO: persisting g17 is still disabled; enable when decided (optionally
# partitioned, e.g. final_g17_df.write.partitionBy('state').saveAsTable(...)).
# final_g17_df.write.saveAsTable('g17', mode='overwrite')

display(final_g17_df)
print((final_g17_df.count(), len(final_g17_df.columns)))


In [None]:
# G28: Total Family Income (Weekly) by Family Composition
# File:
# 2016Census_G28_AUS_SSC.csv

g28_df = spark.read.csv(f'{wasbs_path}2016Census_G28_AUS_SSC.csv', header=True, inferSchema=True)

# Drop the first three characters of SSC_CODE_2016 (presumably an 'SSC' prefix)
# so the code matches final_ssc_df.ssc for the join.
transformed_g28_df = g28_df.withColumn('ssc', expr('substring(SSC_CODE_2016, 4, length(SSC_CODE_2016))'))

joined_g28_ssc_df = transformed_g28_df.join(final_ssc_df, on='ssc')

# suburb/state first, then every G28 measure. Exclude key columns by name rather
# than by position so the result does not depend on the column order of the join.
g28_key_columns = {'ssc', 'SSC_CODE_2016', *special_columns}
selected_columns = special_columns.copy()
selected_columns.extend(c for c in joined_g28_ssc_df.columns if c not in g28_key_columns)

# Sort on the selected key columns: 'ssc' is not part of the projection, and Spark
# cannot resolve a sort column that is missing from the output of distinct().
final_g28_df = joined_g28_ssc_df \
    .select(selected_columns) \
    .distinct() \
    .sort(special_columns)

final_g28_df.write.saveAsTable('g28', mode='overwrite')

display(final_g28_df)
print((final_g28_df.count(), len(final_g28_df.columns)))


In [None]:
# G29: Total Household Income (Weekly) by Household Composition
# File:
# 2016Census_G29_AUS_SSC.csv

g29_df = spark.read.csv(f'{wasbs_path}2016Census_G29_AUS_SSC.csv', header=True, inferSchema=True)

# Drop the first three characters of SSC_CODE_2016 (presumably an 'SSC' prefix)
# so the code matches final_ssc_df.ssc for the join.
transformed_g29_df = g29_df.withColumn('ssc', expr('substring(SSC_CODE_2016, 4, length(SSC_CODE_2016))'))

joined_g29_ssc_df = transformed_g29_df.join(final_ssc_df, on='ssc')

# suburb/state first, then every G29 measure. Exclude key columns by name rather
# than by position so the result does not depend on the column order of the join.
g29_key_columns = {'ssc', 'SSC_CODE_2016', *special_columns}
selected_columns = special_columns.copy()
selected_columns.extend(c for c in joined_g29_ssc_df.columns if c not in g29_key_columns)

# Sort on the selected key columns: 'ssc' is not part of the projection, and Spark
# cannot resolve a sort column that is missing from the output of distinct().
final_g29_df = joined_g29_ssc_df \
    .select(selected_columns) \
    .distinct() \
    .sort(special_columns)

final_g29_df.write.saveAsTable('g29', mode='overwrite')

display(final_g29_df)
print((final_g29_df.count(), len(final_g29_df.columns)))


In [None]:
# Cosmos DB connection settings, read from the environment. Fail fast and name
# every missing variable instead of handing None options to the connector.
_required_cosmos_env = ('COSMOS_ENDPOINT', 'COSMOS_ACCOUNT_KEY', 'COSMOS_DATABASE_NAME', 'COSMOS_CONTAINER_NAME')
_missing_cosmos_env = [name for name in _required_cosmos_env if not os.getenv(name)]
if _missing_cosmos_env:
    raise RuntimeError('Missing required environment variable(s): ' + ', '.join(_missing_cosmos_env))

cosmos_endpoint = os.getenv('COSMOS_ENDPOINT')
cosmos_account_key = os.getenv('COSMOS_ACCOUNT_KEY')
cosmos_database_name = os.getenv('COSMOS_DATABASE_NAME')
cosmos_container_name = os.getenv('COSMOS_CONTAINER_NAME')

read_conf = {
    'spark.cosmos.accountEndpoint': cosmos_endpoint,
    'spark.cosmos.accountKey': cosmos_account_key,
    'spark.cosmos.database': cosmos_database_name,
    'spark.cosmos.container': cosmos_container_name,
    'spark.cosmos.read.inferSchema.enabled': 'true',
    'spark.cosmos.read.inferSchema.includeTimestamp': 'true'
}

cosmos_suburbs_df = spark.read.format('cosmos.oltp').options(**read_conf).load()

# Rank documents per suburb by _ts, newest first, and keep only the newest.
# NOTE(review): ties on _ts within a suburb are broken arbitrarily by row_number.
window = Window.partitionBy('suburb').orderBy(col('_ts').desc())

transformed_cosmos_suburbs_df = cosmos_suburbs_df \
    .withColumn('row_number', row_number().over(window)) \
    .where(col('row_number') == 1)

# '_ts' is exposed as 'timestamp'; sort by that output column.
final_cosmos_suburbs_df = transformed_cosmos_suburbs_df \
    .select('suburb', col('_ts').alias('timestamp')) \
    .sort('timestamp')

final_cosmos_suburbs_df.write.saveAsTable('cosmos_suburbs', mode='overwrite')

display(final_cosmos_suburbs_df)
print((final_cosmos_suburbs_df.count(), len(final_cosmos_suburbs_df.columns)))
