In [84]:
# REMOVE WHITESPACE FROM CSV BEFORE RUNNING THIS NOTEBOOK
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from functools import reduce
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *

# Load each source CSV into its own DataFrame, treating the first row of
# every file as the column header. No schema is supplied or inferred here,
# so the columns arrive as strings (see the printed schemas below).
df_household = spark.read.csv("C:\\spark\\data\\400_households.csv", header=True)
df_products = spark.read.csv("C:\\spark\\data\\400_products.csv", header=True)
df_transactions = spark.read.csv("C:\\spark\\data\\400_transactions.csv", header=True)

In [85]:
# Print the schema of each raw table, in order: households, products, transactions
for raw_frame in (df_household, df_products, df_transactions):
    raw_frame.printSchema()

root
 |-- HSHD_NUM: string (nullable = true)
 |-- L: string (nullable = true)
 |-- AGE_RANGE: string (nullable = true)
 |-- MARITAL: string (nullable = true)
 |-- INCOME_RANGE: string (nullable = true)
 |-- HOMEOWNER: string (nullable = true)
 |-- HSHD_COMPOSITION: string (nullable = true)
 |-- HH_SIZE: string (nullable = true)
 |-- CHILDREN: string (nullable = true)

root
 |-- PRODUCT_NUM: string (nullable = true)
 |-- DEPARTMENT: string (nullable = true)
 |-- COMMODITY: string (nullable = true)
 |-- BRAND_TY: string (nullable = true)
 |-- NATURAL_ORGANIC_FLAG: string (nullable = true)

root
 |-- BASKET_NUM: string (nullable = true)
 |-- HSHD_NUM: string (nullable = true)
 |-- PURCHASE_: string (nullable = true)
 |-- PRODUCT_NUM: string (nullable = true)
 |-- SPEND: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- STORE_R: string (nullable = true)
 |-- WEEK_NUM: string (nullable = true)
 |-- YEAR: string (nullable = true)



In [86]:
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    """Return ``df`` with every column listed in ``names`` cast to ``newType``.

    Args:
        df: DataFrame whose columns should be re-typed.
        names: Iterable of column names to cast; each is looked up on the
            frame with ``df[name]``.
        newType: Target type handed to ``Column.cast`` for every listed column.

    Returns:
        The frame produced by applying ``withColumn`` once per listed name.
        When ``names`` is empty the input frame is returned unchanged.
    """
    converted = df
    for column_name in names:
        recast = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, recast)
    return converted
# Columns to convert, grouped by the type they are cast to
HOUSEHOLD_FEATURES = ['HH_SIZE', 'CHILDREN']
TRANSACTION_FEATURES = ['SPEND', 'UNITS']
DATE_LIST = ['PURCHASE_']

# Convert the types to either integer, float or date using convertColumn()
df_household = convertColumn(df_household, HOUSEHOLD_FEATURES, IntegerType())

df_transactions = convertColumn(df_transactions, TRANSACTION_FEATURES, FloatType())
# NOTE(review): a plain cast to DateType presumably yields null for strings
# that are not in a date format Spark recognises - confirm the PURCHASE_ format.
df_transactions = convertColumn(df_transactions, DATE_LIST, DateType())

# No join is performed in this cell. The household table has no PRODUCT_NUM
# column (see the printed schema), so joining it directly to the products
# table fails. Products are joined in the next cell, after the transactions
# have been attached to the households.

In [98]:
# Rename the key columns of the right-hand tables so that the joined frame
# does not end up with two columns sharing the same name.
df_products = df_products.withColumnRenamed("PRODUCT_NUM", "p")
df_transactions = df_transactions.withColumnRenamed("HSHD_NUM", "h")

# Join everything under the household dataframe:
# first attach each transaction to its household ...
household_key_match = df_household["HSHD_NUM"] == df_transactions["h"]
df_household = df_household.join(df_transactions, on=household_key_match, how="inner")
# ... then attach the product details of each transaction.
product_key_match = df_household["PRODUCT_NUM"] == df_products["p"]
df_household = df_household.join(df_products, on=product_key_match, how="inner")


In [99]:
# Keep only the purchases of products flagged as organic ("Y")
df_organic = df_household.where(col("NATURAL_ORGANIC_FLAG").like("Y"))

# For each household (kept together with its age range), count its organic purchases
df_organic_age = df_organic.groupBy('AGE_RANGE', 'HSHD_NUM').agg(count('SPEND'))
# Average those per-household counts within each age range
df_avg_organic_trans_by_age = df_organic_age.groupBy('AGE_RANGE').agg(avg('count(SPEND)'))
# Show the result ordered by age range
df_avg_organic_trans_by_age.sort('AGE_RANGE').show()

