In [23]:
import json
import pandas as pd
# !pip install duckdb 
# may need to install duckdb ^^
import duckdb
from datetime import datetime
import io

In [2]:
#load and clean up users
users_df = pd.read_json('users.json', lines = True)
users_df = users_df.fillna('')

users_df['createdDate'] = users_df['createdDate'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
users_df['_id'] = users_df['_id'].apply(lambda x: json.loads(json.dumps(x))['$oid'] if x != '' else '')
users_df['lastLogin'] = users_df['lastLogin'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')

In [3]:
#load and clean up receipts
receipts_df = pd.read_json('receipts.json', lines = True)
receipts_df = receipts_df.fillna('')

receipts_df['modifyDate'] = receipts_df['modifyDate'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
receipts_df['createDate'] = receipts_df['createDate'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
receipts_df['finishedDate'] = receipts_df['finishedDate'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
receipts_df['pointsAwardedDate'] = receipts_df['pointsAwardedDate'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
receipts_df['dateScanned'] = receipts_df['dateScanned'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
receipts_df['purchaseDate'] = receipts_df['purchaseDate'].apply(lambda x: datetime.fromtimestamp(x['$date'] / 1000) if x != '' else '')
receipts_df['rewardsReceiptItemList'] = receipts_df['rewardsReceiptItemList'].apply(lambda x: json.loads(json.dumps(x)) if x != '' else None)
receipts_df['_id'] = receipts_df['_id'].apply(lambda x: x['$oid'] if x != '' else None)

receipts_df = receipts_df.explode('rewardsReceiptItemList')
receipts_df = receipts_df.reset_index(drop = True)
receipts_df = receipts_df.join(pd.json_normalize(receipts_df['rewardsReceiptItemList']), rsuffix = '_dict')
##^^ unpack rewardsReceiptItemList json so we have noramalized structure for all attributes captured in rewardsReceiptItemList per record
receipts_df = receipts_df.fillna('')
receipts_df = receipts_df.drop(columns = ['rewardsReceiptItemList','pointsEarned_dict'])

In [4]:
#load and clean up brands
brands_df = pd.read_json('brands.json', lines = True)
users_df = users_df.fillna('')

brands_df['_id'] = brands_df['_id'].apply(lambda x: json.loads(json.dumps(x))['$oid'] if x != '' else '')

brands_df = brands_df.join(pd.json_normalize(brands_df['cpg'], errors = 'ignore'), rsuffix = '_dict')
brands_df = brands_df.drop(columns = ['cpg'])
brands_df = brands_df.rename(columns = {'$ref': 'cpgRef', '$id.$oid': 'cpgId'})

In [5]:
#see ERD - the above^^ operations define the "raw" layer - in a "real" / production situation would perform clean up and either dump to data lake or load to a "raw" layer in the warehouse

In [6]:
brands_schema = {x: brands_df[x].dtypes for x in list(brands_df.columns)}
users_schema = {x: users_df[x].dtypes for x in list(users_df.columns)}
receipts_schema = {x: receipts_df[x].dtypes for x in list(receipts_df.columns)}

In [7]:
# extract data set as json for load into localized duckdb db environment
# using duckdb / pandas connector where the name of the pandas dataframe is referenced directly in query
duckdb.sql("""
with total_receipts as (
    select
    receipts_df._id as receipt_id
    ,case when receipts_df.brandCode in ('','BRAND') then 'NO_BRAND_PROVIDED' else receipts_df.brandCode end as brand_code
    ,cast(receipts_df.dateScanned as varchar) as date_scanned
    ,cast(case when receipts_df.itemPrice = '' then 0.0 else cast(receipts_df.itemPrice as decimal) end as decimal) as item_price
    ,cast(case when receipts_df.purchasedItemCount = '' then 0 else cast(receipts_df.purchasedItemCount as integer) end as integer) as purchased_item_count
    ,cast(case when receipts_df.totalSpent = '' then 0.0 else cast(receipts_df.totalSpent as decimal) end as decimal) as total_spent
    ,receipts_df.rewardsReceiptStatus as rewards_receipt_status
    ,users_duped._id as user_id
    ,cast(users_duped.createdDate as varchar) as user_created_date
    from receipts_df
    left join 
    (
        select
        _id
        ,active
        ,createdDate
        ,lastLogin
        ,role
        ,signUpSource
        ,state
        from
        (
            select 
            *
            ,count(*) as duplicate_count
            from users_df
            group by 1,2,3,4,5,6,7
        )
    ) users_duped on receipts_df.userId = users_duped._id
)

select * from total_receipts
""").to_df().to_json('receipts_detail_data_model.json', orient = 'records')

In [9]:
con = duckdb.connect()

In [10]:
# load extracted json "receipts_detail_data_model" into duckdb table - creates "discovery layer" see ERD
con.execute("""
    drop table if exists receipts_detail;
    
    create table receipts_detail (
        receipt_id varchar(25)
        ,brand_code varchar(25)
        ,date_scanned timestamp
        ,item_price decimal(10,2)
        ,purchased_item_count integer
        ,total_spent decimal(10,2)
        ,rewards_receipt_status varchar(25)
        ,user_id varchar(25)
        ,user_created_date timestamp
        );
        
    truncate receipts_detail;
    
    copy receipts_detail from 'receipts_detail_data_model.json' (format json, array True);
""")

<duckdb.duckdb.DuckDBPyConnection at 0x117b057b0>

In [24]:
# load into "reporting / analytics" layer brand_scanned_fact view
con.sql("""
    drop view if exists brand_scanned_fact;
    create view brand_scanned_fact as
        with receipt_count as (
            select 
            year(date_scanned) as date_scanned_year
            ,month(date_scanned) as date_scanned_month
            ,coalesce(brand_code, 'NO_BRAND_PROVIDED') as brand
            ,count(distinct receipt_id) as receipt_count_scanned
            from receipts_detail
            group by 1,2,3
        )
    
        ,most_recent_month as (
            select
            date_scanned_year
            ,date_scanned_month
            ,dense_rank() over (order by date_scanned_year desc, date_scanned_month desc) as month_rank
            from
            (
                select
                date_scanned_year
                ,date_scanned_month
                ,count(*)
                from receipt_count
                group by 1,2
            )
        )
    
        select
        receipt_count.date_scanned_year
        ,receipt_count.date_scanned_month
        ,month_rank
        ,brand
        ,receipt_count_scanned
        ,dense_rank() over (partition by receipt_count.date_scanned_year, receipt_count.date_scanned_month order by receipt_count desc) as receipt_scanned_rank
        from receipt_count
        inner join most_recent_month on receipt_count.date_scanned_year = most_recent_month.date_scanned_year 
            and receipt_count.date_scanned_month = most_recent_month.date_scanned_month
        order by 1 desc, 2 desc, 5 asc
""")

In [13]:
# What are the top 5 brands by receipts scanned for most recent month?
con.sql("""select * from brand_scanned_fact where month_rank = 1""")

┌───────────────────┬────────────────────┬────────────┬───────────────────┬───────────────────────┬──────────────────────┐
│ date_scanned_year │ date_scanned_month │ month_rank │       brand       │ receipt_count_scanned │ receipt_scanned_rank │
│       int64       │       int64        │   int64    │      varchar      │         int64         │        int64         │
├───────────────────┼────────────────────┼────────────┼───────────────────┼───────────────────────┼──────────────────────┤
│              2021 │                  3 │          1 │ NO_BRAND_PROVIDED │                    23 │                    1 │
└───────────────────┴────────────────────┴────────────┴───────────────────┴───────────────────────┴──────────────────────┘

In [14]:
# How does the ranking of the top 5 brands by receipts scanned for the recent month compare to the ranking for the previous month?
con.sql("""select * from brand_scanned_fact where month_rank <= 2""")

┌───────────────────┬────────────────────┬────────────┬───────────────────┬───────────────────────┬──────────────────────┐
│ date_scanned_year │ date_scanned_month │ month_rank │       brand       │ receipt_count_scanned │ receipt_scanned_rank │
│       int64       │       int64        │   int64    │      varchar      │         int64         │        int64         │
├───────────────────┼────────────────────┼────────────┼───────────────────┼───────────────────────┼──────────────────────┤
│              2021 │                  3 │          1 │ NO_BRAND_PROVIDED │                    23 │                    1 │
│              2021 │                  2 │          2 │ VIVA              │                     1 │                    1 │
│              2021 │                  2 │          2 │ MISSION           │                     2 │                    3 │
│              2021 │                  2 │          2 │ NO_BRAND_PROVIDED │                   446 │                    2 │
└───────────────

In [15]:
# load into "reporting / analytics" layer reward_status_fact view
con.sql("""
    drop view if exists reward_status_fact;
    create view reward_status_fact as 
        select
        rewards_receipt_status
        ,avg(total_receipt) as average_item_price
        ,sum(total_items_receipt) as total_items
        from
        (
            select
            receipt_id
            ,rewards_receipt_status
            ,avg(purchased_item_count) as total_items_receipt
            ,sum(item_price) as total_receipt
            from receipts_detail
            group by 1,2
        ) a
        group by 1
""")

In [16]:
# When considering average spend from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?
# When considering total number of items purchased from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?
con.sql('select * from reward_status_fact')

┌────────────────────────┬────────────────────┬─────────────┐
│ rewards_receipt_status │ average_item_price │ total_items │
│        varchar         │       double       │   double    │
├────────────────────────┼────────────────────┼─────────────┤
│ PENDING                │            27.4718 │         0.0 │
│ FLAGGED                │  180.4517391304348 │      1014.0 │
│ REJECTED               │  23.80492957746479 │       173.0 │
│ FINISHED               │  80.90059845559846 │      8184.0 │
│ SUBMITTED              │                0.0 │         0.0 │
└────────────────────────┴────────────────────┴─────────────┘

In [17]:
# load into "reporting / analytics" layer brand_transaction_fact view
con.sql("""
    drop view if exists brand_transaction_fact;
    create view brand_transaction_fact as
        with users_past_6_months as (
            select 
            user_id
            from receipts_detail
            group by user_id, user_created_date
            having datediff('month', user_created_date, max(date_scanned)) <= 6
        )
    
        select
        brand_code
        ,sum(item_price) as total_spend
        ,count(distinct receipt_id) as total_transactions
        from receipts_detail
        inner join users_past_6_months on users_past_6_months.user_id = receipts_detail.user_id
        group by 1
        order by 2 desc
""")

In [18]:
# Which brand has the most spend among users who were created within the past 6 months?
# Which brand has the most transactions among users who were created within the past 6 months?
con.sql("""
    select
    brand_code
    ,total_spend
    ,dense_rank() over (order by total_spend desc) as total_spend_rank
    ,total_transactions
    ,dense_rank() over (order by total_transactions desc) as total_transactions_rank
    from brand_transaction_fact
""")

┌─────────────────────────┬───────────────┬──────────────────┬────────────────────┬─────────────────────────┐
│       brand_code        │  total_spend  │ total_spend_rank │ total_transactions │ total_transactions_rank │
│         varchar         │ decimal(38,2) │      int64       │       int64        │          int64          │
├─────────────────────────┼───────────────┼──────────────────┼────────────────────┼─────────────────────────┤
│ NO_BRAND_PROVIDED       │      21294.58 │                1 │                842 │                       1 │
│ BEN AND JERRYS          │       1217.40 │                2 │                 17 │                       2 │
│ MISSION                 │         46.27 │               57 │                 17 │                       2 │
│ PEPSI                   │        250.39 │               14 │                 16 │                       3 │
│ KLEENEX                 │        356.07 │                8 │                 15 │                       4 │
│ FOLGERS 

In [19]:
###################### DATA QUALITY ######################

In [20]:
duckdb.sql("""
with barcode_exist as (
    select
    receipts_df.barcode as receipts_barcode
    ,case when receipts_df.brandCode = '' then 'NO_BRAND_PROVIDED' else receipts_df.brandCode end as receipts_brand_code
    ,brands_df.barcode as brands_barcode
    ,brands_df.brandCode as brands_brand_code
    ,case when receipts_brand_code = brands_brand_code then true else false end as is_brand_code_match
    from receipts_df
    left join brands_df on cast(brands_df.barcode as varchar) = cast(receipts_df.barcode as varchar)
)

select 
count(*) as total_items
,count(case when brands_barcode is not null then brands_barcode end) as items_with_matched_barcode 
,count(case when brands_barcode is not null then brands_barcode end) / (1.0 * count(*)) as barcodes_covered
,count(case when is_brand_code_match = true then brands_barcode end) / (1.0 * count(case when brands_barcode is not null then brands_barcode end)) as pct_brands_matched
from barcode_exist
""").to_df()
# in some case we may want to enrich the receipts with further brand information...
# issues - 
# - only ~1% of all barcodes are correctly collected from data source OR the barcode is of a different format / structure
# - of those barcodes that did match ~90% have the same brand name
# in order for brands to be a viable table for enrichment the collection / quality of barcodes values must be improved

Unnamed: 0,total_items,items_with_matched_barcode,barcodes_covered,pct_brands_matched
0,7388,89,0.012047,0.898876


In [21]:
duckdb.sql('select *, count(*) from users_df group by 1,2,3,4,5,6,7').to_df()
# users.json is not deduped
# if this was a table that collected timestamps per user log in - would be valid

Unnamed: 0,_id,active,createdDate,lastLogin,role,signUpSource,state,count_star()
0,5ff1e194b6a9d73a3a9f1052,True,2021-01-03 10:24:04.800,2021-01-03 10:25:37.858000,consumer,Email,WI,11
1,5ff36c8862fde912123a538a,True,2021-01-04 14:29:12.943,2021-01-04 14:29:13.141000,consumer,Email,WI,1
2,5ff4ce91c1e2d0121a9b3057,True,2021-01-05 15:39:45.282,2021-01-05 15:39:45.329000,consumer,Email,WI,1
3,5ff4ce34c3d63511e2a484ba,True,2021-01-05 15:38:12.503,2021-01-05 15:38:12.565000,consumer,Email,WI,1
4,5ff4ce3ac1e2d0121a9b2fb3,True,2021-01-05 15:38:18.446,2021-01-05 15:38:18.495000,consumer,Email,WI,1
...,...,...,...,...,...,...,...,...
207,60229990b57b8a12187fe9e0,True,2021-02-09 09:17:52.581,2021-02-09 09:17:52.626000,consumer,Email,WI,2
208,6024399defa60112282c0ac9,True,2021-02-10 14:53:01.857,2021-02-10 14:53:02.079000,consumer,Email,WI,1
209,5f2068904928021530f8fc34,True,2020-07-28 14:04:00.905,2021-02-04 10:30:05.375000,fetch-staff,Email,WI,1
210,5fb0a078be5fc9775c1f3945,True,2020-11-14 22:28:56.818,,consumer,Google,AL,2


In [22]:
duckdb.sql('select brandCode, count(*) from brands_df group by 1 order by 2 desc').to_df()
# brand is not captured or not captured correctly

Unnamed: 0,brandCode,count_star()
0,,234
1,,35
2,GOODNITES,2
3,HUGGIES,2
4,STARBUCKS,1
...,...,...
893,HOFFMAN'S,1
894,BETTY CROCKER FRUIT BY THE FOOT,1
895,KOTEX,1
896,BAGEL BITES,1
