### Analyse Transaction by SA2
1. Merge SA2 to transaction by postcode
2. Check how many unique SA2
3. Check for null SA2 values
4. Per SA2 aggregate: total_population, median age, the transaction average dollar amount (AOV), transaction frequency, and the number of unique customers, use BPNL % (num_unique_cust/total_population)
5. groupby month/weeknum over transaction freq/gmv/profit, visualize

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import geopandas as gpd
from datetime import datetime
import matplotlib.pyplot as plt
%matplotlib inline

import geopandas as gpd
import folium
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.functions import countDistinct, col, date_format
import numpy as np
import pyspark.sql.functions as func
from pyspark.sql.types import (
    StringType,
    LongType,
    DoubleType,
    StructField,
    StructType,
    FloatType
)

import warnings
warnings.filterwarnings("ignore")

In [34]:
# Start Spark Session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("MAST30034 Project 2 BNPL")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/patrick/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/patrick/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 54] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/patrick/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/patrick/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: SparkSession$ does not exist in the JVM

In [3]:
# load BNPL dataset
consumer = spark.read.csv("../data/tables/tbl_consumer.csv", header=True, sep="|")
details = spark.read.parquet("../data/tables/consumer_user_details.parquet")
merchants = spark.read.parquet("../data/tables/tbl_merchants.parquet")

In [4]:
# load all transactions datasets
paths=['../data/tables/transactions_20210228_20210827_snapshot',
       '../data/tables/transactions_20210828_20220227_snapshot']

first = 1
for path in paths:
    if first:
        transactions = spark.read.parquet(path)
        print(f'added {path.split("/")[3]}')
        first = 0
    else:
        append_transactions = spark.read.parquet(path)
        transactions = transactions.union(append_transactions)
        print(f'added {path.split("/")[3]}')

                                                                                

added transactions_20210228_20210827_snapshot


                                                                                

added transactions_20210828_20220227_snapshot


In [5]:
# load poa_to_sa2 dataset
poa_to_sa2 = spark.read.csv("../data/curated/poa_w_sa2.csv", header=True)

In [6]:
transactions.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



In [7]:
transactions.agg({'order_datetime': 'max'}).show()
transactions.agg({'order_datetime': 'min'}).show()

                                                                                

+-------------------+
|max(order_datetime)|
+-------------------+
|         2022-02-27|
+-------------------+

+-------------------+
|min(order_datetime)|
+-------------------+
|         2021-02-28|
+-------------------+



                                                                                

In [8]:
# rename columns
merchants = merchants.withColumnRenamed('name', 'merchant_name')
consumer = consumer.withColumnRenamed('name', 'consumer_name')


---
#### 1. Merge SA2 to transaction by postcode

In [9]:
# Join consumers with their respective details
consumer_detail = consumer.join(details, on="consumer_id")

# Join consumers with their respective transactions
consumer_trx = consumer_detail.join(transactions, on="user_id")

# Join transactions with the respective merchants
df_trx = consumer_trx.join(merchants, on="merchant_abn")

In [10]:
df_trx.show(5, vertical=True)



