In [2]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pprint import pprint

conf = pyspark.SparkConf()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 12:52:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [20]:
# DATA QUALITY NOTES 
# 4011 in barcode means item not found
# items_df.select(F.count('*'),F.sum(F.when(F.col('barcode')==4011, 1).otherwise(0))).show()

# the major problem with raw reciepts is rewardsReceiptItemList column
# contains an array of strucTypes that represents a line item
# need to create a new 'item' table 
### 1) explodes the line items so it can be structures
### 2) bridges the receipt table to the brands table using barcode

# compare different price columns (which is more accruate, what is the difference)

# compare item count to item
# compare spend to by item 

# get rid of everything with a test brand

# questions:
# what is partnerItemId
# what is test brand, what is brandCode
 # most recent (full?) month? (assuming we include pp)
# receipts scanned, asking for a count of reciepts?
# where exactly this data is being collected
# where are the different item prices coming from

In [3]:
brands_json = "data/brands.json"

df = spark.read.json(brands_json)
print(f'raw: {df.count()}')

brands_df = df.select(
    F.col("_id.$oid").alias("_id"),
    F.col("barcode"),
    F.col("brandCode"),
    F.col("category"),
    F.col("categoryCode"),
    F.col("cpg.$id.$oid").alias("cpgId"),
    F.col("name"),
    F.col("topBrand"),
)
# .withColumn('test', F.col('cpgId') == F.col('_id'))
cpg_df = df.select(
    F.col("cpg.$id.$oid").alias("_id"),
    F.col("cpg.$ref").alias("cpgRef"),).distinct()

# check ids are unique and non null by counting compared to count earlier
print(f'unique and nonnull id: {brands_df.select('_id').where(F.col('_id').isNotNull()).distinct().count()}')

# check barcodes are unique and non null by counting compared to count earlier
print(f'unique and nonnull barcode: {brands_df.select('barcode').where(F.col('barcode').isNotNull()).distinct().count()}')
brands_df.groupBy('barcode').agg(F.count('*').alias('ct')).orderBy(F.col('ct').desc()).show()
brands_df.where(F.col('barcode') =='511111204923' ).show()

# # system relies that barcode is unique (criteria to be used as FK in another table)
# assert brands_df.where(F.col('barcode').isNotNull()).distinct().count()/brands_df.count() == 1, 'col barcode must be unique'


# check ids are unique and non null by counting compared to count earlier
print(f'unique and nonnull id in cpg_df: {cpg_df.where(F.col('_id').isNotNull()).distinct().count()}')

brands_df.show(truncate = False)
cpg_df.show()

                                                                                

raw: 1167
unique and nonnull id: 1167
unique and nonnull barcode: 1160
+------------+---+
|     barcode| ct|
+------------+---+
|511111204923|  2|
|511111504788|  2|
|511111004790|  2|
|511111605058|  2|
|511111305125|  2|
|511111704140|  2|
|511111504139|  2|
|511111501985|  1|
|511111603641|  1|
|511111208532|  1|
|511111503255|  1|
|511111203605|  1|
|511111115083|  1|
|511111818304|  1|
|511111915638|  1|
|511111617419|  1|
|511111017943|  1|
|511111205500|  1|
|511111519713|  1|
|511111005377|  1|
+------------+---+
only showing top 20 rows

+--------------------+------------+----------+--------+------------+--------------------+---------+--------+
|                 _id|     barcode| brandCode|category|categoryCode|               cpgId|     name|topBrand|
+--------------------+------------+----------+--------+------------+--------------------+---------+--------+
|5c45f91b87ff3552f...|511111204923|0987654321| Grocery|        NULL|5c45f8b087ff3552f...|   Brand1|    true|
|5d6027f46d

In [4]:
# read in raw receipt data
receipts_json = "data/receipts.json"
raw_receipts_df = spark.read.json(receipts_json)
print(f'raw: {df.count()}')

# the major problem with raw reciepts is rewardsReceiptItemList column
# contains an array of strucTypes that represents a line item
# need to create a new 'item' table 
### 1) explodes the line items so it can be structures
### 2) bridges the receipt table to the brands table using barcode

