# Outlier Analysis

Questions:
* If you found NULL values after you joined the datasets, what did you do with them? How many were there?

After joining the datasets, there were 750332 null values in the SA2 dataset and 182 in the original dataset; together these comprised approximately 2% of the total dataset. For null values in the SA2 dataset, the postcodes were used to determine which state the purchase came from. The other columns were then populated by averaging over all the data in that state.

* Was there any missing data that shouldn’t be missing after joining to your external
dataset? If anything was missing, how much was there and what did you do about it?

There were only 182 rows of missing data once joined. Due to the extremely low number, and the fact that the whole purchase was null, they were removed.

* If you decided to omit outliers, what does the distribution look like prior and after?

Because only a few records were removed from the data, the distribution does not change.



In [None]:
from pyspark.sql import SparkSession, functions as F, DataFrame
from pyspark.sql.functions import col

# Settings applied to the notebook's Spark session, in the order they are set.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "4g",
}

session_builder = SparkSession.builder.appName("Customer_Analysis")
for setting, value in spark_settings.items():
    session_builder = session_builder.config(setting, value)

# Session handle used by every cell below.
spark = session_builder.getOrCreate()

In [None]:
# Load the curated dataset; the bare name on the last line renders it as the
# cell output.
curated_path = '../data/curated/final_data.parquet'
data = spark.read.parquet(curated_path)
data

In [None]:
from pyspark.sql.functions import isnan, when, count, col
# One output column per input column, holding that column's number of NULLs:
# `when` yields a non-null value only for NULL entries and `count` counts the
# non-null results.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(null_counts)

## SA2_CODE_2021 / SA2_NAME_2021

In [None]:
from pyspark.sql.functions import col
# Fraction of rows that have no SA2 code.
missing_sa2_rows = data.where(col("SA2_CODE_2021").isNull()).count()
missing_sa2_rows / data.count()

In [None]:
import pyspark.sql.functions as F
import pandas as pd
# Inclusive postcode ranges allocated to each state/territory.
# The previous hand-written conditions used `<` on three upper bounds
# (2899, 2999, 2920), which pushed those postcodes into the 'NT' fallback.
state_postcode_ranges = {
    'NSW': [(1000, 1999), (2000, 2599), (2619, 2899), (2921, 2999)],
    'ACT': [(200, 299), (2600, 2618), (2900, 2920)],
    'VIC': [(3000, 3999), (8000, 8999)],
    'QLD': [(4000, 4999), (9000, 9999)],
    'SA': [(5000, 5999)],
    'WA': [(6000, 6999)],
    'TAS': [(7000, 7999)],
}

# Build a single CASE WHEN expression: one branch per state, each branch the
# OR of that state's inclusive ranges.
state_expr = None
for state_code, ranges in state_postcode_ranges.items():
    in_state = None
    for low, high in ranges:
        in_range = col('postcode').between(low, high)
        in_state = in_range if in_state is None else (in_state | in_range)
    state_expr = (
        F.when(in_state, state_code)
        if state_expr is None
        else state_expr.when(in_state, state_code)
    )

# Everything not matched above is labelled 'NT'. This includes rows whose
# postcode is NULL, because every range comparison on NULL evaluates to NULL.
data = data.withColumn('state', state_expr.otherwise('NT'))

# Placeholder SA2 names given to rows whose SA2 name is missing, keyed by state.
# NOTE: 'Western Austraia' is misspelled on purpose-of-compatibility: the
# imputation loop further down matches on this exact string, so the two must
# be changed together.
state_placeholder_names = {
    'VIC': 'Victoria',
    'NSW': 'New South Wales',
    'QLD': 'Queensland',
    'NT': 'Northern Territory',
    'WA': 'Western Austraia',
    'SA': 'South Australia',
    'TAS': 'Tasmania',
    'ACT': 'Australian Capital Territory',
}

placeholder_expr = None
for state_code, placeholder in state_placeholder_names.items():
    is_state = col('state') == state_code
    placeholder_expr = (
        F.when(is_state, placeholder)
        if placeholder_expr is None
        else placeholder_expr.when(is_state, placeholder)
    )

