In [1]:
import geopandas as gpd
import folium
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


In [2]:
# Create the Spark session that runs every Spark job in this notebook.
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "2g")
    # NOTE: this key used to be spelled "spark.executer.memory", which is not a
    # Spark setting, so the intended executor memory was never applied.
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

22/09/15 11:28:14 WARN Utils: Your hostname, Xiaotongs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.13.8.134 instead (on interface en0)
22/09/15 11:28:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/09/15 11:28:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/15 11:28:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load the SA2 boundary shapefile, keeping only the code, name and geometry
# columns under shorter names. Built as one non-mutating chain (no inplace=True).
sf = (
    gpd.read_file("../data/external/SA2_2021/SA2_2021_AUST_GDA2020.shp")
    [["SA2_CODE21", "SA2_NAME21", "geometry"]]
    .rename(columns={"SA2_CODE21": "SA2_code", "SA2_NAME21": "SA2_name"})
)

# Drop rows without a geometry. `.notna()` replaces the `!= None` comparison and
# `.copy()` makes the filtered frame independent before a column is assigned.
sf = sf.loc[sf.geometry.notna()].copy()

# Integer codes, so the keys in the GeoJSON properties are integers as well.
sf["SA2_code"] = sf["SA2_code"].astype(int)
gdf = gpd.GeoDataFrame(sf)

# create a GeoJSON string with one feature per SA2 code; it is built before the
# centroid column is added, so the centroid is not part of the properties
geoJSON = gdf.drop_duplicates("SA2_code").to_json()

# derive zone centroids, stored as (centroid.y, centroid.x) tuples
gdf['centroid'] = gdf['geometry'].apply(lambda geom: (geom.centroid.y, geom.centroid.x))
gdf.head()

Unnamed: 0,SA2_code,SA2_name,geometry,centroid
0,101021007,Braidwood,"POLYGON ((149.58424 -35.44426, 149.58444 -35.4...","(-35.45506362754262, 149.79323458580757)"
1,101021008,Karabar,"POLYGON ((149.21899 -35.36738, 149.21800 -35.3...","(-35.37594104823316, 149.23280174411747)"
2,101021009,Queanbeyan,"POLYGON ((149.21326 -35.34325, 149.21619 -35.3...","(-35.35102995469765, 149.22546319520652)"
3,101021010,Queanbeyan - East,"POLYGON ((149.24034 -35.34781, 149.24024 -35.3...","(-35.35516024135807, 149.25241255258334)"
4,101021012,Queanbeyan West - Jerrabomberra,"POLYGON ((149.19572 -35.36126, 149.19970 -35.3...","(-35.377580426117056, 149.20284846752082)"


In [4]:
# load the curated dataset from parquet and count its rows
sdf_all = spark.read.parquet("../data/curated/full_data/")
sdf_all.count()

                                                                                

7817737