receipts_df = raw_receipts_df.select(
    F.col("_id.$oid").alias("_id"),
    F.col("bonusPointsEarned"),
    F.col("bonusPointsEarnedReason"),
    F.from_unixtime(F.col("createDate.$date")/1000).alias('createDate'),
    F.from_unixtime(F.col("dateScanned.$date")/1000).alias('dateScanned'),
    F.from_unixtime(F.col("finishedDate.$date")/1000).alias('finishedDate'),
    F.from_unixtime(F.col("modifyDate.$date")/1000).alias('modifyDate'),
    F.from_unixtime(F.col("pointsAwardedDate.$date")/1000).alias('pointsAwardedDate'),
    F.col("pointsEarned"),
    F.from_unixtime(F.col("purchaseDate.$date")/1000).alias('purchaseDate'),
    F.col("purchasedItemCount"),
    F.col("rewardsReceiptStatus"),
    F.col("totalSpent"),
    F.col("userId"),
)

receipts_df.show()

items_df = raw_receipts_df.select(F.explode(F.col('rewardsReceiptItemList')).alias('item_struct'), F.col("_id.$oid").alias("receiptId"))
items_df.show()
total_item_count = items_df.count()
# Dynamically create new columns from each field in the 
cols = [col.name for col in items_df.schema['item_struct'].dataType.fields]
for col in cols:
    items_df = items_df.withColumn(f"{col}", F.col("item_struct." + col))
    nonnull_rate = items_df.where(F.col(col).isNotNull()).count()/total_item_count
    # removes some columns that are not as important (decided remove anything greater than 90% null)
    if nonnull_rate < .1:
        items_df = items_df.drop(col)
    
    print(f'{col}: {nonnull_rate}')
print(f'total: {total_item_count}')
# creates final items_df - add a new _id column 
id_w = Window.partitionBy('receiptId').orderBy(F.col('barcode'),F.col('description'))
items_df = items_df.select(
    F.concat(F.col('receiptId'),F.lit('_'),F.row_number().over(id_w)).alias('_id'),
    'receiptId',
    'barcode',
    'brandCode',
    'description',
    'discountedItemPrice',
    'finalPrice',
    'itemPrice',
    'metabriteCampaignId',
    'needsFetchReview',
    'originalReceiptItemText',
    'partnerItemId',
    'pointsEarned',
    'pointsPayerId',
    'priceAfterCoupon',
    'quantityPurchased',
    'rewardsGroup',
    'rewardsProductPartnerId',
)
items_df.where(F.col('brandCode').isNotNull()).show()

# check ids are unique and non null by counting compared to count earlier
print(f'unique and nonnull id: {items_df.where(F.col('_id').isNotNull()).distinct().count()}')


24/11/14 12:52:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


raw: 1167
+--------------------+-----------------+-----------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------+-------------------+------------------+--------------------+----------+--------------------+
|                 _id|bonusPointsEarned|bonusPointsEarnedReason|         createDate|        dateScanned|       finishedDate|         modifyDate|  pointsAwardedDate|pointsEarned|       purchaseDate|purchasedItemCount|rewardsReceiptStatus|totalSpent|              userId|
+--------------------+-----------------+-----------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------+-------------------+------------------+--------------------+----------+--------------------+
|5ff1e1eb0a720f052...|              500|   Receipt number 2 ...|2021-01-03 10:25:31|2021-01-03 10:25:31|2021-01-03 10:25:31|2021-01-03 10:25:36|2021-01-03 10:25:31|       500.0|202

In [5]:
items_df.select('partnerItemId').distinct().show()

+-------------+
|partnerItemId|
+-------------+
|         1090|
|         1159|
|         1572|
|         1436|
|         1512|
|         1372|
|         1669|
|         1394|
|          944|
|         1241|
|         1265|
|         1280|
|         1361|
|         1746|
|            7|
|         1953|
|         1528|
|         1897|
|         1445|
|         1695|
+-------------+
only showing top 20 rows



In [8]:
items_df.join(brands_df, 'barcode', 'left' ).where(F.col('name').isNotNull()).show()
# e5e63da79fcd2bebbd7cb8bf1c1d0274