# Keep an existing SA2 name; fall back to the state placeholder only when it
# is NULL.
data = data.withColumn(
    'SA2_NAME_2021',
    F.coalesce(col('SA2_NAME_2021'), placeholder_expr),
)
data.limit(5)

In [None]:
# Replace the spaces in the two education column names with underscores.
for spaced_name in ("Completed Year 12", "Did Not Attend School"):
    data = data.withColumnRenamed(spaced_name, spaced_name.replace(" ", "_"))

# Cast every census-derived column to float.
float_columns = (
    "Median_tot_prsnl_inc_weekly",
    "Median_rent_weekly",
    "Median_mortgage_repay_monthly",
    "Median_age_persons",
    "Median_tot_hhd_inc_weekly",
    "Average_household_size",
    "Completed_Year_12",
    "Did_Not_Attend_School",
    "TOT_P_P",
)
for name in float_columns:
    data = data.withColumn(name, col(name).cast('float'))


In [None]:
columns = ["Median_tot_prsnl_inc_weekly", "Median_rent_weekly", "Median_mortgage_repay_monthly", "Median_age_persons", "Median_tot_hhd_inc_weekly",
           "Average_household_size", "Completed_Year_12", "Did_Not_Attend_School", "TOT_P_P"]

# Placeholder SA2 names (assigned to rows with a missing SA2 name) -> state
# code whose mean should be used for imputation.
placeholder_states = {
    'Northern Territory': 'NT',
    'Australian Capital Territory': 'ACT',
    'South Australia': 'SA',
    'Tasmania': 'TAS',
    'Western Austraia': 'WA',   # spelling used where the placeholders are assigned
    'Western Australia': 'WA',  # accepted too, in case that spelling is corrected
    'Queensland': 'QLD',
    'Victoria': 'VIC',
    'New South Wales': 'NSW',
}

# Compute every per-state mean in ONE aggregation and key the result by state.
# The row order of a grouped result is not guaranteed, so the means must be
# looked up by state name rather than by position in collect().
state_means = {
    row['state']: row.asDict()
    for row in data.groupBy('state')
    .agg(*[F.mean(name).alias(name) for name in columns])
    .collect()
}

for column in columns:
    imputed = None
    for placeholder, state_code in placeholder_states.items():
        is_placeholder = col('SA2_NAME_2021') == placeholder
        # A missing state (or an all-NULL column) yields a NULL literal.
        state_mean = F.lit(state_means.get(state_code, {}).get(column))
        imputed = (
            F.when(is_placeholder, state_mean)
            if imputed is None
            else imputed.when(is_placeholder, state_mean)
        )
    # Rows with a real SA2 name keep their original value.
    data = data.withColumn(column, imputed.otherwise(col(column)))
data

## Postcode
Remove all instances with a null postcode, as there is no way to determine the location of the purchase without the postcode.

In [None]:
# Discard rows that have no postcode.
data = data.dropna(subset=["postcode"])

In [None]:
# Show the smallest and then the largest postcode.
for stat in ('min', 'max'):
    print(data.agg({'postcode': stat}))

According to AusPost, all postcodes between 1000 and 9999 are valid, thus we will keep them all.
## ABN
All ABNs are 11 digits long, thus we will verify this:

In [None]:
# Show the smallest and then the largest merchant ABN.
for stat in ('min', 'max'):
    print(data.agg({'merchant_abn': stat}))

## Dollar Value

In [None]:
# Show the smallest and then the largest purchase amount.
for stat in ('min', 'max'):
    print(data.agg({'dollar_value': stat}))

Given the minimum dollar value above, count the purchases of 5c or less:

In [None]:
# Number of purchases of 5 cents or less.
tiny_purchases = data.filter(col("dollar_value") <= 0.05)
tiny_purchases.count()

All these purchases seem unreasonable due to the limited things you can buy for 5c, thus they will be removed from the dataset.

