# What's happening here?

The sample Fetch Rewards data is in 3 files in the JSON Lines format. 

The JSON Lines records cannot be converted to relational tables right away: they are nested up to 3 levels deep, and they have normalization issues that lead to redundancy.  

In this notebook, I have introduced a relational database design and implemented it one schema at a time resulting in 9 tables in the end.  

Finally, I have imported the tables into a MySQL server. I could not add PK and FK constraints because the data is plagued with quality issues that I have addressed in a different file.

## Import Modules

Just a couple of file handling modules, the usual toolkit and SQLAlchemy for pushing data into the database server.

In [1]:
import os
import gzip
import shutil

import pandas as pd
import numpy as np
import json

from datetime import datetime
import swifter

from sqlalchemy import create_engine
import pymysql

# Section 1: Create Relational Structure

## 1.1 Unzip And Load Data Files

Since the JSON files are gzip-compressed, I will unzip them first.

In [2]:
for i in os.listdir('Data'):
    # Decompress every .gz archive next to its source file
    if i.endswith('.gz'):
        path = f'Data/{i}'
        with gzip.open(path, 'rb') as f_in:
            with open(path[:-len('.gz')], 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

In [3]:
receipts_raw = pd.read_json('Data/receipts.json',lines=True)
brands_raw = pd.read_json('Data/brands.json',lines=True)
users_raw = pd.read_json('Data/users.json',lines=True)
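
As an aside, pandas can usually read gzip-compressed JSON Lines directly, which would make the unzip step optional. A minimal sketch, assuming a throwaway file `demo.json.gz` with made-up records (hypothetical, not part of the Fetch data):

```python
import gzip
import json
import os
import tempfile

import pandas as pd

# Hypothetical two-record JSON Lines file, written only for this demo.
records = [
    {"_id": {"$oid": "a1"}, "state": "WI"},
    {"_id": {"$oid": "b2"}, "state": "IL"},
]
demo_path = os.path.join(tempfile.mkdtemp(), "demo.json.gz")
with gzip.open(demo_path, "wt", encoding="utf-8") as f:
    for rec in records:
        f.write(json.dumps(rec) + "\n")

# compression="infer" (the default) would also detect gzip from the .gz suffix.
demo_df = pd.read_json(demo_path, lines=True, compression="gzip")
print(demo_df.shape)
```

The nested `_id` values still arrive as dicts, so the flattening steps later in the notebook are needed either way.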

## 1.2 Utility Functions

All dates are UTC timestamps with millisecond precision. Since we don't need that level of precision, I can safely drop the milliseconds and convert the values to formatted datetime strings. The function `timestamp2datetime` helps with that.

In [4]:
def timestamp2datetime(x):
    try:
        return (datetime.utcfromtimestamp(x // 1000).strftime('%Y-%m-%d %H:%M:%S'))
    except (TypeError, ValueError, OverflowError, OSError):  # missing or malformed timestamp
        return None
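
For comparison, the same conversion can be done column-wise without a per-row function. This is only a sketch of an alternative, not what the notebook uses; the two timestamps are copied from the users sample and the `None` stands in for a missing date:

```python
import pandas as pd

# Millisecond epoch values from the users sample; None represents a missing date.
ms = pd.Series([1609687444800, 1609687530554, None])

# Floor to whole seconds, then parse the epoch; missing values become NaT.
converted = pd.to_datetime(ms // 1000, unit="s", utc=True)
formatted = converted.dt.strftime("%Y-%m-%d %H:%M:%S")
print(formatted.tolist())
```

The first two values should come out as `2021-01-03 15:24:04` and `2021-01-03 15:25:30`, matching what `timestamp2datetime` returns for the same inputs.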

You can ignore this one, really; it helps me find the unique keys at the highest level of nesting across a collection of JSON Lines records.

In [5]:
def getAllProperties(x):
    x_nonnull = x[~x.isna()]
    return set().union(*(d.keys() for d in x_nonnull))
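
A quick illustration of what `getAllProperties` returns, using a made-up Series shaped like the `cpg` column (the values are invented for the example):

```python
import numpy as np
import pandas as pd


def getAllProperties(x):
    # Same logic as the notebook cell: union of dict keys over non-null entries.
    x_nonnull = x[~x.isna()]
    return set().union(*(d.keys() for d in x_nonnull))


# Invented sample: one full record, one missing value, one partial record.
sample = pd.Series([
    {"$id": {"$oid": "a1"}, "$ref": "Cogs"},
    np.nan,
    {"$id": {"$oid": "b2"}},
])
props = getAllProperties(sample)
print(sorted(props))
```

Only top-level keys are collected, so the nested `$oid` inside `$id` does not appear in the result.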

## 1.3 Create Users Schema

The first schema I create is Users. **These are our customers at Fetch, who follow the primary journey of scanning receipts for their transactions at retail stores.**

In [6]:
users_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 495 entries, 0 to 494
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   _id           495 non-null    object
 1   active        495 non-null    bool  
 2   createdDate   495 non-null    object
 3   lastLogin     433 non-null    object
 4   role          495 non-null    object
 5   signUpSource  447 non-null    object
 6   state         439 non-null    object
dtypes: bool(1), object(6)
memory usage: 23.8+ KB


In [7]:
users_raw.head()

Unnamed: 0,_id,active,createdDate,lastLogin,role,signUpSource,state
0,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI
1,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI
2,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI
3,{'$oid': '5ff1e1eacfcf6c399c274ae6'},True,{'$date': 1609687530554},{'$date': 1609687530597},consumer,Email,WI
4,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI


In [8]:
print("Properties of user Id: ", getAllProperties(users_raw['_id']))
print("Properties of user createdDate: ", getAllProperties(users_raw['createdDate']))
print("Properties of user lastLogin: ", getAllProperties(users_raw['lastLogin']))

Properties of user Id:  {'$oid'}
Properties of user createdDate:  {'$date'}
Properties of user lastLogin:  {'$date'}


In [9]:
users = users_raw.copy()

users['id'] = users['_id'].swifter.apply(lambda x: x['$oid'] if x else np.nan)

users['active'] = users['active'].astype(bool).astype('Int32').fillna(0)

users['createdDate'] = users['createdDate'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)
users['lastLogin'] = users['lastLogin'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

users.drop('_id', axis=1, inplace=True)

In [10]:
users.head()

Unnamed: 0,active,createdDate,lastLogin,role,signUpSource,state,id
0,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052
1,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052
2,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052
3,1,2021-01-03 15:25:30,2021-01-03 15:25:30,consumer,Email,WI,5ff1e1eacfcf6c399c274ae6
4,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052


This is the final schema for **Users**.
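
The `head()` output above repeats the same `id` several times, which is one reason a primary key cannot be declared yet. A small sketch of how such duplicates could be counted and dropped, using a toy frame (`demo_users` is hypothetical and only mirrors the ids shown above):

```python
import pandas as pd

# Toy frame mirroring the repeated ids visible in users.head(); not the real data.
demo_users = pd.DataFrame({
    "id": [
        "5ff1e194b6a9d73a3a9f1052",
        "5ff1e194b6a9d73a3a9f1052",
        "5ff1e1eacfcf6c399c274ae6",
    ],
    "state": ["WI", "WI", "WI"],
})

# Rows beyond the first occurrence of each id.
dup_count = int(demo_users["id"].duplicated().sum())

# Keep the first row per id; the column is then usable as a primary key.
deduped = demo_users.drop_duplicates("id")
print(dup_count, deduped["id"].is_unique)
```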

## 1.4 Create Items, Category and Brands Schemas

Next, I take on brands and their items. **These are our partners who enroll with us to help their Shoppers become Savers**. Every brand is identified by a brandCode.  

Strangely, I noticed `barcode` as a column, which is actually an identifier for items.  

In [11]:
brands_raw.head()

Unnamed: 0,_id,barcode,category,categoryCode,cpg,name,topBrand,brandCode
0,{'$oid': '601ac115be37ce2ead437551'},511111019862,Baking,BAKING,"{'$id': {'$oid': '601ac114be37ce2ead437550'}, ...",test brand @1612366101024,0.0,
1,{'$oid': '601c5460be37ce2ead43755f'},511111519928,Beverages,BEVERAGES,"{'$id': {'$oid': '5332f5fbe4b03c9a25efd0ba'}, ...",Starbucks,0.0,STARBUCKS
2,{'$oid': '601ac142be37ce2ead43755d'},511111819905,Baking,BAKING,"{'$id': {'$oid': '601ac142be37ce2ead437559'}, ...",test brand @1612366146176,0.0,TEST BRANDCODE @1612366146176
3,{'$oid': '601ac142be37ce2ead43755a'},511111519874,Baking,BAKING,"{'$id': {'$oid': '601ac142be37ce2ead437559'}, ...",test brand @1612366146051,0.0,TEST BRANDCODE @1612366146051
4,{'$oid': '601ac142be37ce2ead43755e'},511111319917,Candy & Sweets,CANDY_AND_SWEETS,"{'$id': {'$oid': '5332fa12e4b03c9a25efd1e7'}, ...",test brand @1612366146827,0.0,TEST BRANDCODE @1612366146827


In [12]:
print("Properties of brand Id: ", getAllProperties(brands_raw['_id']))
print("Properties of brand cpg: ", getAllProperties(brands_raw['cpg']))

Properties of brand Id:  {'$oid'}
Properties of brand cpg:  {'$id', '$ref'}


In [13]:
brands = brands_raw.copy()

brands['id'] = brands['_id'].swifter.apply(lambda x: x['$oid'] if x else np.nan)

# Fill missing values first: a NaN would otherwise be cast to True
brands['topBrand'] = brands['topBrand'].fillna(0).astype(bool).astype('Int32')

brands['cpg_id'] = brands['cpg'].swifter.apply(lambda x: x['$id'].get('$oid') if type(x) is dict else np.nan)
brands['cpg_ref'] = brands['cpg'].swifter.apply(lambda x: x['$ref'] if type(x) is dict else np.nan)

In [14]:
items = brands.loc[:,['barcode','categoryCode','brandCode','cpg_id','cpg_ref']]
items.head()

Unnamed: 0,barcode,categoryCode,brandCode,cpg_id,cpg_ref
0,511111019862,BAKING,,601ac114be37ce2ead437550,Cogs
1,511111519928,BEVERAGES,STARBUCKS,5332f5fbe4b03c9a25efd0ba,Cogs
2,511111819905,BAKING,TEST BRANDCODE @1612366146176,601ac142be37ce2ead437559,Cogs
3,511111519874,BAKING,TEST BRANDCODE @1612366146051,601ac142be37ce2ead437559,Cogs
4,511111319917,CANDY_AND_SWEETS,TEST BRANDCODE @1612366146827,5332fa12e4b03c9a25efd1e7,Cogs


In [15]:
categories = brands.loc[:,['categoryCode','category']]
categories.head()

Unnamed: 0,categoryCode,category
0,BAKING,Baking
1,BEVERAGES,Beverages
2,BAKING,Baking
3,BAKING,Baking
4,CANDY_AND_SWEETS,Candy & Sweets


In [16]:
brands.drop(['_id','cpg','barcode','category','categoryCode','cpg_id','cpg_ref'], axis=1, inplace=True)
brands.head()

Unnamed: 0,name,topBrand,brandCode,id
0,test brand @1612366101024,0,,601ac115be37ce2ead437551
1,Starbucks,0,STARBUCKS,601c5460be37ce2ead43755f
2,test brand @1612366146176,0,TEST BRANDCODE @1612366146176,601ac142be37ce2ead43755d
3,test brand @1612366146051,0,TEST BRANDCODE @1612366146051,601ac142be37ce2ead43755a
4,test brand @1612366146827,0,TEST BRANDCODE @1612366146827,601ac142be37ce2ead43755e


**I have separated brands, items and a roll-up of items called categories into separate tables** all in third normal form.
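
For the categories table to work as a lookup, each `categoryCode` should appear once and map to a single `category` name. A hedged sketch of that check on a toy frame (`demo`, `lookup` and `names_per_code` are illustrative names, and the rows are copied from the preview above):

```python
import pandas as pd

# Toy rows copied from the categories preview; not the full data.
demo = pd.DataFrame({
    "categoryCode": ["BAKING", "BEVERAGES", "BAKING", "CANDY_AND_SWEETS"],
    "category": ["Baking", "Beverages", "Baking", "Candy & Sweets"],
})

# One row per distinct (code, name) pair, ignoring rows without a code.
lookup = demo.drop_duplicates().dropna(subset=["categoryCode"]).reset_index(drop=True)

# A code that maps to more than one name would violate the lookup assumption.
names_per_code = lookup.groupby("categoryCode")["category"].nunique()
print(len(lookup), bool((names_per_code == 1).all()))
```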

## 1.5 Create Transactions Schema

Next, I create the Transactions schema. **It is intentionally separated from Receipts because they are different entities of varying importance for Fetch.**

In [17]:
transactions = receipts_raw.loc[:, ['_id','purchaseDate','userId','totalSpent']]
transactions.reset_index(inplace=True)
transactions.rename(columns = {'index':'id','_id':'receiptId','purchaseDate':'transactionDate'}, inplace=True)

transactions['receiptId'] = transactions['receiptId'].swifter.apply(lambda x: x['$oid'] if x else np.nan)

transactions['transactionDate'] = transactions['transactionDate'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

transactions.drop_duplicates(inplace=True)
transactions.head()

Unnamed: 0,id,receiptId,transactionDate,userId,totalSpent
0,0,5ff1e1eb0a720f0523000575,2021-01-03 00:00:00,5ff1e1eacfcf6c399c274ae6,26.0
1,1,5ff1e1bb0a720f052300056b,2021-01-02 15:24:43,5ff1e194b6a9d73a3a9f1052,11.0
2,2,5ff1e1f10a720f052300057a,2021-01-03 00:00:00,5ff1e1f1cfcf6c399c274b0b,10.0
3,3,5ff1e1ee0a7214ada100056f,2021-01-03 00:00:00,5ff1e1eacfcf6c399c274ae6,28.0
4,4,5ff1e1d20a7214ada1000561,2021-01-02 15:25:06,5ff1e194b6a9d73a3a9f1052,1.0


The Transactions schema is ready. However, a single transaction could have multiple items and we need to account for that.

## 1.6 Create TransactionItems Schema

In [18]:
receipt2transaction = dict(zip(transactions.receiptId, transactions.id))

In [19]:
receiptLineItems_raw = receipts_raw[['_id','rewardsReceiptItemList']].explode('rewardsReceiptItemList').reset_index(drop=True)
receiptLineItems_raw['rewardsReceiptItemList'] = receiptLineItems_raw['rewardsReceiptItemList'].swifter.apply(lambda x: x if isinstance(x, dict) else dict())

receiptLineItems = pd.DataFrame.from_dict(list(receiptLineItems_raw.rewardsReceiptItemList))

receiptLineItems['receiptId'] = receiptLineItems_raw['_id'].swifter.apply(lambda x: x['$oid'] if x else np.nan)

receiptLineItems['transactionId'] = receiptLineItems['receiptId'].swifter.apply(lambda x: receipt2transaction.get(x))

for col in ['needsFetchReview','userFlaggedNewItem','preventTargetGapPoints','deleted','competitiveProduct']:
    receiptLineItems[col] = receiptLineItems[col].astype(bool).astype('Int32').fillna(0)
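
One caveat about the boolean casts in this cell: `astype(bool)` treats a missing value as truthy, so a flag that is absent from a line item ends up as 1, and the trailing `fillna(0)` has nothing left to fill. A small sketch on a made-up Series (`flags` is hypothetical) showing the difference, with one possible alternative that keeps missing flags at 0:

```python
import numpy as np
import pandas as pd

# Made-up flag column: a real True, a real False, and a missing value.
flags = pd.Series([True, False, np.nan], dtype=object)

# NaN is truthy in Python, so the missing flag becomes 1 here.
naive = flags.astype(bool).astype("Int32")

# Comparing against True maps only a real True to 1; NaN compares as False.
explicit = flags.eq(True).astype("Int32")

print(naive.tolist(), explicit.tolist())
```

Whether a missing flag should count as 0 is a modelling decision; the sketch only shows that the two approaches disagree on missing values.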

In [20]:
transactionLineItems = receiptLineItems.loc[:,['transactionId','barcode','deleted','finalPrice','itemPrice','priceAfterCoupon','discountedItemPrice','originalFinalPrice','quantityPurchased',
                        'originalMetaBriteItemPrice','partnerItemId','metabriteCampaignId',
                        'userFlaggedBarcode','userFlaggedNewItem','userFlaggedPrice','userFlaggedQuantity',
                        'needsFetchReview','needsFetchReviewReason','originalMetaBriteBarcode','originalMetaBriteQuantityPurchased']]

transactionLineItems = transactionLineItems.reset_index().rename(columns={'index':'id'})

transactionLineItems.head()

Unnamed: 0,id,transactionId,barcode,deleted,finalPrice,itemPrice,priceAfterCoupon,discountedItemPrice,originalFinalPrice,quantityPurchased,...,partnerItemId,metabriteCampaignId,userFlaggedBarcode,userFlaggedNewItem,userFlaggedPrice,userFlaggedQuantity,needsFetchReview,needsFetchReviewReason,originalMetaBriteBarcode,originalMetaBriteQuantityPurchased
0,0,0,4011.0,1,26.0,26.0,,,,5.0,...,1,,4011.0,1,26.0,5.0,0,,,
1,1,1,4011.0,1,1.0,1.0,,,,1.0,...,1,,,1,,,1,,,
2,2,1,28400642255.0,1,10.0,10.0,,,,1.0,...,2,,28400642255.0,1,10.0,1.0,1,USER_FLAGGED,,
3,3,2,,1,,,,,,,...,1,,4011.0,1,26.0,3.0,0,,,
4,4,3,4011.0,1,28.0,28.0,,,,4.0,...,1,,4011.0,1,28.0,4.0,0,,,


Now every transaction item is a separate record, is linked to its parent transaction and has important properties related to price, discounts, partner and campaign.  
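
The flattening pattern used here (explode the list column, then expand each dict into columns) can be seen in isolation on a toy frame. This is a minimal sketch with invented receipts, not the real data:

```python
import pandas as pd

# Invented receipts: two with line items, one without any.
demo_receipts = pd.DataFrame({
    "receiptId": ["r1", "r2", "r3"],
    "rewardsReceiptItemList": [
        [{"barcode": "4011", "finalPrice": "26.00"}, {"barcode": "1234", "finalPrice": "1.00"}],
        [{"barcode": "4011"}],
        None,
    ],
})

# One row per list element; a receipt without items keeps a single empty row.
exploded = demo_receipts.explode("rewardsReceiptItemList").reset_index(drop=True)

# Replace missing entries with empty dicts so every row expands cleanly.
item_dicts = [d if isinstance(d, dict) else {} for d in exploded["rewardsReceiptItemList"]]
line_items = pd.DataFrame(item_dicts)

# Carry the parent key along so each item stays linked to its receipt.
line_items.insert(0, "receiptId", exploded["receiptId"])
print(line_items.shape)
```

Keys that are missing from an item simply become NaN in that row, which is why the real line-item table is so sparse.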

I also noticed that some items in receipts are missing from my Items schema, so I add them here. I exclude receipt items that do not have a brandCode.

In [21]:
items = pd.concat(
    [
        items, 
        receiptLineItems.loc[~receiptLineItems.brandCode.isna(), ['barcode','brandCode']]
    ], 
    ignore_index=True)

items.drop_duplicates('barcode',inplace=True)

## 1.7 Create Receipts Schema

Receipts are the backbone at Fetch. **When a customer scans, an event is created which triggers the processing pipeline that evaluates rewards points.** A receipt is linked to an actual purchase aka transaction.

In [22]:
receipts = receipts_raw.loc[:,['_id','createDate','dateScanned','finishedDate','modifyDate','userId','pointsEarned']]

receipts['id'] = receipts['_id'].swifter.apply(lambda x: x['$oid'] if x else np.nan)

receipts['createDate'] = receipts['createDate'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

receipts['dateScanned'] = receipts['dateScanned'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

receipts['finishedDate'] = receipts['finishedDate'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

receipts['modifyDate'] = receipts['modifyDate'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

receipts['transactionId'] = receipts['id'].swifter.apply(lambda x: receipt2transaction.get(x))

receipts.drop_duplicates('id',inplace=True)
receipts.drop('_id',axis=1,inplace=True)

In [23]:
receipts.head()

Unnamed: 0,createDate,dateScanned,finishedDate,modifyDate,userId,pointsEarned,id,transactionId
0,2021-01-03 15:25:31,2021-01-03 15:25:31,2021-01-03 15:25:31,2021-01-03 15:25:36,5ff1e1eacfcf6c399c274ae6,500.0,5ff1e1eb0a720f0523000575,0
1,2021-01-03 15:24:43,2021-01-03 15:24:43,2021-01-03 15:24:43,2021-01-03 15:24:48,5ff1e194b6a9d73a3a9f1052,150.0,5ff1e1bb0a720f052300056b,1
2,2021-01-03 15:25:37,2021-01-03 15:25:37,,2021-01-03 15:25:42,5ff1e1f1cfcf6c399c274b0b,5.0,5ff1e1f10a720f052300057a,2
3,2021-01-03 15:25:34,2021-01-03 15:25:34,2021-01-03 15:25:34,2021-01-03 15:25:39,5ff1e1eacfcf6c399c274ae6,5.0,5ff1e1ee0a7214ada100056f,3
4,2021-01-03 15:25:06,2021-01-03 15:25:06,2021-01-03 15:25:11,2021-01-03 15:25:11,5ff1e194b6a9d73a3a9f1052,5.0,5ff1e1d20a7214ada1000561,4


With the Receipts schema ready, we can tackle the final piece in our design - **Rewards**.

## 1.8 Create Rewards Schema

Rewards are what drive the customer into a feedback loop of scanning and saving. They are an entity of their own.

In [24]:
rewards = receipts_raw.loc[:,['_id','bonusPointsEarned','bonusPointsEarnedReason','pointsAwardedDate','pointsEarned',
                            'rewardsReceiptStatus']]

rewards['receiptId'] = rewards['_id'].swifter.apply(lambda x: x['$oid'] if x else np.nan)

rewards['pointsAwardedDate'] = rewards['pointsAwardedDate'].swifter.apply(lambda x: timestamp2datetime(x['$date']) if type(x) is dict else np.nan)

rewards.drop_duplicates('receiptId',inplace=True)
rewards.drop('_id',axis=1,inplace=True)

rewards = rewards.reset_index().rename(columns={'index':'id'})

rewards.head()

Unnamed: 0,id,bonusPointsEarned,bonusPointsEarnedReason,pointsAwardedDate,pointsEarned,rewardsReceiptStatus,receiptId
0,0,500.0,"Receipt number 2 completed, bonus point schedu...",2021-01-03 15:25:31,500.0,FINISHED,5ff1e1eb0a720f0523000575
1,1,150.0,"Receipt number 5 completed, bonus point schedu...",2021-01-03 15:24:43,150.0,FINISHED,5ff1e1bb0a720f052300056b
2,2,5.0,All-receipts receipt bonus,,5.0,REJECTED,5ff1e1f10a720f052300057a
3,3,5.0,All-receipts receipt bonus,2021-01-03 15:25:34,5.0,FINISHED,5ff1e1ee0a7214ada100056f
4,4,5.0,All-receipts receipt bonus,2021-01-03 15:25:06,5.0,FINISHED,5ff1e1d20a7214ada1000561


Next, I understand that rewards are processed at a granular level of items - some items are eligible and some are not.

## 1.9 Create RewardReceiptItems Schema

Our final schema records the rewards earned for every item on the receipt that was purchased.

In [25]:
rewardReceiptItems = receiptLineItems.loc[:,['receiptId','originalReceiptItemText','itemNumber','pointsNotAwardedReason',
                                                  'pointsPayerId','rewardsGroup','rewardsProductPartnerId',
                                                  'competitorRewardsGroup','pointsEarned']]

rewardReceiptItems = rewardReceiptItems.reset_index().reset_index()

rewardReceiptItems.rename(columns={'level_0':'id','index':'transactionItemId'}, inplace=True)

rewardReceiptItems.head()

Unnamed: 0,id,transactionItemId,receiptId,originalReceiptItemText,itemNumber,pointsNotAwardedReason,pointsPayerId,rewardsGroup,rewardsProductPartnerId,competitorRewardsGroup,pointsEarned
0,0,0,5ff1e1eb0a720f0523000575,,,,,,,,
1,1,1,5ff1e1bb0a720f052300056b,,,,,,,,
2,2,2,5ff1e1bb0a720f052300056b,,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,DORITOS SPICY SWEET CHILI SINGLE SERVE,5332f5fbe4b03c9a25efd0ba,,
3,3,3,5ff1e1f10a720f052300057a,,,,,,,,
4,4,4,5ff1e1ee0a7214ada100056f,,,,,,,,
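
The two chained `reset_index()` calls are what produce the `level_0` and `index` columns that get renamed. A tiny sketch on a toy frame makes the mechanics visible:

```python
import pandas as pd

demo = pd.DataFrame({"receiptId": ["r1", "r1", "r2"]})

# First call adds an "index" column; the second cannot reuse that name,
# so pandas falls back to "level_0" for the new one.
twice = demo.reset_index().reset_index()
print(twice.columns.tolist())

renamed = twice.rename(columns={"level_0": "id", "index": "transactionItemId"})
```

Both columns hold the same row positions, so `transactionItemId` matches the `id` in the transaction items table only because both frames are derived from `receiptLineItems` in the same row order.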


# Section 2: Import Schema Into MySQL Server

With the schema ready to be deployed, I will connect to my local MySQL server to push the dataframes into tables. This will help me run queries to identify data quality issues and do other analytics.

## 2.1 Create Database Connection

In [26]:
with open('config.json') as f:
    config = json.load(f)

# Relies on the key order in config.json: user, password, port, host, db
user, password, port, host, db = config.values()

sqlEngine       = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{db}')
dbConnection    = sqlEngine.connect()
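
If the password contains characters such as `@`, `:` or `/`, the connection URL needs them escaped. A hedged sketch of building the URL by key name with the standard library; the key names and values in `config` below are assumptions, since the actual contents of `config.json` are not shown in this notebook:

```python
from urllib.parse import quote_plus

# Hypothetical config; the real key names and values in config.json may differ.
config = {
    "user": "fetch",
    "password": "p@ss:word/1",
    "host": "localhost",
    "port": 3306,
    "db": "fetch_rewards",
}

# Look values up by key so the result does not depend on the key order in the file.
url = (
    f"mysql+pymysql://{config['user']}:{quote_plus(config['password'])}"
    f"@{config['host']}:{config['port']}/{config['db']}"
)
print(url)
```

The resulting string can be passed to `create_engine` exactly as in the cell above.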

## 2.2 Write DataFrames into MySQL Tables

In [None]:
users.to_sql('users', dbConnection, if_exists='fail')
brands.to_sql('brands', dbConnection, if_exists='fail')
items.to_sql('items', dbConnection, if_exists='fail')
categories.to_sql('categories', dbConnection, if_exists='fail')
transactions.to_sql('transactions', dbConnection, if_exists='fail')
transactionLineItems.to_sql('transactionItems', dbConnection, if_exists='fail')
receipts.to_sql('receipts', dbConnection, if_exists='fail')
rewards.to_sql('rewards', dbConnection, if_exists='fail')
rewardReceiptItems.to_sql('rewardReceiptItems', dbConnection, if_exists='fail')
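
By default `to_sql` also writes the DataFrame index as an extra `index` column. For tables that already carry their own `id`, passing `index=False` avoids that. The sketch below uses an in-memory SQLite database only so that it runs without a MySQL server; `demo_users` is a made-up frame:

```python
import sqlite3

import pandas as pd

# Made-up frame standing in for one of the schema tables.
demo_users = pd.DataFrame({"id": ["u1", "u2"], "active": [1, 0]})

conn = sqlite3.connect(":memory:")

# index=False keeps the positional index out of the table.
demo_users.to_sql("users", conn, if_exists="fail", index=False)

# Column names as stored in the database.
cols = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(cols, row_count)
conn.close()
```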

### Done!

The data is ready. Switch to the other files to see the queries I run against it.