+------------+--------------------+--------------------+------------+------------+-------------------+----------+---------+-------------------+----------------+-----------------------+-------------+------------+--------------------+----------------+-----------------+------------+-----------------------+--------------------+--------------------+------------+--------------------+------------+--------+
|     barcode|                 _id|           receiptId|   brandCode| description|discountedItemPrice|finalPrice|itemPrice|metabriteCampaignId|needsFetchReview|originalReceiptItemText|partnerItemId|pointsEarned|       pointsPayerId|priceAfterCoupon|quantityPurchased|rewardsGroup|rewardsProductPartnerId|                 _id|            category|categoryCode|               cpgId|        name|topBrand|
+------------+--------------------+--------------------+------------+------------+-------------------+----------+---------+-------------------+----------------+-----------------------+----------

In [15]:
items_df.join(brands_df, 'barcode', 'left' ).where(F.col('name').isNull()).show()

+------------+--------------------+--------------------+---------+--------------------+-------------------+----------+---------+-------------------+----------------+-----------------------+-------------+------------+--------------------+----------------+-----------------+--------------------+-----------------------+----+--------+------------+-----+----+--------+
|     barcode|                 _id|           receiptId|brandCode|         description|discountedItemPrice|finalPrice|itemPrice|metabriteCampaignId|needsFetchReview|originalReceiptItemText|partnerItemId|pointsEarned|       pointsPayerId|priceAfterCoupon|quantityPurchased|        rewardsGroup|rewardsProductPartnerId| _id|category|categoryCode|cpgId|name|topBrand|
+------------+--------------------+--------------------+---------+--------------------+-------------------+----------+---------+-------------------+----------------+-----------------------+-------------+------------+--------------------+----------------+----------------

+--------+--------------+----------+
|count(1)|count(barcode)|count(_id)|
+--------+--------------+----------+
|    6948|          3097|        89|
+--------+--------------+----------+



In [19]:
items_df.select(F.count('*'),F.sum(F.when(F.col('barcode')==4011, 1).otherwise(0))).show()

+--------+-------------------------------------------------+
|count(1)|sum(CASE WHEN (barcode = 4011) THEN 1 ELSE 0 END)|
+--------+-------------------------------------------------+
|    6941|                                              177|
+--------+-------------------------------------------------+



In [5]:
# There are dupes in users table

users_json = "data/users.json"

df = spark.read.json(users_json)

print(f'raw: {df.count()}')

users_df = df.select(
    F.col("_id.$oid").alias("_id"),
    F.col("active"),
    F.from_unixtime(F.col("createdDate.$date")/1000).alias('createdDate'),
    F.from_unixtime(F.col("lastLogin.$date")/1000).alias('lastLogin'),
    F.col("role"),
    F.col("signUpSource"),
    F.col("state"),
).distinct()

print(f'total: {users_df.count()}')
# check ids are unique and non null by counting compared to count earlier
print(f'unique and nonnull id: {users_df.where(F.col('_id').isNotNull()).distinct().count()}')
users_df.show(truncate = False)

raw: 495
total: 212
unique and nonnull id: 212
+------------------------+------+-------------------+-------------------+--------+------------+-----+
|_id                     |active|createdDate        |lastLogin          |role    |signUpSource|state|
+------------------------+------+-------------------+-------------------+--------+------------+-----+
|5ffc9001b3348b11c93388b6|true  |2021-01-11 12:50:57|2021-01-11 12:50:57|consumer|Email       |WI   |
|601465a884231211ce796db3|true  |2021-01-29 14:44:40|2021-01-29 14:44:40|consumer|Email       |WI   |
|6014558467804a1228b20cf0|true  |2021-01-29 13:35:48|2021-01-29 13:35:48|consumer|Email       |WI   |
|601ac195af4b1a1205f7560f|true  |2021-02-03 10:30:29|2021-02-03 10:34:11|consumer|Email       |WI   |
|5ffcb4bc04929111f6e92608|true  |2021-01-11 15:27:40|2021-01-11 15:27:40|consumer|Email       |WI   |
|601ac1da591789121574dc07|true  |2021-02-03 10:31:38|2021-02-03 10:31:38|consumer|Email       |WI   |
|5ff36be7135e7011bcb856d3|true  |20

In [14]:
receipts_df.join(users_df.alias('users_df'), F.col('userId') == F.col('users_df._id'), 'left' ).select(F.count('*'), F.count(users_df['_id'])).show()

+--------+----------+
|count(1)|count(_id)|
+--------+----------+
|    1119|       971|
+--------+----------+