In [None]:
# Remove the purchases of 5 cents or less, i.e. exactly the rows selected by
# the `dollar_value <= 0.05` count. The previous `>= 0.05` kept purchases of
# exactly 5c even though they were counted as removed.
# Rows with a NULL dollar_value are dropped as well (the comparison is NULL).
data = data.filter(col('dollar_value') > 0.05)

In [None]:
# Preview up to ten purchases of $50,000 or more.
large_purchases = data.filter(col("dollar_value") >= 50000)
large_purchases.limit(10)

All instances above $50,000 come from antique shops or jewelry shops, and thus it is plausible for the individuals to be spending large amounts of money.

## Order Date 

In [None]:
# Show the earliest and then the latest order date.
for stat in ('min', 'max'):
    print(data.agg({'order_datetime': stat}))

Based on the above all values are in 2021.

## Gender

In [None]:
from pyspark.sql.functions import count

# Number of rows per gender value.
rows_per_gender = F.count(F.lit(1)).alias('count')
data.groupBy('gender').agg(rows_per_gender)

## Median personal income

In [None]:
# Show the smallest and then the largest median weekly personal income.
for stat in ('min', 'max'):
    print(data.agg({'Median_tot_prsnl_inc_weekly': stat}))

# NOTE(review): this filter is applied to Median_mortgage_repay_monthly even
# though the cell sits under the "Median personal income" heading — confirm
# which column was intended.
# Compare against a numeric literal instead of the string '50000.0' so the
# comparison does not rely on an implicit string-to-number cast.
# Rows where the column is NULL are dropped too (the comparison is NULL).
data = data.filter(col('Median_mortgage_repay_monthly') <= 50000.0)

## Median rent weekly

In [None]:
# Show the smallest and then the largest median weekly rent.
for stat in ('min', 'max'):
    print(data.agg({'Median_rent_weekly': stat}))

## Median mortgage monthly

In [None]:
# Show the smallest and then the largest median monthly mortgage repayment.
for stat in ('min', 'max'):
    print(data.agg({'Median_mortgage_repay_monthly': stat}))

In [None]:
# Drop rows whose median monthly mortgage repayment equals 9999
# (presumably a sentinel / top-coded value — TODO confirm against the source).
# A numeric literal replaces the string '9999.0' so the comparison does not
# rely on an implicit string-to-number cast. NULL rows are dropped as well.
data = data.filter(col('Median_mortgage_repay_monthly') != 9999.0)

## Median age

In [None]:
# Show the smallest and then the largest median age.
for stat in ('min', 'max'):
    print(data.agg({'Median_age_persons': stat}))

## Median household income

In [None]:
# Show the smallest and then the largest median weekly household income.
for stat in ('min', 'max'):
    print(data.agg({'Median_tot_hhd_inc_weekly': stat}))

## Average household size

In [None]:
# Show the smallest and then the largest average household size.
for stat in ('min', 'max'):
    print(data.agg({'Average_household_size': stat}))


In [None]:
# Drop rows with an average household size of zero.
# A numeric literal replaces the string '0.0' so the comparison does not rely
# on an implicit string-to-number cast. NULL rows are dropped as well.
data = data.filter(col('Average_household_size') != 0.0)

## Completed Year 12

In [None]:
# Show the smallest and then the largest Completed_Year_12 value.
for stat in ('min', 'max'):
    print(data.agg({'Completed_Year_12': stat}))

## Did_Not_Attend_School

In [None]:
# Show the smallest and then the largest Did_Not_Attend_School value.
for stat in ('min', 'max'):
    print(data.agg({'Did_Not_Attend_School': stat}))

## Total Population

In [None]:
# Show the smallest and then the largest TOT_P_P value.
for stat in ('min', 'max'):
    print(data.agg({'TOT_P_P': stat}))

no outliers found

In [30]:
# Persist the cleaned dataset, replacing any previous output at this path.
data.write.parquet('../data/curated/outlier_data.parquet', mode='overwrite')

ERROR:root:KeyboardInterrupt while sending command.====>           (8 + 2) / 10]
Traceback (most recent call last):
  File "/Users/jackmelleuish/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/jackmelleuish/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/jackmelleuish/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                