In [5]:
# show the column names and types of the loaded dataset
sdf_all.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- SA2_code: integer (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- consumer_id: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- mean_total_income: integer (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- revenue_level: string (nullable = true)
 |-- take_rate: double (nullable = true)



In [6]:
def draw_map(df, columns, legend_name):
    """Plot a choropleth map for a given dataframe and legend name.

    The region shapes come from the module-level ``geoJSON`` string; values are
    bound to regions through the ``SA2_code`` property of each GeoJSON feature.

    Parameters
    ----------
    df : DataFrame
        Data source; it is passed to folium after ``reset_index()``.
    columns : list
        Columns handed to ``folium.Choropleth`` (callers in this notebook pass
        ``['SA2_code', <value column>]``).
    legend_name : str
        Caption shown on the colour legend.

    Returns
    -------
    folium.Map
        The base map with the choropleth layer added.
    """
    base_map = folium.Map(
        location=[-30, 144],
        width=800,
        height=500,
        tiles="cartodbpositron",
        zoom_start=4,
    )

    folium.Choropleth(
        geo_data=geoJSON,              # region shapes
        name='choropleth',
        data=df.reset_index(),         # data source
        columns=columns,               # the columns required
        key_on='properties.SA2_code',  # this is from the geoJSON's properties
        fill_color='YlOrRd',           # color scheme
        line_opacity=0.1,
        fill_opacity=0.7,
        legend_name=legend_name,
    ).add_to(base_map)

    return base_map

### Mean Total Income vs. SA2

In [7]:
# distinct (SA2_code, mean_total_income) pairs, with rows containing nulls removed
mean_income = (
    sdf_all
    .select("SA2_code", "mean_total_income")
    .distinct()
    .dropna()
    .toPandas()
    .astype({"SA2_code": int})
)
mean_income.head()

                                                                                

Unnamed: 0,SA2_code,mean_total_income
0,310021282,55940
1,405011113,50782
2,215011387,60456
3,124041468,62728
4,204011058,56206


In [8]:
# attach the SA2 name and geometry to each income row (inner join on SA2_code)
income_df = pd.merge(
    left=mean_income,
    right=gdf[['SA2_code', 'SA2_name', 'geometry']],
    on='SA2_code',
    how='inner',
)
income_df

Unnamed: 0,SA2_code,mean_total_income,SA2_name,geometry
0,310021282,55940,Rosewood,"POLYGON ((152.47997 -27.61784, 152.48045 -27.6..."
1,405011113,50782,Mallala,"POLYGON ((138.25959 -34.44326, 138.25959 -34.4..."
2,215011387,60456,Ararat Surrounds,"POLYGON ((142.42681 -37.45834, 142.42710 -37.4..."
3,124041468,62728,Yarramundi - Londonderry,"POLYGON ((150.67085 -33.64447, 150.66969 -33.6..."
4,204011058,56206,Nagambie,"POLYGON ((144.93194 -36.74791, 144.93206 -36.7..."
...,...,...,...,...
1256,801031113,52454,Canberra East,"POLYGON ((149.20739 -35.34524, 149.20717 -35.3..."
1257,128011531,76206,Sylvania - Taren Point,"POLYGON ((151.09069 -34.01082, 151.09068 -34.0..."
1258,123031447,63644,Picton - Tahmoor - Buxton,"POLYGON ((150.50758 -34.25908, 150.50120 -34.2..."
1259,316051437,64322,Sunshine Beach,"POLYGON ((153.10270 -26.40147, 153.10275 -26.4..."


In [9]:
# SA2 districts whose mean total income is above 200000
income_df[income_df["mean_total_income"].gt(200000)]

Unnamed: 0,SA2_code,mean_total_income,SA2_name,geometry
398,503021037,276862,Kings Park (WA),"POLYGON ((115.83968 -31.96616, 115.83964 -31.9..."
1128,503011032,281099,Cottesloe,"POLYGON ((115.75101 -31.99292, 115.75101 -31.9..."


In [10]:
# draw the choropleth of mean total income per SA2 district and save it as HTML
income_map = draw_map(
    df=income_df,
    columns=['SA2_code', 'mean_total_income'],
    legend_name="Mean Total Income ($)",
)
income_map.save('../plots/mean_income_vs_SA2_map.html')


### Number of Consumers vs. SA2

In [11]:
# number of distinct consumer_id values per SA2 district (rows with nulls dropped)
num_consumer = (
    sdf_all
    .select("SA2_code", "consumer_id")
    .distinct()
    .dropna()
    .groupBy("SA2_code")
    .count()
    .toPandas()
    .astype({"SA2_code": int})
)

                                                                                

In [12]:
# attach the SA2 name and geometry to each consumer count (inner join on SA2_code)
consumer_df = pd.merge(
    left=num_consumer,
    right=gdf[['SA2_code', 'SA2_name', 'geometry']],
    on='SA2_code',
    how='inner',
)
consumer_df.head()

Unnamed: 0,SA2_code,count,SA2_name,geometry
0,121031410,8,St Ives,"POLYGON ((151.15101 -33.72502, 151.14961 -33.7..."
1,510031273,32,Roebourne,"MULTIPOLYGON (((115.43450 -21.32744, 115.43378..."
2,307011177,82,Roma Surrounds,"POLYGON ((147.44676 -26.11513, 147.45136 -26.0..."
3,211021261,18,Donvale - Park Orchards,"POLYGON ((145.17468 -37.78406, 145.17432 -37.7..."
4,115021298,18,Galston - Laughtondale,"MULTIPOLYGON (((151.15205 -33.52766, 151.15205..."


In [13]:
# draw the choropleth of the number of consumers per SA2 district and save it as HTML
num_consumer_map = draw_map(
    df=consumer_df,
    columns=['SA2_code', 'count'],
    legend_name="Number of Consumers",
)
num_consumer_map.save('../plots/number_of_consumer_vs_SA2_map.html')

### Number of Transactions vs. SA2

In [14]:
# number of distinct order_id values per SA2 district (rows with nulls dropped)
num_transaction = (
    sdf_all
    .select("SA2_code", "order_id")
    .distinct()
    .dropna()
    .groupBy("SA2_code")
    .count()
    .toPandas()
    .astype({"SA2_code": int})
)

# join the computed dataframe with geo dataframe
transaction_df = num_transaction.merge(
    right=gdf[['SA2_code', 'SA2_name', 'geometry']],
    on='SA2_code',
    how='inner',
)
transaction_df.head()

                                                                                

Unnamed: 0,SA2_code,count,SA2_name,geometry
0,211021261,5639,Donvale - Park Orchards,"POLYGON ((145.17468 -37.78406, 145.17432 -37.7..."
1,307011177,26810,Roma Surrounds,"POLYGON ((147.44676 -26.11513, 147.45136 -26.0..."
2,510031273,10392,Roebourne,"MULTIPOLYGON (((115.43450 -21.32744, 115.43378..."
3,115021298,5716,Galston - Laughtondale,"MULTIPOLYGON (((151.15205 -33.52766, 151.15205..."
4,121031410,2560,St Ives,"POLYGON ((151.15101 -33.72502, 151.14961 -33.7..."


In [15]:
# draw the choropleth of the number of transactions per SA2 district and save it as HTML
num_transaction_map = draw_map(
    df=transaction_df,
    columns=['SA2_code', 'count'],
    legend_name="Number of Transactions",
)
num_transaction_map.save('../plots/number_of_transactions_vs_SA2_map.html')

### Median Order Value vs. SA2

In [16]:
# approximate median (0.5 percentile) of dollar_value per SA2 district
median_dollar_value = (
    sdf_all
    .select("SA2_code", "dollar_value")
    .dropna()
    .groupBy("SA2_code")
    .agg(F.percentile_approx("dollar_value", 0.5).alias("median_dollar_value"))
    .toPandas()
    .astype({"SA2_code": int})
)

# join the computed dataframe with geo dataframe
dollar_value_df = median_dollar_value.merge(
    right=gdf[['SA2_code', 'SA2_name', 'geometry']],
    on='SA2_code',
    how='inner',
)
dollar_value_df.head()

                                                                                

Unnamed: 0,SA2_code,median_dollar_value,SA2_name,geometry
0,115021298,59.721401,Galston - Laughtondale,"MULTIPOLYGON (((151.15205 -33.52766, 151.15205..."
1,211021261,59.662805,Donvale - Park Orchards,"POLYGON ((145.17468 -37.78406, 145.17432 -37.7..."
2,307011177,61.053364,Roma Surrounds,"POLYGON ((147.44676 -26.11513, 147.45136 -26.0..."
3,510031273,61.243852,Roebourne,"MULTIPOLYGON (((115.43450 -21.32744, 115.43378..."
4,121031410,64.812563,St Ives,"POLYGON ((151.15101 -33.72502, 151.14961 -33.7..."


In [17]:
# draw the choropleth of the median order value per SA2 district and save it as HTML
median_dollar_value_map = draw_map(
    df=dollar_value_df,
    columns=['SA2_code', 'median_dollar_value'],
    legend_name="Median Dollar Value ($)",
)
median_dollar_value_map.save('../plots/median_dollar_value_vs_SA2_map.html')