In [None]:
# What are the top 5 brands by receipts scanned for most recent (full?) month?
# How does the ranking of the top 5 brands by receipts scanned for the recent month compare to the ranking for the previous month?

In [108]:
# requires some set up but we can answer both questions in a single model
# we need to join brands to items to reciepts
# then we need to group, get the total reciept count and finally get the rank by month
# then we can use a window to get last month by lag, and finally get the most recent month using a window as well
# cleaning up the column selection, adding limits and filtering for the most recent month 

# making an assumption we want to see the most recent month we have data for
# assuming we want ro rank by reciept count


# set up windows
ct = Window.partitionBy(F.col('month')).orderBy(F.col('receipt_ct').desc())
lag = Window.partitionBy(F.col('name')).orderBy(F.col('month'))
# do the joins (we will need these later)
items_with_brand = items_df.join(brands_df, 'barcode', 'left' )
receipt_with_brand = (
    items_with_brand
 .join(receipts_df.withColumn(
     'month',
     F.date_trunc('month', F.col('dateScanned'))), items_with_brand['receiptId'] == receipts_df['_id'], 'left')
     # filter out brands we cant join
    .where(brands_df['_id'].isNotNull())
)

(receipt_with_brand
 # group by month and name because we need the rct ct per month
 .groupBy('month', 'name')
 .agg(F.count_distinct('receiptId').alias('receipt_ct'))
 # rank the brands in each month
 .withColumn('recent_month_rank', F.row_number().over(ct))
 # get last months rank
 .withColumn('last_month_rank', F.lag(F.col('recent_month_rank')).over(lag))
 .withColumn('recent_month', F.max(F.col('month')).over(recent_month))
 .orderBy(F.col('month').desc(), F.col('recent_month_rank'))
 .where(F.col('month') == F.col('recent_month'))
 .select('name', 'recent_month_rank', 'last_month_rank')
 .limit(5)
).show()

+--------------------+-----------------+---------------+
|                name|recent_month_rank|last_month_rank|
+--------------------+-----------------+---------------+
|            Tostitos|                1|           NULL|
|             Swanson|                2|           NULL|
|Cracker Barrel Ch...|                3|           NULL|
|               Prego|                4|           NULL|
|     Diet Chris Cola|                5|           NULL|
+--------------------+-----------------+---------------+



In [80]:
# wrote it in SQL, in case this is prefered
items_df.createOrReplaceTempView("items")
brands_df.createOrReplaceTempView("brands")
receipts_df.createOrReplaceTempView("receipts")

spark.sql("""
WITH items_with_brand AS (
    SELECT 
    i.*, 
    b._id AS brand_id, 
    b.name 
    FROM items i
    LEFT JOIN brands b USING(barcode)
    WHERE b._id IS NOT NULL
),
receipts_with_month AS (
    SELECT 
    *, 
    DATE_TRUNC('month', dateScanned) AS month
    FROM receipts 
),
agg AS (
    SELECT
    r.month,
    i.name,
    COUNT(DISTINCT r._id) AS receipt_ct
    FROM items_with_brand i     
    LEFT JOIN receipts_with_month r ON i.receiptId = r._id
    GROUP BY r.month, i.name
),
ranked AS (
    SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY month ORDER BY receipt_ct DESC) AS recent_month_rank,
    LAG(ROW_NUMBER() OVER (PARTITION BY name ORDER BY month)) OVER (PARTITION BY name ORDER BY month) AS last_month_rank,
    MAX(month) OVER (PARTITION BY '') AS recent_month
    FROM agg
)
SELECT 
name, 
recent_month_rank, 
last_month_rank
FROM ranked WHERE month = recent_month
ORDER BY month DESC, recent_month_rank
LIMIT 5
""").show()

+--------------------+-----------------+---------------+
|                name|recent_month_rank|last_month_rank|
+--------------------+-----------------+---------------+
|             Swanson|                1|           NULL|
|            Tostitos|                2|           NULL|
|Cracker Barrel Ch...|                3|           NULL|
|     Diet Chris Cola|                4|           NULL|
|               Prego|                5|           NULL|
+--------------------+-----------------+---------------+



In [None]:
# When considering average spend from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?
# When considering total number of items purchased from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?

In [72]:
# Assuming the totalSpend and purchasedItemCount columns are accurate, we dont need any joins to answer these questions

