
Create and set up session + Import packages

In [1]:
#Import packages and set java package
import pyspark
import os

os.environ['JAVA_HOME']= "C:\\Program Files\\Java\\jdk-1.8"

In [2]:
# Start Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.Builder() \
        .config("spark.driver.memory", "15g")\
        .appName("MyApp") \
        .getOrCreate() 

print("Success")

Success


Import data files and convert from dictionary values

In [3]:
from pyspark.sql.functions import col, from_unixtime, explode  

In [4]:
# Create a Spark session
spark = SparkSession.builder.appName("LoadGzippedJSON").getOrCreate()

# Load the gzipped JSON file into a Spark DataFrame
users = spark.read.json("C:\\Users\\dionn\\Downloads\\users.json.gz")

users.show()

users.printSchema()


+--------------------+------+---------------+---------------+--------+------------+-----+
|                 _id|active|    createdDate|      lastLogin|    role|signUpSource|state|
+--------------------+------+---------------+---------------+--------+------------+-----+
|{5ff1e194b6a9d73a...|  true|{1609687444800}|{1609687537858}|consumer|       Email|   WI|
|{5ff1e194b6a9d73a...|  true|{1609687444800}|{1609687537858}|consumer|       Email|   WI|
|{5ff1e194b6a9d73a...|  true|{1609687444800}|{1609687537858}|consumer|       Email|   WI|
|{5ff1e1eacfcf6c39...|  true|{1609687530554}|{1609687530597}|consumer|       Email|   WI|
|{5ff1e194b6a9d73a...|  true|{1609687444800}|{1609687537858}|consumer|       Email|   WI|
|{5ff1e194b6a9d73a...|  true|{1609687444800}|{1609687537858}|consumer|       Email|   WI|
|{5ff1e1e8cfcf6c39...|  true|{1609687528354}|{1609687528392}|consumer|       Email|   WI|
|{5ff1e1b7cfcf6c39...|  true|{1609687479626}|{1609687479665}|consumer|       Email|   WI|
|{5ff1e194

In [5]:


# Extract fields from structs
users_flattened = users.select(
    col("_id.$oid").alias("id"),
    col("active"),
    from_unixtime(col("createdDate.$date") / 1000).alias("created_date"),
    from_unixtime(col("lastLogin.$date") / 1000).alias("last_login"),
    col("role"),
    col("signUpSource"),
    col("state")
)

# Step 2: Show the result
users_flattened.show(truncate=False)


+------------------------+------+-------------------+-------------------+--------+------------+-----+
|id                      |active|created_date       |last_login         |role    |signUpSource|state|
+------------------------+------+-------------------+-------------------+--------+------------+-----+
|5ff1e194b6a9d73a3a9f1052|true  |2021-01-03 10:24:04|2021-01-03 10:25:37|consumer|Email       |WI   |
|5ff1e194b6a9d73a3a9f1052|true  |2021-01-03 10:24:04|2021-01-03 10:25:37|consumer|Email       |WI   |
|5ff1e194b6a9d73a3a9f1052|true  |2021-01-03 10:24:04|2021-01-03 10:25:37|consumer|Email       |WI   |
|5ff1e1eacfcf6c399c274ae6|true  |2021-01-03 10:25:30|2021-01-03 10:25:30|consumer|Email       |WI   |
|5ff1e194b6a9d73a3a9f1052|true  |2021-01-03 10:24:04|2021-01-03 10:25:37|consumer|Email       |WI   |
|5ff1e194b6a9d73a3a9f1052|true  |2021-01-03 10:24:04|2021-01-03 10:25:37|consumer|Email       |WI   |
|5ff1e1e8cfcf6c399c274ad9|true  |2021-01-03 10:25:28|2021-01-03 10:25:28|consumer|

In [6]:



brands = spark.read.json("C:\\Users\\dionn\\Downloads\\brands.json.gz")


brands.show()

brands.printSchema()


+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+
|                 _id|     barcode|           brandCode|            category|        categoryCode|                 cpg|                name|topBrand|
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+
|{601ac115be37ce2e...|511111019862|                NULL|              Baking|              BAKING|{{601ac114be37ce2...|test brand @16123...|   false|
|{601c5460be37ce2e...|511111519928|           STARBUCKS|           Beverages|           BEVERAGES|{{5332f5fbe4b03c9...|           Starbucks|   false|
|{601ac142be37ce2e...|511111819905|TEST BRANDCODE @1...|              Baking|              BAKING|{{601ac142be37ce2...|test brand @16123...|   false|
|{601ac142be37ce2e...|511111519874|TEST BRANDCODE @1...|              Baking|              BAKING|{{

In [7]:


brands_flattened = brands.select(
    col("_id.$oid").alias("id"),
    col("barcode"),
    col("brandCode"),
    col("category"),
    col("categoryCode"),
    col("cpg.$id.$oid").alias("cpg_id"),
    col("cpg.$ref").alias("cpg_ref"),
    col("name"),
    col("topBrand")
)

brands_flattened.show(truncate=False)

brands_flattened.printSchema()



+------------------------+------------+-----------------------------+--------------------+--------------------+------------------------+-------+-------------------------+--------+
|id                      |barcode     |brandCode                    |category            |categoryCode        |cpg_id                  |cpg_ref|name                     |topBrand|
+------------------------+------------+-----------------------------+--------------------+--------------------+------------------------+-------+-------------------------+--------+
|601ac115be37ce2ead437551|511111019862|NULL                         |Baking              |BAKING              |601ac114be37ce2ead437550|Cogs   |test brand @1612366101024|false   |
|601c5460be37ce2ead43755f|511111519928|STARBUCKS                    |Beverages           |BEVERAGES           |5332f5fbe4b03c9a25efd0ba|Cogs   |Starbucks                |false   |
|601ac142be37ce2ead43755d|511111819905|TEST BRANDCODE @1612366146176|Baking              |BAKING    

In [8]:


receipts = spark.read.json("C:\\Users\\dionn\\Downloads\\receipts.json.gz")

receipts.show()

receipts.printSchema()

+--------------------+-----------------+-----------------------+---------------+---------------+---------------+---------------+-----------------+------------+---------------+------------------+----------------------+--------------------+----------+--------------------+
|                 _id|bonusPointsEarned|bonusPointsEarnedReason|     createDate|    dateScanned|   finishedDate|     modifyDate|pointsAwardedDate|pointsEarned|   purchaseDate|purchasedItemCount|rewardsReceiptItemList|rewardsReceiptStatus|totalSpent|              userId|
+--------------------+-----------------+-----------------------+---------------+---------------+---------------+---------------+-----------------+------------+---------------+------------------+----------------------+--------------------+----------+--------------------+
|{5ff1e1eb0a720f05...|              500|   Receipt number 2 ...|{1609687531000}|{1609687531000}|{1609687531000}|{1609687536000}|  {1609687531000}|       500.0|{1609632000000}|            

In [9]:

# Flatten the nested fields (the items are further nested so will need to explode)
receipts_flattened = receipts.select(
    col("_id.$oid").alias("id"),
    col("bonusPointsEarned"),
    col("bonusPointsEarnedReason"),
    from_unixtime(col("createDate.$date") / 1000).alias("create_date"),
    from_unixtime(col("dateScanned.$date") / 1000).alias("date_scanned"),
    from_unixtime(col("finishedDate.$date") / 1000).alias("finished_date"),
    from_unixtime(col("modifyDate.$date") / 1000).alias("modify_date"),
    from_unixtime(col("pointsAwardedDate.$date") / 1000).alias("points_awarded_date"),
    col("pointsEarned"),
    from_unixtime(col("purchaseDate.$date") / 1000).alias("purchase_date"),
    col("purchasedItemCount"),
    col("rewardsReceiptStatus"),
    col("totalSpent"),
    col("userId"),
    explode("rewardsReceiptItemList").alias("item")  # Explode the array
)

# add fields from exploded items
receipts_flattened = receipts_flattened.select(
    "id", "bonusPointsEarned", "bonusPointsEarnedReason", "create_date", "date_scanned", 
    "finished_date", "modify_date", "points_awarded_date", "pointsEarned", "purchase_date", 
    "purchasedItemCount", "rewardsReceiptStatus", "totalSpent", "userId",
    col("item.barcode").alias("item_barcode"),
    col("item.brandCode").alias("item_brand_code"),
    col("item.competitiveProduct").alias("item_competitive_product"),
    col("item.competitorRewardsGroup").alias("item_competitor_rewards_group"),
    col("item.deleted").alias("item_deleted"),
    col("item.description").alias("item_description"),
    col("item.discountedItemPrice").alias("item_discounted_price"),
    col("item.finalPrice").alias("item_final_price"),
    col("item.itemNumber").alias("item_number"),
    col("item.itemPrice").alias("item_price"),
    col("item.metabriteCampaignId").alias("item_metabrite_campaign_id"),
    col("item.needsFetchReview").alias("item_needs_fetch_review"),
    col("item.needsFetchReviewReason").alias("item_needs_fetch_review_reason"),
    col("item.originalFinalPrice").alias("item_original_final_price"),
    col("item.originalMetaBriteBarcode").alias("item_original_metabrite_barcode"),
    col("item.originalMetaBriteDescription").alias("item_original_metabrite_description"),
    col("item.originalMetaBriteItemPrice").alias("item_original_metabrite_item_price"),
    col("item.originalMetaBriteQuantityPurchased").alias("item_original_metabrite_quantity"),
    col("item.originalReceiptItemText").alias("item_original_receipt_text"),
    col("item.partnerItemId").alias("item_partner_id"),
    col("item.pointsEarned").alias("item_points_earned"),
    col("item.pointsNotAwardedReason").alias("item_points_not_awarded_reason"),
    col("item.pointsPayerId").alias("item_points_payer_id"),
    col("item.preventTargetGapPoints").alias("item_prevent_target_gap_points"),
    col("item.priceAfterCoupon").alias("item_price_after_coupon"),
    col("item.quantityPurchased").alias("item_quantity_purchased"),
    col("item.rewardsGroup").alias("item_rewards_group"),
    col("item.rewardsProductPartnerId").alias("item_rewards_product_partner_id"),
    col("item.targetPrice").alias("item_target_price"),
    col("item.userFlaggedBarcode").alias("item_user_flagged_barcode"),
    col("item.userFlaggedDescription").alias("item_user_flagged_description"),
    col("item.userFlaggedNewItem").alias("item_user_flagged_new_item"),
    col("item.userFlaggedPrice").alias("item_user_flagged_price"),
    col("item.userFlaggedQuantity").alias("item_user_flagged_quantity")
)


receipts_flattened.show(truncate=False)

receipts_flattened.printSchema()


+------------------------+-----------------+-----------------------------------------------------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------+-------------------+------------------+--------------------+----------+------------------------+------------+---------------+------------------------+-----------------------------+------------+----------------------------------------------------------------------+---------------------+----------------+------------+----------+--------------------------+-----------------------+------------------------------+-------------------------+-------------------------------+------------------------------------------------------------+----------------------------------+--------------------------------+--------------------------+---------------+------------------+-----------------------------------+------------------------+------------------------------+---------

In [10]:
users_flattened.createOrReplaceTempView("users")
brands_flattened.createOrReplaceTempView("brands")
receipts_flattened.createOrReplaceTempView("receipts")

What are the top 5 brands by receipts scanned for most recent month? -- receipts, matched to brands, count receipts
How does the ranking of the top 5 brands by receipts scanned for the recent month compare to the ranking for the previous month? month/date
When considering average spend from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater? total spend rewardsReceiptStatus
When considering total number of items purchased from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater? number of items
Which brand has the most spend among users who were created within the past 6 months? -- users, createdDate
Which brand has the most transactions among users who were created within the past 6 months? createdDate

Business questions

In [11]:
q1_df= spark.sql(f"""

with recent_month as
(select max(date_trunc("month",date_scanned)) as max_month
from receipts)

select *
from recent_month

""")

q1_df.show()

+-------------------+
|          max_month|
+-------------------+
|2021-03-01 00:00:00|
+-------------------+



In [12]:
q1_df= spark.sql(f"""

select date_trunc("month",date_scanned) as month, count(distinct id) as num_receipts
from receipts
group by 1
order by 1

""")

q1_df.show()

+-------------------+------------+
|              month|num_receipts|
+-------------------+------------+
|2020-10-01 00:00:00|           2|
|2020-11-01 00:00:00|           6|
|2021-01-01 00:00:00|         539|
|2021-02-01 00:00:00|         120|
|2021-03-01 00:00:00|          12|
+-------------------+------------+



In [13]:
q1_df= spark.sql(f"""

with recent_month as
(select max(date_trunc("month",date_scanned)) as max_month
from receipts)

select name, count(distinct a.id) as num_receipts
from receipts a
left join brands b
on a.item_barcode = b.barcode
where date_trunc("month",date_scanned) in (select max_month from recent_month)
group by 1


""")

q1_df.show()

+----+------------+
|name|num_receipts|
+----+------------+
|NULL|          12|
+----+------------+



In [14]:
q1_df= spark.sql(f"""

with months as
(select date_trunc("month",date_scanned) as month
from receipts
group by 1),

ranking as
(
select month, rank() over (order by month desc) as month_rank
from months
)

select name, count(distinct a.id) as num_receipts
from receipts a
left join brands b
on a.item_barcode = b.barcode
where date_trunc("month",date_scanned)= (select month from ranking where month_rank=2)
group by 1
order by 2 desc

""")

q1_df.show()

+----+------------+
|name|num_receipts|
+----+------------+
|NULL|         120|
+----+------------+



In [15]:
q2_df= spark.sql(f"""

with months as
(select date_trunc("month",date_scanned) as month
from receipts
group by 1),

ranking as
(
select month, rank() over (order by month desc) as month_rank
from months
)

select name, count(distinct case when month_rank=1 then a.id end) as last_month_receipts, count(distinct case when month_rank=2 then a.id end) as previous_month_receipts
from receipts a
left join brands b
on a.item_barcode = b.barcode
left join ranking c
on date_trunc("month",a.date_scanned)=c.month
group by 1
order by 2 desc

""")

q2_df.show()

+--------------------+-------------------+-----------------------+
|                name|last_month_receipts|previous_month_receipts|
+--------------------+-------------------+-----------------------+
|                NULL|                 12|                    120|
|              Quaker|                  0|                      0|
|             Cheetos|                  0|                      0|
|       Pacific Foods|                  0|                      0|
|               Prego|                  0|                      0|
|Cracker Barrel Ch...|                  0|                      0|
|             Swanson|                  0|                      0|
|         Grey Poupon|                  0|                      0|
|     Diet Chris Cola|                  0|                      0|
|        Kettle Brand|                  0|                      0|
|               Kraft|                  0|                      0|
|         Rice A Roni|                  0|                    

In [16]:
q2_df= spark.sql(f"""

with months as
(select date_trunc("month",date_scanned) as month
from receipts
group by 1),

ranking as
(
select month, rank() over (order by month desc) as month_rank
from months
)

select a.item_user_flagged_description, count(distinct a.id)
from receipts a
left join brands b
on a.item_barcode = b.barcode
left join ranking c
on date_trunc("month",a.date_scanned)=c.month
group by 1


""")

q2_df.show()

+-----------------------------+------------------+
|item_user_flagged_description|count(DISTINCT id)|
+-----------------------------+------------------+
|         MILLER LITE 24 PA...|                28|
|         DORITOS TORTILLA ...|                 7|
|                         NULL|               660|
|                             |                51|
+-----------------------------+------------------+



In [17]:
q3_df= spark.sql(f"""

with base as
(select id, rewardsReceiptStatus, first(cast(totalSpent as double)) as txn_spend
from receipts
group by 1,2)

select rewardsReceiptStatus, avg(txn_spend) as avg_spend
from base
group by 1

""")

q3_df.show()

+--------------------+-----------------+
|rewardsReceiptStatus|        avg_spend|
+--------------------+-----------------+
|             FLAGGED|180.4517391304348|
|            FINISHED|81.16769379844963|
|            REJECTED|24.35514705882354|
|             PENDING|28.03244897959184|
+--------------------+-----------------+



In [19]:
q4_df= spark.sql(f"""


select rewardsReceiptStatus, sum(item_quantity_purchased) as num_items_purchased
from receipts
group by 1

""")

q4_df.show()

+--------------------+-------------------+
|rewardsReceiptStatus|num_items_purchased|
+--------------------+-------------------+
|             FLAGGED|               1014|
|            FINISHED|               8176|
|            REJECTED|                141|
|             PENDING|                 49|
+--------------------+-------------------+



In [25]:
qa_df= spark.sql(f"""


select max(created_date)
from users
order by 1 desc

""")



qa_df.show()

+-------------------+
|  max(created_date)|
+-------------------+
|2021-02-12 09:11:06|
+-------------------+



In [30]:
qa_df= spark.sql(f"""


with spend_column as
(select id, first(cast(totalSpent as double)) as txn_spend
from receipts
group by 1),

price_column as
(select id, sum(item_final_price) as sum_item_price
from receipts
group by 1),


final as
(select a.id, ((txn_spend-sum_item_price)/txn_spend) as diff
from spend_column a
left join price_column b
on a.id=b.id)

select *
from final
where diff <> 0

""")



qa_df.show()

+--------------------+--------------------+
|                  id|                diff|
+--------------------+--------------------+
|60099c3c0a7214ad8...|6.297044296090397...|
|60189c900a7214ad2...|                -1.7|
|5ff726810a720f052...|               -2.56|
|600260210a720f05f...|-2.17962073125665...|
|6009f5ea0a7214ada...|-3.46733066126680...|
|600996ac0a720f05f...|1.896867182594454...|
|5ffcb4ad0a720f051...|               -2.56|
|6004a5f20a7214ad4...|-5.94016499172439...|
|6009e72c0a720f053...|4.680204097057183...|
|600a1a8d0a7214ada...|-1.15310798128593...|
|600edb570a720f053...|-1.58951439007894...|
|600b39f60a720f053...|-1.42802727075302...|
|600eda0c0a7214ada...|-1.34178592344462...|
|600f2fc80a720f053...|4.182802816634399...|
|600370250a720f05f...|-5.34688651114609...|
|5ffcb4b80a7214ad4...|                -1.2|
|600992f90a720f05f...|3.256009786963455...|
|60020d2a0a720f05f...|-4.56512532679491...|
|6004a99e0a720f05f...|-3.95673323663503...|
|600b34ca0a7214ada...|-4.1785877

In [31]:
qa_df= spark.sql(f"""

with base as
(select id, count(*) as num
from users
group by 1)

select id, num
from base
where num>1

""")



qa_df.show()

+--------------------+---+
|                  id|num|
+--------------------+---+
|600f47f06fd0dc176...|  3|
|60005709bd4dff11d...|  2|
|6014558767804a122...|  2|
|5fbc35711d967d122...|  3|
|5ffc8f9704929111f...|  6|
|5fff55dabd4dff11d...|  7|
|5ff4ce3dc3d63511e...|  4|
|6011f33173c60b180...|  4|
|5fff0f4fb3348b03e...|  4|
|60189c74c8b50e11d...|  7|
|5ff73b90eb7c7d31c...|  2|
|601c2c05969c0b11f...|  2|
|60074b49325c8a179...|  2|
|600056a3f7e5b011f...|  8|
|6008893b633aab121...|  5|
|6010bddaa4b74c120...|  3|
|5ff5d15aeb7c7d120...| 18|
|60088d55633aab121...|  2|
|6011f31ea4b74c18d...|  4|
|5ff36d0362fde9121...|  3|
+--------------------+---+
only showing top 20 rows



In [32]:
qa_df= spark.sql(f"""

with base as
(select id, created_date, count(*) as num
from users
group by 1,2)

select id, created_date, num
from base
where num>1

""")



qa_df.show()

+--------------------+-------------------+---+
|                  id|       created_date|num|
+--------------------+-------------------+---+
|600987d77d983a11f...|2021-01-21 08:55:35|  9|
|5fbc35711d967d122...|2020-11-23 17:19:29|  3|
|6000b7aefb296c121...|2021-01-14 16:29:18|  3|
|5fff55dabd4dff11d...|2021-01-13 15:19:38|  7|
|601c2c05969c0b11f...|2021-02-04 12:16:53|  2|
|600741d06e6469120...|2021-01-19 15:32:17|  6|
|5ff7268eeb7c7d120...|2021-01-07 10:19:42|  2|
|5ff8ce8504929111f...|2021-01-08 16:28:37|  3|
|6011f33173c60b180...|2021-01-27 18:11:45|  4|
|5ffc9d87b3348b11c...|2021-01-11 13:48:39|  3|
|60189c94c8b50e11d...|2021-02-01 19:28:04|  4|
|5fc961c3b8cfca11a...|2020-12-03 17:08:03| 20|
|600748196e6469171...|2021-01-19 15:59:05|  4|
|5ff1e1eacfcf6c399...|2021-01-03 10:25:30|  4|
|600f41b2bd196811e...|2021-01-25 17:09:55|  2|
|60088d55633aab121...|2021-01-20 15:06:45|  2|
|6007464b6e6469171...|2021-01-19 15:51:23|  3|
|60145ff384231211c...|2021-01-29 14:20:19|  2|
|60183090c8b5