-RECORD 0------------------------------
 merchant_abn   | 33064796871          
 user_id        | 7                    
 consumer_id    | 511685               
 consumer_name  | Andrea Jones         
 address        | 122 Brandon Cliff    
 state          | QLD                  
 postcode       | 4606                 
 gender         | Female               
 dollar_value   | 373.0873675184212    
 order_id       | fe188788-b89f-4dd... 
 order_datetime | 2021-08-20           
 merchant_name  | Curabitur Massa C... 
 tags           | ((computer progra... 
-RECORD 1------------------------------
 merchant_abn   | 68435002949          
 user_id        | 7                    
 consumer_id    | 511685               
 consumer_name  | Andrea Jones         
 address        | 122 Brandon Cliff    
 state          | QLD                  
 postcode       | 4606                 
 gender         | Female               
 dollar_value   | 232.5364986739752    
 order_id       | b4a89891-a113-45e... 


                                                                                

In [11]:
df_trx.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- consumer_id: string (nullable = true)
 |-- consumer_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- tags: string (nullable = true)



In [12]:
poa_to_sa2.show(5, vertical=True)

-RECORD 0---------------------------------
 poa_code_2016     | 800                  
 poa_name_2016     | 0800                 
 sa2_maincode_2016 | 701011002.0          
 sa2_name_2016     | Darwin City          
 geometry          | POLYGON ((130.834... 
-RECORD 1---------------------------------
 poa_code_2016     | 810                  
 poa_name_2016     | 0810                 
 sa2_maincode_2016 | 701021013.0          
 sa2_name_2016     | Brinkin - Nakara     
 geometry          | POLYGON ((130.863... 
-RECORD 2---------------------------------
 poa_code_2016     | 812                  
 poa_name_2016     | 0812                 
 sa2_maincode_2016 | 701021014.0          
 sa2_name_2016     | Buffalo Creek        
 geometry          | POLYGON ((130.901... 
-RECORD 3---------------------------------
 poa_code_2016     | 815                  
 poa_name_2016     | 0815                 
 sa2_maincode_2016 | 701021013.0          
 sa2_name_2016     | Brinkin - Nakara     
 geometry  

In [13]:
# translate postcodes in transaction to sa2 codes
sa2_cols = ['poa_name_2016', 'sa2_maincode_2016', 'sa2_name_2016', 'geometry']
df_trx_sa2 = (df_trx \
                .join(poa_to_sa2[sa2_cols], 
                     on=[df_trx['postcode'] == poa_to_sa2['poa_name_2016']],
                     how='inner')
                .drop('poa_name_2016')
             )

---
#### 2. Check how many unique SA2

In [14]:
df_trx_sa2.select('sa2_maincode_2016').distinct().count()

                                                                                

1314

In [15]:
df_trx_sa2.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- consumer_id: string (nullable = true)
 |-- consumer_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- sa2_maincode_2016: string (nullable = true)
 |-- sa2_name_2016: string (nullable = true)
 |-- geometry: string (nullable = true)



---
#### 3. Check for null in SA2 values

In [16]:
dict_null = {col:df_trx_sa2.filter(df_trx_sa2[col].isNull()).count() 
             for col in df_trx_sa2.columns}
dict_null

                                                                                

{'merchant_abn': 0,
 'user_id': 0,
 'consumer_id': 0,
 'consumer_name': 0,
 'address': 0,
 'state': 0,
 'postcode': 0,
 'gender': 0,
 'dollar_value': 0,
 'order_id': 0,
 'order_datetime': 0,
 'merchant_name': 0,
 'tags': 0,
 'sa2_maincode_2016': 0,
 'sa2_name_2016': 0,
 'geometry': 12090}

### 4. Analyse by State (monthly):
- by Total Dollar Value, 
- Active Merchants, 
- Active Consumers,
- AOV (overall, male vs female), 
- BNPL user % (num_unique_cust/total_population_over_18)


In [17]:
df_trx_sa2.show(5, vertical=True)



-RECORD 0---------------------------------
 merchant_abn      | 23661821077          
 user_id           | 13882                
 consumer_id       | 151968               
 consumer_name     | Scott Dean           
 address           | 09010 Brandi Prairie 
 state             | NSW                  
 postcode          | 2016                 
 gender            | Male                 
 dollar_value      | 51.527409870273424   
 order_id          | f20fdc13-9500-483... 
 order_datetime    | 2021-08-19           
 merchant_name     | Suspendisse Eleif... 
 tags              | ((computer progra... 
 sa2_maincode_2016 | 117031335.0          
 sa2_name_2016     | Redfern - Chippen... 
 geometry          | POLYGON ((151.196... 
-RECORD 1---------------------------------
 merchant_abn      | 88202878932          
 user_id           | 13882                
 consumer_id       | 151968               
 consumer_name     | Scott Dean           
 address           | 09010 Brandi Prairie 
 state     

                                                                                

In [18]:
df_trx_sa2 = df_trx_sa2.withColumn("order_month", 
                     date_format(col("order_datetime"), "M").cast('INT'))

df_trx_sa2 = df_trx_sa2.withColumn("order_year", 
                     date_format(col("order_datetime"), "y").cast('INT'))


In [19]:
state_trx = (df_trx_sa2.groupby(['state', 'order_year', 'order_month'])
             .agg({'order_id':'count', 'dollar_value':'sum'})
             .sort(['state', 'order_year', 'order_month']))
unique_cons = (df_trx_sa2.groupby(['state', 'order_year', 'order_month'])
               .agg(countDistinct('consumer_id'))
               .sort(['state', 'order_year', 'order_month']))
unique_merc = (df_trx_sa2.groupby(['state', 'order_year', 'order_month'])
               .agg(countDistinct('merchant_abn'))
               .sort(['state', 'order_year', 'order_month']))

def join_agg(sdf1, sdf2):
    '''
        take two dataframes and join the two dataframes
    '''
    sdf1 = (sdf1.alias("a") \
               .join(sdf2, 
                     on=['state', 'order_year', 'order_month'], 
                     how='inner')
           )
    return sdf1
state_trx = join_agg(state_trx, unique_cons)
state_trx = join_agg(state_trx, unique_merc)
    
# renaming a few columns
field_name_change = {"sum(dollar_value)": "total_dollar_value", 
                     "count(order_id)": "transaction_freq",
                     "count(consumer_id)": "n_unique_consumer",
                     "count(merchant_abn)": "n_unique_merchant"}
for old, new in field_name_change.items():
    state_trx = state_trx.withColumnRenamed(old, new)

cols = ['state', 'order_year', 'order_month', 'n_unique_consumer', 
        'transaction_freq', 'total_dollar_value', 'n_unique_merchant']
state_trx = state_trx[cols].sort(['state', 'order_year', 'order_month'])

state_trx = (state_trx.
             withColumn('avg_sales_per_consumer', 
                        col("total_dollar_value") / col("n_unique_consumer")))

state_trx = (state_trx.
             withColumn('avg_sales_per_merchant', 
                        col("total_dollar_value") / col("n_unique_merchant")))

In [20]:
state_trx.write.csv('/Users/patrick/Downloads/state_trx.csv')

AnalysisException: path file:/Users/patrick/Downloads/state_trx.csv already exists.

In [21]:
consumer_trx = (df_trx_sa2.groupby(['consumer_id', 'state', 'postcode', 
                                    'sa2_maincode_2016'])
                 .agg({'order_id':'count', 'dollar_value':'sum'})
                 .sort(['consumer_id']))

consumer_trx = (consumer_trx.
             withColumn('aov_consumer', 
                        col("sum(dollar_value)") / col("count(order_id)")))

In [22]:
# average consumer order value per transaction by state
consumer_trx.groupby('state').agg({'aov_consumer':'mean'})

                                                                                

state,avg(aov_consumer)
ACT,158.7767030922387
SA,158.38187416774943
TAS,159.05346106470913
WA,158.93517295210245
QLD,158.97512088335768
VIC,158.1636956303271
NSW,158.57322324864256


In [23]:
consumer_trx

                                                                                

consumer_id,state,postcode,sa2_maincode_2016,sum(dollar_value),count(order_id),aov_consumer
1000031,NSW,2177,127021509.0,59309.76783726944,327,181.3754368112215
1000051,QLD,4053,302011025.0,53789.85710050044,323,166.53206532662676
1000067,WA,6623,511041291.0,41416.99769537791,311,133.17362603015405
1000092,QLD,4850,318011463.0,56844.87832772903,351,161.9512203069203
1000115,WA,6030,505031101.0,53907.42499057463,354,152.2808615552956
1000286,SA,5280,407021152.0,49633.87304948021,330,150.4056759075158
1000293,VIC,3242,217031473.0,56934.372452776566,343,165.98942406057307
100039,NSW,2118,125021476.0,70589.49555532858,338,208.8446614062976
1000463,WA,6082,504031060.0,47737.10668715376,311,153.4955198943851
1000519,VIC,3550,202011020.0,64012.22817865185,332,192.8079162007586


In [24]:
poa_to_sa2.printSchema()

root
 |-- poa_code_2016: string (nullable = true)
 |-- poa_name_2016: string (nullable = true)
 |-- sa2_maincode_2016: string (nullable = true)
 |-- sa2_name_2016: string (nullable = true)
 |-- geometry: string (nullable = true)



---
Analyse income

In [25]:
sa2_income = gpd.read_file(f'../data/abs/sa2_income.gml')

In [26]:
sa2_income.info()

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 2288 entries, 0 to 2287
Data columns (total 22 columns):
 #   Column                          Non-Null Count  Dtype   
---  ------                          --------------  -----   
 0   gml_id                          2288 non-null   object  
 1   fid                             2288 non-null   int64   
 2   sa2_code                        2288 non-null   int64   
 3   sa2_name                        2288 non-null   object  
 4   earners_persons                 2241 non-null   float64 
 5   median_age_of_earners_years     2241 non-null   float64 
 6   sum_aud                         2241 non-null   float64 
 7   median_aud                      2241 non-null   float64 
 8   mean_aud                        2241 non-null   float64 
 9   lowest_quartile_pc              2196 non-null   float64 
 10  second_quartile_pc              2196 non-null   float64 
 11  third_quartile_pc               2196 non-null   float64 
 12  highest_quar

In [27]:
sa2_income['sa2_code'] = sa2_income['sa2_code'].astype('float')

In [28]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
sdf_income=spark.createDataFrame(sa2_income) 
sdf_income.printSchema()

root
 |-- gml_id: string (nullable = true)
 |-- fid: long (nullable = true)
 |-- sa2_code: double (nullable = true)
 |-- sa2_name: string (nullable = true)
 |-- earners_persons: double (nullable = true)
 |-- median_age_of_earners_years: double (nullable = true)
 |-- sum_aud: double (nullable = true)
 |-- median_aud: double (nullable = true)
 |-- mean_aud: double (nullable = true)
 |-- lowest_quartile_pc: double (nullable = true)
 |-- second_quartile_pc: double (nullable = true)
 |-- third_quartile_pc: double (nullable = true)
 |-- highest_quartile_pc: double (nullable = true)
 |-- percentile_ratos_p80_p20_ratio: double (nullable = true)
 |-- percentile_ratos_p80_p50_ratio: double (nullable = true)
 |-- percentile_ratos_p20_p50_ratio: double (nullable = true)
 |-- percentile_ratos_p10_p50_ratio: double (nullable = true)
 |-- gini_coefficient_coef: double (nullable = true)
 |-- income_share_top_1pc_pc: double (nullable = true)
 |-- income_share_top_5pc_pc: double (nullable = true)
 |-- i

In [29]:
sdf_income = sdf_income.withColumn(
                'sa2_code',
                F.col('sa2_code').cast('float')
             )

In [30]:
# trx_income = df_trx_sa2.join(sa2_income[['sa2_code','median_aud',
#                                           'median_age_of_earners_years',
#                                           'gini_coefficient_coef',
#                                           'earners_persons']], 
#                               how='left', left_on='sa2_maincode_2016', 
#                               right_on='sa2_code')
trx_income = (df_trx_sa2 \
               .join(sdf_income[['sa2_code','median_aud',
                                 'median_age_of_earners_years',
                                 'gini_coefficient_coef',
                                 'earners_persons']], 
                     on=[df_trx_sa2['sa2_maincode_2016'] == sdf_income['sa2_code']], 
                     how='left')
           )

In [31]:
dict_null = {col:trx_income.filter(trx_income[col].isNull()).count() 
             for col in trx_income.columns}
dict_null

                                                                                

{'merchant_abn': 0,
 'user_id': 0,
 'consumer_id': 0,
 'consumer_name': 0,
 'address': 0,
 'state': 0,
 'postcode': 0,
 'gender': 0,
 'dollar_value': 0,
 'order_id': 0,
 'order_datetime': 0,
 'merchant_name': 0,
 'tags': 0,
 'sa2_maincode_2016': 0,
 'sa2_name_2016': 0,
 'geometry': 16018,
 'order_month': 0,
 'order_year': 0,
 'sa2_code': 18225,
 'median_aud': 18225,
 'median_age_of_earners_years': 18225,
 'gini_coefficient_coef': 18225,
 'earners_persons': 18225}

In [32]:
trx_income.count()

                                                                                

38965403

In [33]:
SAMPLE_SIZE = 0.10
df = trx_income.sample(SAMPLE_SIZE, seed=0).toPandas()

[Stage 557:>                                                       (0 + 4) / 18]

22/09/08 10:57:05 ERROR Executor: Exception in task 1.0 in stage 557.0 (TID 1817)
java.lang.OutOfMemoryError: Cannot reserve 1073741824 bytes of direct buffer memory (allocated: 3921266348, limit: 4294967296)
	at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:120)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:330)
	at io.netty.buffer.UnpooledDirectByteBuf.allocateDirect(UnpooledDirectByteBuf.java:104)
	at io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledByteBufAllocator.java:215)
	at io.netty.buffer.UnpooledDirectByteBuf.<init>(UnpooledDirectByteBuf.java:64)
	at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:41)
	at io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf.<init>(UnpooledByteBufAllocator.java:210)
	at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(Unpool

Py4JJavaError: An error occurred while calling o423.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 557.0 failed 1 times, most recent failure: Lost task 1.0 in stage 557.0 (TID 1817) (100.94.176.202 executor driver): java.lang.OutOfMemoryError: Cannot reserve 1073741824 bytes of direct buffer memory (allocated: 3921266348, limit: 4294967296)
	at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:120)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:330)
	at io.netty.buffer.UnpooledDirectByteBuf.allocateDirect(UnpooledDirectByteBuf.java:104)
	at io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledByteBufAllocator.java:215)
	at io.netty.buffer.UnpooledDirectByteBuf.<init>(UnpooledDirectByteBuf.java:64)
	at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:41)
	at io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf.<init>(UnpooledByteBufAllocator.java:210)
	at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:91)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:171)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:315)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:310)
	at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:298)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
	at org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:522)
	at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1255)
	at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1091)
	at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:251)
	at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:130)
	at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:95)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:113)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:121)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:97)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3800)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3798)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3802)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3779)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3779)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3778)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
	at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Caused by: java.lang.OutOfMemoryError: Cannot reserve 1073741824 bytes of direct buffer memory (allocated: 3921266348, limit: 4294967296)
	at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:120)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:330)
	at io.netty.buffer.UnpooledDirectByteBuf.allocateDirect(UnpooledDirectByteBuf.java:104)
	at io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledByteBufAllocator.java:215)
	at io.netty.buffer.UnpooledDirectByteBuf.<init>(UnpooledDirectByteBuf.java:64)
	at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:41)
	at io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf.<init>(UnpooledByteBufAllocator.java:210)
	at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:91)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:171)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:315)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:310)
	at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:298)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
	at org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:522)
	at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1255)
	at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1091)
	at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:251)
	at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:130)
	at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:95)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:113)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:121)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:97)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3800)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 50634)