(receipts_df
 .groupBy('rewardsReceiptStatus')
 .agg(
     F.avg('totalSpent').alias('average_spend'),
     F.sum('purchasedItemCount').alias('total_number_of_items_purchased')
 )
 .where(F.col('rewardsReceiptStatus').isin(['FINISHED','REJECTED']))
).show()

+--------------------+------------------+-------------------------------+
|rewardsReceiptStatus|     average_spend|total_number_of_items_purchased|
+--------------------+------------------+-------------------------------+
|            FINISHED| 80.85430501930502|                           8184|
|            REJECTED|23.326056338028184|                            173|
+--------------------+------------------+-------------------------------+



In [79]:
# wrote it in SQL, in case this is prefered
spark.sql("""
SELECT 
rewardsReceiptStatus, 
AVG(totalSpent) average_spend, 
SUM(purchasedItemCount) total_number_of_items_purchased
FROM receipts
WHERE rewardsReceiptStatus IN ('FINISHED','REJECTED')
GROUP BY rewardsReceiptStatus
""").show()

+--------------------+------------------+-------------------------------+
|rewardsReceiptStatus|     average_spend|total_number_of_items_purchased|
+--------------------+------------------+-------------------------------+
|            FINISHED| 80.85430501930502|                           8184|
|            REJECTED|23.326056338028184|                            173|
+--------------------+------------------+-------------------------------+



In [None]:
# Which brand has the most spend among users who were created within the past 6 months?
# Which brand has the most transactions among users who were created within the past 6 months?

In [144]:
# This query requires me to identify and filter reciepts of users that joined in the last 6 month
# we can use the previously created df - receipt_with_brand
# need group by the brand, and compute spend and transactions
# optionally we can add a rank to identify the top spend and txn count brands

q = Window.partitionBy(F.lit('')).orderBy(F.col('transactions').desc())
p = Window.partitionBy(F.lit('')).orderBy(F.col('spend').desc())

last_six_month_users = (
    users_df.where(F.col('createdDate') >= F.date_sub(F.current_date(), 180)))

(receipt_with_brand
 .join(last_six_month_users, receipt_with_brand['userId'] == last_six_month_users['_id'], 'inner')
 .groupBy('name')
 .agg(F.sum(F.col('itemPrice')*F.col('quantityPurchased')).alias('spend'),
     F.sum(F.col('quantityPurchased')).alias('transactions'),
     )
 .withColumn('spend_rank', F.row_number().over(p))
 .withColumn('txns_rank', F.row_number().over(q))
 .where((F.col('spend_rank') == 1) | (F.col('txns_rank') == 1))
).show()

+----+-----+------------+----------+---------+
|name|spend|transactions|spend_rank|txns_rank|
+----+-----+------------+----------+---------+
+----+-----+------------+----------+---------+



In [None]:
#### problems with the data

In [6]:
from pyspark.sql import functions as F

def basic_checks(df):
    result_dict = {'total_count': df.count()}
    # check if nulls exist
    nulls = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).collect()[0].asDict()
    result_dict['nulls'] =nulls
    # uniqueness
    uniqueness= {}
    for col in df.columns:
        unique_count=df.select(col).distinct().count()
        total_count = df.count()
        uniqueness[col] = unique_count == total_count
    result_dict['uniqueness'] = uniqueness
    # check for duplicates
    duplicate_count = df.count() - df.distinct().count()
    result_dict['duplicates'] = duplicate_count
    # check datatypes
    data_types = {col: df.schema[col].dataType.simpleString() for col in df.columns}
    result_dict['data_types'] =data_types
    # summary states
    numeric_cols = [col for col, dtype in data_types.items() if 'int' in dtype or 'double' in dtype or 'float' in dtype]
    summary_stats= {}
    for col in numeric_cols:
        summary_stats[col] =df.select(col).describe().collect()
    result_dict['summary_stats'] =summary_stats
    return result_dict

In [12]:
# lets do some basic bottom-up check on the actual dataframe
pprint(basic_checks(items_df))