+---------+------------------+
|AGE_RANGE| avg(count(SPEND))|
+---------+------------------+
|    19-24|             120.0|
|    25-34|            115.32|
|    35-44|159.09302325581396|
|    45-54| 82.94520547945206|
|    55-64| 58.21333333333333|
|    65-74|              84.0|
|      75+|46.733333333333334|
|     null|            68.125|
+---------+------------------+



In [100]:
# Same organic-purchase analysis, grouped by income range instead of age range:
# count the organic purchases of each household ...
df_organic_income = df_organic.groupBy('INCOME_RANGE', 'HSHD_NUM').agg(count('SPEND'))
# ... average those counts within each income range ...
df_avg_organic_trans_by_income = df_organic_income.groupBy('INCOME_RANGE').agg(avg('count(SPEND)'))
# ... and show the result ordered by income range.
df_avg_organic_trans_by_income.sort('INCOME_RANGE').show()

+------------+------------------+
|INCOME_RANGE| avg(count(SPEND))|
+------------+------------------+
|    100-150K| 92.91304347826087|
|       150K+|             159.4|
|      35-49K| 48.75925925925926|
|      50-74K| 76.46666666666667|
|      75-99K| 138.5681818181818|
|    UNDER35K|51.333333333333336|
|        null|             69.22|
+------------+------------------+



In [101]:
# Baseline for the age-range attribute: all purchases, organic or not.
# Count the purchases of each household (kept together with its age range).
households_within_age = df_household.groupBy('AGE_RANGE', 'HSHD_NUM')
df_all_age = households_within_age.agg({'SPEND' : 'count'})
# Average the per-household counts within each age range.
age_ranges = df_all_age.groupBy('AGE_RANGE')
df_avg_all_trans_by_age = age_ranges.agg({'count(SPEND)' : 'avg'})
df_avg_all_trans_by_age.sort('AGE_RANGE').show()

+---------+------------------+
|AGE_RANGE| avg(count(SPEND))|
+---------+------------------+
|    19-24|           2654.75|
|    25-34|           2543.25|
|    35-44| 2954.176470588235|
|    45-54|2298.9302325581393|
|    55-64|2100.1529411764704|
|    65-74|2094.9811320754716|
|      75+|            2206.0|
|     null|2158.4918032786886|
+---------+------------------+



In [102]:
# Baseline for the income-range attribute: all purchases, organic or not.
# Count the purchases of each household (kept together with its income range).
households_within_income = df_household.groupBy('INCOME_RANGE', 'HSHD_NUM')
df_all_income = households_within_income.agg({'SPEND' : 'count'})
# Average the per-household counts within each income range.
income_ranges = df_all_income.groupBy('INCOME_RANGE')
df_avg_all_trans_by_income = income_ranges.agg({'count(SPEND)' : 'avg'})
df_avg_all_trans_by_income.sort('INCOME_RANGE').show()

+------------+------------------+
|INCOME_RANGE| avg(count(SPEND))|
+------------+------------------+
|    100-150K| 2036.851851851852|
|       150K+| 2462.470588235294|
|      35-49K|        2423.65625|
|      50-74K|2347.8690476190477|
|      75-99K| 2368.823529411765|
|    UNDER35K|            2402.5|
|        null|2105.5272727272727|
+------------+------------------+



In [112]:
# Additional breakdowns of organic purchases, examined while looking for conclusions

# By marital status: organic purchases per household, averaged per status
households_within_marital = df_organic.groupBy('MARITAL', 'HSHD_NUM')
df_organic_marital = households_within_marital.agg({'SPEND' : 'count'})
marital_groups = df_organic_marital.groupBy('MARITAL')
df_avg_organic_trans_by_marital = marital_groups.agg({'count(SPEND)' : 'avg'})
df_avg_organic_trans_by_marital.sort('MARITAL').show()

# By homeowner status: organic purchases per household, averaged per status
households_within_homeowner = df_organic.groupBy('HOMEOWNER', 'HSHD_NUM')
df_organic_homeowner = households_within_homeowner.agg({'SPEND' : 'count'})
homeowner_groups = df_organic_homeowner.groupBy('HOMEOWNER')
df_avg_organic_trans_by_homeowner = homeowner_groups.agg({'count(SPEND)' : 'avg'})
df_avg_organic_trans_by_homeowner.sort('HOMEOWNER').show()

+-------+-----------------+
|MARITAL|avg(count(SPEND))|
+-------+-----------------+
|Married|          88.0875|
| Single|83.41044776119404|
|   null|75.98305084745763|
+-------+-----------------+

+---------+-----------------+
|HOMEOWNER|avg(count(SPEND))|
+---------+-----------------+
|Homeowner|       90.7578125|
|   Renter|57.75757575757576|
|     null|         72.09375|
+---------+-----------------+