Traceback (most recent call last):
  File "/Users/patrick/opt/anaconda3/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/patrick/opt/anaconda3/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/patrick/opt/anaconda3/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/patrick/opt/anaconda3/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/patrick/opt/anaconda3/lib/python3.9/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/Users/patrick/opt/anaconda3/lib/python3.9/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/Users/patrick

In [None]:
trx_income.groupby('merchant_abn')

---
### 5. Geospatial Visualization

In [None]:
poa_to_sa2 = pd.read_csv("../data/curated/poa_w_sa2.csv")

In [None]:
poa_to_sa2.info()

In [None]:
sa2_trx = (df_trx_sa2.groupby(['sa2_maincode_2016', 'order_year', 'order_month'])
             .agg({'order_id':'count', 'dollar_value':'sum'})
             .sort(['sa2_maincode_2016', 'order_year', 'order_month']))
unique_cons = (df_trx_sa2.groupby(['sa2_maincode_2016', 'order_year', 'order_month'])
               .agg(countDistinct('consumer_id'))
               .sort(['sa2_maincode_2016', 'order_year', 'order_month']))
unique_merc = (df_trx_sa2.groupby(['sa2_maincode_2016', 'order_year', 'order_month'])
               .agg(countDistinct('merchant_abn'))
               .sort(['sa2_maincode_2016', 'order_year', 'order_month']))