{'data_types': {'_id': 'string',
                'barcode': 'string',
                'brandCode': 'string',
                'description': 'string',
                'discountedItemPrice': 'string',
                'finalPrice': 'string',
                'itemPrice': 'string',
                'metabriteCampaignId': 'string',
                'needsFetchReview': 'boolean',
                'originalReceiptItemText': 'string',
                'partnerItemId': 'string',
                'pointsEarned': 'string',
                'pointsPayerId': 'string',
                'priceAfterCoupon': 'string',
                'quantityPurchased': 'bigint',
                'receiptId': 'string',
                'rewardsGroup': 'string',
                'rewardsProductPartnerId': 'string'},
 'duplicates': 0,
 'nulls': {'_id': 0,
           'barcode': 3851,
           'brandCode': 4341,
           'description': 381,
           'discountedItemPrice': 1172,
           'finalPrice': 174,
           'itemPri

In [127]:
# barcode join is really weak
items_df.join(brands_df, 'barcode', 'left' ).select(F.count('*'),F.count('barcode'), F.count(brands_df['_id'])).show()

+--------+--------------+----------+
|count(1)|count(barcode)|count(_id)|
+--------+--------------+----------+
|    6948|          3097|        89|
+--------+--------------+----------+



In [128]:
# there are some barcodes that say item not found, not too significant, but still imporant 
items_df.select(F.count('*'),F.sum(F.when(F.col('barcode')==4011, 1).otherwise(0))).show()

+--------+-------------------------------------------------+
|count(1)|sum(CASE WHEN (barcode = 4011) THEN 1 ELSE 0 END)|
+--------+-------------------------------------------------+
|    6941|                                              177|
+--------+-------------------------------------------------+



In [6]:
# make a QA df that checks item spend vs receipt
# in most cases, item and receipt are part of this 

item_vs_receipt = (items_df
 .groupBy('receiptId')
 .agg(
     F.sum(F.col('quantityPurchased')*F.col('finalPrice')).alias('spend_by_items'),
     F.sum(F.col('quantityPurchased')).alias('qty_by_items')
     )
 .join(receipts_df, items_df['receiptId'] == receipts_df['_id'], 'left'  )
 .select('receiptId', 'userId','spend_by_items', 'qty_by_items', F.col('totalSpent').cast('numeric'), F.col('purchasedItemCount').cast('numeric'))
)
item_vs_receipt.show()

+--------------------+--------------------+------------------+------------+----------+------------------+
|           receiptId|              userId|    spend_by_items|qty_by_items|totalSpent|purchasedItemCount|
+--------------------+--------------------+------------------+------------+----------+------------------+
|5ffe23d70a7214ad2...|59c124bae4b0299e5...|             28.57|           1|        29|              NULL|
|60099c3c0a7214ad8...|60099c1450b33111f...|3795.4400000000005|         341|      1083|               341|
|6010be3f0a7214ada...|6010bddaa4b74c120...|              2.23|           1|         2|                 1|
|603afc0e0a720fde1...|5fc961c3b8cfca11a...|             34.96|           2|        35|                 2|
|600fb21f0a7214ada...|600fb1ac73c60b120...|               1.0|           1|         1|                 1|
|60132aef0a720f05f...|60132acb73c60b3ca...|              10.0|           1|        10|                 1|
|60189c900a7214ad2...|60189c90c8b50e11d...|   

In [7]:
# the mape on the spend is extraordinary, ct_mape is good 

(item_vs_receipt
 .withColumn('spend_abs_error', (F.col('spend_by_items')-F.col('totalSpent'))/F.col('totalSpent'))
 .withColumn('ct_abs_error', (F.col('qty_by_items')-F.col('purchasedItemCount'))/F.col('purchasedItemCount'))
 .agg(F.avg('spend_abs_error').alias('spend_mape'), F.avg('ct_abs_error').alias('ct_mape'))
).show()

+------------------+-----------------+
|        spend_mape|          ct_mape|
+------------------+-----------------+
|0.6471062354543637|0.007083825265649|
+------------------+-----------------+



In [168]:
item_vs_receipt.orderBy(F.col('spend_by_items').desc()).limit(5).show()
item_vs_receipt.orderBy(F.col('qty_by_items').desc()).limit(5).show()
item_vs_receipt.orderBy(F.col('totalSpent').desc()).limit(5).show()
item_vs_receipt.orderBy(F.col('purchasedItemCount').desc()).limit(5).show()

+--------------------+--------------------+------------------+------------+----------+------------------+
|           receiptId|              userId|    spend_by_items|qty_by_items|totalSpent|purchasedItemCount|
+--------------------+--------------------+------------------+------------+----------+------------------+
|600f2fc80a720f053...|600f29a64329897ea...|20636.250000000036|         689|      4566|               689|
|600f39c30a7214ada...|600f35015edb78209...|20378.730000000047|         670|      4722|               670|
|600f24970a720f053...|600f20c15edb787dc...|19791.330000000013|         599|      4369|               599|
|600f0cc70a720f053...|600f00d05edb787dc...| 9126.879999999994|         303|      2085|               303|
|600996ac0a720f05f...|6009969150b33111f...|           4331.86|         348|      1199|               348|
+--------------------+--------------------+------------------+------------+----------+------------------+

+--------------------+--------------------+--

In [10]:
# check if there are large users

user_item_vs_receipt = item_vs_receipt.groupBy('userId').agg(
    F.countDistinct('receiptId').alias('receipt_ct'),
    F.avg('spend_by_items').alias('avg_spend_by_items'),
    F.sum('spend_by_items').alias('spend_by_items'),
    F.sum('qty_by_items').alias('qty_by_items'),
    F.sum('totalSpent').alias('totalSpent'),
    F.sum('purchasedItemCount').alias('purchasedItemCount')
)

user_item_vs_receipt.orderBy(F.col('receipt_ct').desc()).limit(5).show()
user_item_vs_receipt.orderBy(F.col('spend_by_items').desc()).limit(5).show()
user_item_vs_receipt.orderBy(F.col('qty_by_items').desc()).limit(5).show()
user_item_vs_receipt.orderBy(F.col('totalSpent').desc()).limit(5).show()
user_item_vs_receipt.orderBy(F.col('purchasedItemCount').desc()).limit(5).show()

+--------------------+----------+------------------+------------------+------------+----------+------------------+
|              userId|receipt_ct|avg_spend_by_items|    spend_by_items|qty_by_items|totalSpent|purchasedItemCount|
+--------------------+----------+------------------+------------------+------------+----------+------------------+
|54943462e4b07e684...|        50|               2.0|             100.0|          55|       100|                55|
|59c124bae4b0299e5...|        48|28.570000000000004|1371.3600000000001|          48|      1392|              NULL|
|5fc961c3b8cfca11a...|        42| 33.99928571428573|1427.9700000000007|          83|      1430|                83|
|5fa41775898c7a11a...|        21|              84.0|            1764.0|          84|      1764|                84|
|5ff5d15aeb7c7d120...|        20|23.707500000000003|474.15000000000003|          74|       447|                74|
+--------------------+----------+------------------+------------------+---------

In [122]:
# bottom-up check for receipts
pprint(basic_checks(receipts_df))

                                                                                

{'data_types': {'_id': 'string',
                'bonusPointsEarned': 'bigint',
                'bonusPointsEarnedReason': 'string',
                'createDate': 'string',
                'dateScanned': 'string',
                'finishedDate': 'string',
                'modifyDate': 'string',
                'pointsAwardedDate': 'string',
                'pointsEarned': 'string',
                'purchaseDate': 'string',
                'purchasedItemCount': 'bigint',
                'rewardsReceiptStatus': 'string',
                'totalSpent': 'string',
                'userId': 'string'},
 'duplicates': 0,
 'nulls': {'_id': 0,
           'bonusPointsEarned': 575,
           'bonusPointsEarnedReason': 575,
           'createDate': 0,
           'dateScanned': 0,
           'finishedDate': 551,
           'modifyDate': 0,
           'pointsAwardedDate': 582,
           'pointsEarned': 510,
           'purchaseDate': 448,
           'purchasedItemCount': 484,
           'rewardsRece

In [133]:
# data seems to be inconsistent month by month
receipts_df.select(F.date_trunc('month', F.col('dateScanned')).alias('month')).groupBy('month').agg(F.count('*')).show()

+-------------------+--------+
|              month|count(1)|
+-------------------+--------+
|2021-01-01 00:00:00|     640|
|2021-03-01 00:00:00|      23|
|2020-10-01 00:00:00|       2|
|2020-11-01 00:00:00|       6|
|2021-02-01 00:00:00|     448|
+-------------------+--------+



In [13]:
# bottom up check for brands df
pprint(basic_checks(brands_df))

{'data_types': {'_id': 'string',
                'barcode': 'string',
                'brandCode': 'string',
                'category': 'string',
                'categoryCode': 'string',
                'cpgId': 'string',
                'name': 'string',
                'topBrand': 'boolean'},
 'duplicates': 0,
 'nulls': {'_id': 0,
           'barcode': 0,
           'brandCode': 234,
           'category': 155,
           'categoryCode': 650,
           'cpgId': 0,
           'name': 0,
           'topBrand': 612},
 'summary_stats': {},
 'total_count': 1167,
 'uniqueness': {'_id': True,
                'barcode': False,
                'brandCode': False,
                'category': False,
                'categoryCode': False,
                'cpgId': False,
                'name': False,
                'topBrand': False}}


In [25]:
# check for dupes of brandCode and name
brands_df.groupBy('brandCode','name').agg(F.count('*').alias('ct')).orderBy(F.col('ct').desc()).show()

+--------------------+--------------------+---+
|           brandCode|                name| ct|
+--------------------+--------------------+---+
|             HUGGIES|             Huggies|  2|
|TEST BRANDCODE @1...|test brand @15973...|  1|
|TEST BRANDCODE @1...|test brand @16079...|  1|
|             MATADOR|      Matador Snacks|  1|
|TEST BRANDCODE @1...|test brand @15973...|  1|
|TEST BRANDCODE @1...|test brand @15990...|  1|
|TEST BRANDCODE @1...|test brand @16105...|  1|
|        MOUNTAIN DEW|        Mountain Dew|  1|
|                NULL|             Palermo|  1|
|        511111805137|         Real Simple|  1|
|TEST BRANDCODE @1...|test brand @16104...|  1|
|     FRUITY CHEERIOS|    Fruity Cheerios™|  1|
|TEST BRANDCODE @1...|test brand @16100...|  1|
|            KOOL-AID|            Kool-Aid|  1|
|                NULL|              Corona|  1|
|                    |        Kraft Cheese|  1|
|        511111105046|Food & Wine Magazine|  1|
|            CHESTERS|           CHESTER

                                                                                

In [134]:
# how many brands are test items?
brands_df.select(F.count('*'),F.sum(F.when(F.lower(F.col('name')).rlike('test'), 1).otherwise(0)).alias('test_ct')).show()

+--------+-------+
|count(1)|test_ct|
+--------+-------+
|    1167|    432|
+--------+-------+



In [124]:
# bottom up QA for users
pprint(basic_checks(users_df))

{'data_types': {'_id': 'string',
                'active': 'boolean',
                'createdDate': 'string',
                'lastLogin': 'string',
                'role': 'string',
                'signUpSource': 'string',
                'state': 'string'},
 'duplicates': 0,
 'nulls': {'_id': 0,
           'active': 0,
           'createdDate': 0,
           'lastLogin': 40,
           'role': 0,
           'signUpSource': 5,
           'state': 6},
 'summary_stats': {},
 'total_count': 212,
 'uniqueness': {'_id': True,
                'active': False,
                'createdDate': True,
                'lastLogin': False,
                'role': False,
                'signUpSource': False,
                'state': False}}


In [126]:
# why are last logins Null?
users_df.where(F.col('lastLogin').isNull()).show()

+--------------------+------+-------------------+---------+--------+------------+-----+
|                 _id|active|        createdDate|lastLogin|    role|signUpSource|state|
+--------------------+------+-------------------+---------+--------+------------+-----+
|6002541ae257124ec...|  true|2021-01-15 21:48:58|     NULL|consumer|       Email|   WI|
|6008896c633aab121...|  true|2021-01-20 14:50:04|     NULL|consumer|       Email|   WI|
|600f19914329897ea...|  true|2021-01-25 14:18:41|     NULL|consumer|       Email|   WI|
|5ffe115404929101d...|  true|2021-01-12 16:15:00|     NULL|consumer|       Email|   AL|
|600498c4e257124ec...|  true|2021-01-17 15:06:29|     NULL|consumer|       Email|   WI|
|600b477e7d983a124...|  true|2021-01-22 16:45:34|     NULL|consumer|       Email|   WI|
|600eea525edb7811c...|  true|2021-01-25 10:57:07|     NULL|consumer|       Email|   WI|
|60186237c8b50e11d...|  true|2021-02-01 15:19:03|     NULL|consumer|       Email| NULL|
|600258dafb296c4ef...|  true|202