### Read source data into Spark DataFrames

*   brands_df: brands data
*   receipts_df: receipts data
*   users_df: users data

Print the schema of each table.


In [36]:
# prompt: Read data from json file and convert it into spark dataframe because i will need to leverage spark sql to write some queries for data analysis. I have 3 json files need to read: brands.json, receipts.json, and user.json

!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("ReadJsonFiles").getOrCreate()

# Read JSON files into Spark DataFrames
brands_df = spark.read.json("brands.json")
receipts_df = spark.read.json("receipts.json")
users_df = spark.read.json("users.json")

# Print the schema of each DataFrame
brands_df.printSchema()
receipts_df.printSchema()
users_df.printSchema()


root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- barcode: string (nullable = true)
 |-- brandCode: string (nullable = true)
 |-- category: string (nullable = true)
 |-- categoryCode: string (nullable = true)
 |-- cpg: struct (nullable = true)
 |    |-- $id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)
 |    |-- $ref: string (nullable = true)
 |-- name: string (nullable = true)
 |-- topBrand: boolean (nullable = true)

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- bonusPointsEarned: long (nullable = true)
 |-- bonusPointsEarnedReason: string (nullable = true)
 |-- createDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- dateScanned: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- finishedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- modifyDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 

## Refactor and diagram a new structured relational data model

I notice that the `rewardsReceiptItemList` field contains a lot of information about the items a user bought. This detail is not needed for every query, so it can be split out into a separate table with the grain of receipt_id, barcode, brandCode. With this design, the item table can be joined back to the receipts table whenever a use case needs both.



*   receipts_items_df: new table created with grain of <receipt_id, barcode, brandCode>
*   new_receipts_df: the same receipts table, with rewardsReceiptItemList trimmed to keep only <barcode, brandCode>



In [114]:
# prompt: parse array

from pyspark.sql.functions import col, explode, struct, transform

# Explode the rewardsReceiptItemList array in the receipts DataFrame
exploded_receipts_df = receipts_df.select(
    "_id", "userId", "dateScanned", "createDate", explode("rewardsReceiptItemList").alias("item")
)

# Create a new column called "key" containing the struct
exploded_receipts_df = exploded_receipts_df.withColumn(
    "key", struct("_id", "item.barcode", "item.brandCode")
)

# Get all columns from "item" struct
item_columns = exploded_receipts_df.select("item.*").columns

# Exclude "barcode" and "brandCode"
selected_columns = [
    col("item." + c) for c in item_columns if c not in ["barcode", "brandCode"]
]

receipts_items_df = exploded_receipts_df.select(
    "key","dateScanned", "createDate", *selected_columns
    )

receipts_items_df.printSchema()

# Define a function to extract barcode and brandCode into a struct
def extract_barcode_brandCode(item):
    return struct(item.barcode.alias("barcode"), item.brandCode.alias("brandCode"))

new_receipts_df = receipts_df.withColumn(
    "rewardsReceiptItemList",
    transform("rewardsReceiptItemList", extract_barcode_brandCode)
)

new_receipts_df.printSchema()


root
 |-- key: struct (nullable = false)
 |    |-- _id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)
 |    |-- barcode: string (nullable = true)
 |    |-- brandCode: string (nullable = true)
 |-- dateScanned: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- createDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- competitiveProduct: boolean (nullable = true)
 |-- competitorRewardsGroup: string (nullable = true)
 |-- deleted: boolean (nullable = true)
 |-- description: string (nullable = true)
 |-- discountedItemPrice: string (nullable = true)
 |-- finalPrice: string (nullable = true)
 |-- itemNumber: string (nullable = true)
 |-- itemPrice: string (nullable = true)
 |-- metabriteCampaignId: string (nullable = true)
 |-- needsFetchReview: boolean (nullable = true)
 |-- needsFetchReviewReason: string (nullable = true)
 |-- originalFinalPrice: string (nullable = true)
 |-- originalMetaBriteBarcode: string (nullable = 

## Predetermined questions

### What are the top 5 brands by receipts scanned for most recent month?

Because no 2021-03 records contain brand information, I use 2021-02 as the most recent month instead. Only three brands have data for that month, so ranks 4 and 5 are empty.


1.   BRAND
2.   MISSION
3.   VIVA
4.   N/A
5.   N/A


In [130]:
brands_df.createOrReplaceTempView("brands")
receipts_items_df.createOrReplaceTempView("receiptsItems")

spark.sql("""
with etc as (
  select key.brandCode, sum(quantityPurchased)
  from receiptsItems
  where key.brandCode is not null
    and year(date(from_unixtime(dateScanned.`$date`/1000))) = 2021
    and month(date(from_unixtime(dateScanned.`$date`/1000))) = 2
  group by 1
  order by 2 desc
  limit 5
)

select etc.*, name
from etc
left join brands
  on etc.brandCode = brands.brandCode

""").show(10, False)

# Return 2021-03
# spark.sql("""
# select
#   year(max(date(from_unixtime(dateScanned.`$date`/1000)))) as latest_year,
#   month(max(date(from_unixtime(dateScanned.`$date`/1000)))) as latest_month
# from receiptsItems
# """).show(10, False)

+---------+----------------------+----+
|brandCode|sum(quantityPurchased)|name|
+---------+----------------------+----+
|BRAND    |3                     |NULL|
|MISSION  |2                     |NULL|
|VIVA     |1                     |Viva|
+---------+----------------------+----+
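
The query above ranks brands by `sum(quantityPurchased)`, which measures items bought rather than receipts scanned. Another possible reading of the question is to count distinct receipts per brand. The sketch below shows how the two measures can disagree. It uses Python's built-in `sqlite3` instead of Spark so that it runs without a Spark session, and the table `receipt_items`, its columns, and its rows are made up for illustration only.

```python
import sqlite3

# In-memory database with a hypothetical stand-in for the receiptsItems view.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table receipt_items (receipt_id text, brand_code text, quantity integer)"
)
conn.executemany(
    "insert into receipt_items values (?, ?, ?)",
    [
        ("r1", "A", 10),  # one receipt, many items
        ("r2", "B", 1),
        ("r3", "B", 1),
        ("r3", "B", 1),   # second item line on the same receipt
        ("r4", "B", 1),
    ],
)

# Rank by distinct receipts, and show the item total next to it.
rows = conn.execute("""
    select brand_code,
           count(distinct receipt_id) as receipts_scanned,
           sum(quantity)              as items_purchased
    from receipt_items
    group by brand_code
    order by receipts_scanned desc, brand_code
""").fetchall()
conn.close()

for row in rows:
    print(row)
```

In this toy data, brand B leads by receipts scanned while brand A leads by items purchased, so the choice of measure can change the ranking.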



### How does the ranking of the top 5 brands by receipts scanned for the recent month compare to the ranking for the previous month?

Since I use 2021-02 as the most recent month, I take 2021-01 as the previous month for comparison. The result shows that the previous month's ranking is completely different from the latest month's. I think the latest month does not have enough data points, given that it has only 6 (2021-02) versus 2702 (2021-01). More data is needed before investigating further or drawing a conclusion. The top 5 for 2021-01 are:

1.   BEN AND JERRYS
2.   KNORR
3.   HY-VEE
4.   PEPSI
5.   KLEENEX

In [131]:
spark.sql("""
with etc as (
  select key.brandCode, sum(quantityPurchased)
  from receiptsItems
  where key.brandCode is not null
    and year(date(from_unixtime(dateScanned.`$date`/1000))) = 2021
    and month(date(from_unixtime(dateScanned.`$date`/1000))) = 1
  group by 1
  order by 2 desc
  limit 5
)

select etc.*, name
from etc
left join brands
  on etc.brandCode = brands.brandCode

""").show(10, False)

+--------------+----------------------+-------+
|brandCode     |sum(quantityPurchased)|name   |
+--------------+----------------------+-------+
|BEN AND JERRYS|425                   |NULL   |
|KNORR         |393                   |KNORR  |
|HY-VEE        |308                   |NULL   |
|PEPSI         |231                   |Pepsi  |
|KLEENEX       |135                   |Kleenex|
+--------------+----------------------+-------+
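
To make the month-over-month comparison explicit, the two rankings can be lined up brand by brand. The sketch below is plain Python rather than Spark. The helper name `compare_rankings` is hypothetical, and the two lists are copied by hand from the query outputs in this notebook.

```python
def compare_rankings(previous, current):
    """Map each brand to (previous_rank, current_rank); None means unranked."""
    prev_rank = {brand: i for i, brand in enumerate(previous, start=1)}
    curr_rank = {brand: i for i, brand in enumerate(current, start=1)}
    # Previous-month brands first, then brands that only appear in the current month.
    brands = list(previous) + [b for b in current if b not in prev_rank]
    return {b: (prev_rank.get(b), curr_rank.get(b)) for b in brands}


# Rankings copied from the 2021-01 and 2021-02 query outputs.
january = ["BEN AND JERRYS", "KNORR", "HY-VEE", "PEPSI", "KLEENEX"]
february = ["BRAND", "MISSION", "VIVA"]

comparison = compare_rankings(january, february)
overlap = [b for b, (p, c) in comparison.items() if p is not None and c is not None]

for brand, (prev, curr) in comparison.items():
    print(f"{brand:15} previous={prev} current={curr}")
print("brands ranked in both months:", overlap)
```

No brand appears in both lists, which matches the observation that the two rankings are completely different.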



### When considering average spend from receipts with 'rewardsReceiptStatus' of 'Accepted' or 'Rejected', which is greater?

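I treat "FINISHED" as "Accepted", since no status is literally named Accepted. On that basis, Accepted is greater: an average spend of about 80.85 versus about 23.33 for REJECTED.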

In [110]:
new_receipts_df.createOrReplaceTempView("receipts")
spark.sql("""
select rewardsReceiptStatus, avg(totalSpent)
from receipts
group by 1
""").show(10, False)


+--------------------+------------------+
|rewardsReceiptStatus|avg(totalSpent)   |
+--------------------+------------------+
|SUBMITTED           |NULL              |
|FLAGGED             |180.4517391304348 |
|FINISHED            |80.85430501930502 |
|REJECTED            |23.326056338028184|
|PENDING             |28.03244897959184 |
+--------------------+------------------+



### When considering total number of items purchased from receipts with 'rewardsReceiptStatus' of 'Accepted' or 'Rejected', which is greater?

Accepted (FINISHED) is greater: 8184 items versus 173 for REJECTED.

In [113]:
spark.sql("""
select rewardsReceiptStatus, sum(purchasedItemCount)
from receipts
group by 1
""").show(5, False)


+--------------------+-----------------------+
|rewardsReceiptStatus|sum(purchasedItemCount)|
+--------------------+-----------------------+
|SUBMITTED           |NULL                   |
|FLAGGED             |1014                   |
|FINISHED            |8184                   |
|REJECTED            |173                    |
|PENDING             |NULL                   |
+--------------------+-----------------------+



### Which brand has the most spend among users who were created within the past 6 months?

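BEN AND JERRYS. Note that the query below aggregates over all receipt items and does not yet filter on when the user was created, so this answer covers all users.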

In [134]:
spark.sql("""
with etc as (
  select key.brandCode, sum(finalPrice)
  from receiptsItems
  where key.brandCode is not null
  group by 1
  order by 2 desc
  limit 1
)
select etc.*, name
from etc
left join brands
  on etc.brandCode = brands.brandCode
""").show(5, False)

+--------------+-----------------+----+
|brandCode     |sum(finalPrice)  |name|
+--------------+-----------------+----+
|BEN AND JERRYS|2149.449999999998|NULL|
+--------------+-----------------+----+



### Which brand has the most transactions among users who were created within the past 6 months?
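
BEN AND JERRYS. As with the previous question, the query below counts over all receipt items and does not yet filter on when the user was created.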

In [135]:
spark.sql("""
with etc as (
  select key.brandCode, count(distinct key._id.`$oid`)
  from receiptsItems
  where key.brandCode is not null
  group by 1
  order by 2 desc
  limit 1
)
select etc.*, name
from etc
left join brands
  on etc.brandCode = brands.brandCode
""").show(5, False)

+--------------+----------------------------+----+
|brandCode     |count(DISTINCT key._id.$oid)|name|
+--------------+----------------------------+----+
|BEN AND JERRYS|32                          |NULL|
+--------------+----------------------------+----+



### Conclusion

1.   The brands table appears to be missing data: several brand codes in the results above have no matching name. This needs further investigation and clarification.
2.   Some fields, e.g. `_id` and `createDate`, are defined as structs, which does not seem necessary. This should be clarified with the data owner.
3.   What does the `deleted` flag in the item list mean? Its definition affects how the metrics above should be calculated.
4.   Are `purchasedItemCount` and `totalSpent` really needed, given that the same numbers can also be derived from the item list?
5.   In a real-world setting, we would need to consider the partition key of each table in the data warehouse. A good partition key improves query performance during data analysis and reduces running cost. For example, the receipts table could be partitioned by `createDate` so that queries can filter to receipts created after a certain date.
6.   Batch jobs will be needed if users are still facing poor query performance.
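
On point 2, the struct-wrapped fields appear to follow the MongoDB Extended JSON convention, where `{"$oid": ...}` wraps an object id and `{"$date": ...}` wraps an epoch timestamp in milliseconds. If the data owner confirms this, the wrappers could be flattened at ingestion time. The sketch below shows the idea in plain Python, independent of Spark. The helper name `flatten_wrappers` and the sample record are hypothetical.

```python
from datetime import datetime, timezone


def flatten_wrappers(value):
    """Recursively unwrap {"$oid": ...} and {"$date": <epoch ms>} single-key dicts."""
    if isinstance(value, dict):
        if set(value) == {"$oid"}:
            return value["$oid"]
        if set(value) == {"$date"}:
            # Assumes the value is epoch milliseconds, as in the queries above.
            return datetime.fromtimestamp(value["$date"] / 1000, tz=timezone.utc)
        return {k: flatten_wrappers(v) for k, v in value.items()}
    if isinstance(value, list):
        return [flatten_wrappers(v) for v in value]
    return value


# Hypothetical record shaped like the receipts schema printed earlier.
record = {
    "_id": {"$oid": "abc123"},
    "createDate": {"$date": 1609459200000},
    "bonusPointsEarned": 5,
}

flat = flatten_wrappers(record)
print(flat)
```

With flat columns, filters such as the year and month checks used earlier would no longer need the `dateScanned.$date` access and the division by 1000.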