def join_agg(sdf1, sdf2):
    '''
        take two dataframes and join the two dataframes
    '''
    sdf1 = (sdf1.alias("a") \
               .join(sdf2, 
                     on=['sa2_maincode_2016', 'order_year', 'order_month'], 
                     how='inner')
           )
    return sdf1
sa2_trx = join_agg(sa2_trx, unique_cons)
sa2_trx = join_agg(sa2_trx, unique_merc)
    
# renaming a few columns
field_name_change = {"sum(dollar_value)": "total_dollar_value", 
                     "count(order_id)": "transaction_freq",
                     "count(consumer_id)": "n_unique_consumer",
                     "count(merchant_abn)": "n_unique_merchant"}
for old, new in field_name_change.items():
    sa2_trx = sa2_trx.withColumnRenamed(old, new)

cols = ['sa2_maincode_2016', 'order_year', 'order_month', 'n_unique_consumer', 
        'transaction_freq', 'total_dollar_value', 'n_unique_merchant']
sa2_trx = sa2_trx[cols].sort(['sa2_maincode_2016', 'order_year', 'order_month'])

sa2_trx = (sa2_trx.
             withColumn('avg_sales_per_consumer', 
                        col("total_dollar_value") / col("n_unique_consumer")))

sa2_trx = (sa2_trx.
             withColumn('avg_sales_per_order', 
                        col("total_dollar_value") / col("transaction_freq")))

sa2_trx = (sa2_trx.
             withColumn('avg_sales_per_merchant', 
                        col("total_dollar_value") / col("n_unique_merchant")))


In [None]:
sa2_trx.show(5, vertical=True)

In [None]:
poa_to_sa2 = poa_to_sa2.dropna()

In [None]:
from shapely import wkt

poa_to_sa2['geometry'] = poa_to_sa2['geometry'].astype('str').apply(wkt.loads)
gdf = gpd.GeoDataFrame(poa_to_sa2, crs='epsg:4326')


In [None]:
gdf['geometry'] = gdf['geometry'].to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")

In [None]:
# create a JSON 
geoJSON = gdf[['sa2_maincode_2016', 'geometry']].drop_duplicates('sa2_maincode_2016').to_json()

# print the first 300 chars of the json
print(geoJSON[:300])

In [None]:
sa2_trx = sa2_trx.toPandas()

In [None]:
sa2_trx.info()

In [None]:
# visualize trip_count by do_location id
m = folium.Map(location=[-38.043995, 145.264296], tiles="Stamen Terrain", zoom_start=8)
sa2_trx_filter = sa2_trx.filter(F.col('state') == "VIC")

custom_scale = (sa2_trx_filter['total_dollar_value'].quantile((0,0.2,0.4,0.6,0.8,1))).tolist()
c = folium.Choropleth(
    geo_data=geoJSON, # geoJSON 
    name='choropleth', # name of plot
    data=sa2_trx, # data source
    columns=['sa2_maincode_2016','total_dollar_value'], # the columns required
    key_on='properties.sa2_maincode_2016', # this is from the geoJSON's properties
    fill_color='YlOrRd', # color scheme
    nan_fill_color='black',
    legend_name='Average Sales per Transaction'
)

c.add_to(m)

m