In [9]:
import numpy as np, pandas as pd, os, sqlite3, json

In [10]:
# Function to create SQLite database and tables
def create_database_and_tables(db_path='datalake.db'):
    """Create the SQLite data-lake file and its three tables if they are missing.

    Parameters
    ----------
    db_path : str, optional
        Path of the SQLite file to open. Defaults to 'datalake.db' in the
        current working directory. sqlite3 creates the file when it does
        not exist yet.

    Notes
    -----
    Every statement is CREATE TABLE IF NOT EXISTS, so calling this function
    again leaves existing tables and their rows untouched.
    The connection is always closed, even when a statement fails.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # users: `id` is intentionally NOT a primary key in this schema.
        # NOTE(review): presumably because the source file repeats user ids;
        # confirm before adding a uniqueness constraint.
        cursor.execute('''CREATE TABLE IF NOT EXISTS users (
                        id TEXT,
                        active INTEGER,
                        createdDate INTEGER,
                        lastLogin INTEGER,
                        role TEXT,
                        signUpSource TEXT,
                        state TEXT
                    )''')

        # receipts: one row per receipt, keyed by id.
        # rewardsReceiptItemList is a TEXT column (the item list is stored serialized).
        cursor.execute('''CREATE TABLE IF NOT EXISTS receipts (
                        id TEXT PRIMARY KEY,
                        bonusPointsEarned INTEGER,
                        bonusPointsEarnedReason TEXT,
                        createDate INTEGER,
                        dateScanned INTEGER,
                        finishedDate INTEGER,
                        modifyDate INTEGER,
                        pointsAwardedDate INTEGER,
                        pointsEarned REAL,
                        purchaseDate INTEGER,
                        purchasedItemCount INTEGER,
                        rewardsReceiptItemList TEXT,
                        rewardsReceiptStatus TEXT,
                        totalSpent REAL,
                        userId TEXT
                    )''')

        # brands: one row per brand, keyed by id.
        cursor.execute('''CREATE TABLE IF NOT EXISTS brands (
                        id TEXT PRIMARY KEY,
                        name TEXT,
                        cpg_ref TEXT,
                        cpg_id TEXT,
                        category TEXT,
                        categoryCode TEXT,
                        barcode TEXT,
                        brandCode TEXT,
                        topBrand INTEGER
                    )''')

        conn.commit()
    finally:
        # Release the file handle whether or not table creation succeeded.
        conn.close()



In [11]:
# Create the database file and the users / receipts / brands tables.
# Re-running this cell is harmless: each table is created with IF NOT EXISTS.
create_database_and_tables()

In [12]:
# List the name of every object recorded in the schema catalogue (sqlite_master).
# This returns more than the three tables: SQLite also records the automatic
# indexes it builds for the PRIMARY KEY columns of receipts and brands.
conn = sqlite3.connect('datalake.db')
cursor = conn.cursor()
res = cursor.execute("select name from sqlite_master")
res.fetchall()

[('users',),
 ('receipts',),
 ('sqlite_autoindex_receipts_1',),
 ('brands',),
 ('sqlite_autoindex_brands_1',)]

In [13]:
# Load the three newline-delimited JSON exports (users, receipts, brands) into
# datalake.db. Each file holds one JSON document per line; ids and dates are
# wrapped as {"$oid": ...} / {"$date": ...} and are unwrapped before insertion.

def _nested(doc, *keys):
    """Walk `keys` through nested dicts; return None once a level is absent or not a dict."""
    value = doc
    for key in keys:
        if not isinstance(value, dict):
            # Covers both a missing wrapper and a wrapper that is explicitly null.
            return None
        value = value.get(key)
    return value


def _read_json_lines(path):
    """Yield one parsed document per non-blank line of a newline-delimited JSON file."""
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(path, encoding='utf-8') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


def _user_row(doc):
    """Build the 7-value tuple for the users table, in table column order."""
    return (
        _nested(doc, '_id', '$oid'),
        doc.get('active'),
        _nested(doc, 'createdDate', '$date'),
        _nested(doc, 'lastLogin', '$date'),
        doc.get('role'),
        doc.get('signUpSource'),
        doc.get('state'),
    )


def _receipt_row(doc):
    """Build the 15-value tuple for the receipts table, in table column order."""
    items = doc.get('rewardsReceiptItemList')
    return (
        _nested(doc, '_id', '$oid'),
        doc.get('bonusPointsEarned'),
        doc.get('bonusPointsEarnedReason'),
        _nested(doc, 'createDate', '$date'),
        _nested(doc, 'dateScanned', '$date'),
        _nested(doc, 'finishedDate', '$date'),
        _nested(doc, 'modifyDate', '$date'),
        _nested(doc, 'pointsAwardedDate', '$date'),
        doc.get('pointsEarned'),
        _nested(doc, 'purchaseDate', '$date'),
        doc.get('purchasedItemCount'),
        # json.dumps(None) is the 4-character string 'null'; store a missing
        # item list as SQL NULL so it behaves like every other absent column.
        json.dumps(items) if items is not None else None,
        doc.get('rewardsReceiptStatus'),
        doc.get('totalSpent'),
        doc.get('userId'),
    )


def _brand_row(doc):
    """Build the 9-value tuple for the brands table, in table column order."""
    return (
        _nested(doc, '_id', '$oid'),
        doc.get('name'),
        _nested(doc, 'cpg', '$ref'),
        _nested(doc, 'cpg', '$id', '$oid'),
        doc.get('category'),
        doc.get('categoryCode'),
        doc.get('barcode'),
        doc.get('brandCode'),
        # A missing (or null) flag is stored as 0, matching the previous behaviour.
        int(doc.get('topBrand') or False),
    )


# Parse all three files first so a malformed line fails before the database is touched.
user_rows = [_user_row(doc) for doc in _read_json_lines('users.json')]
receipt_rows = [_receipt_row(doc) for doc in _read_json_lines('receipts.json')]
brand_rows = [_brand_row(doc) for doc in _read_json_lines('brands.json')]

# Use a connection owned by this cell rather than one left open by an earlier cell.
conn = sqlite3.connect('datalake.db')
try:
    # `with conn` commits when the block succeeds and rolls back all three
    # inserts if any of them fails, so the load is all-or-nothing.
    with conn:
        conn.executemany('''INSERT INTO users VALUES (?, ?, ?, ?, ?, ?, ?)''', user_rows)
        conn.executemany('''INSERT INTO receipts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                         receipt_rows)
        conn.executemany('''INSERT INTO brands VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''', brand_rows)
finally:
    # The context manager above does not close the connection; do it explicitly.
    conn.close()

In [14]:
# Reopen the database for the data-quality checks that follow.
# Compare the number of non-NULL user ids with the number of distinct ids;
# a gap between the two means ids repeat in the users table.
# (The aggregate returns a single row, so `limit 10` has no effect here.)
conn = sqlite3.connect('datalake.db')
cursor = conn.cursor()
res = cursor.execute("select count(id), count(distinct id) from users limit 10")
res.fetchall()

[(495, 212)]

In [15]:
# Same uniqueness check for receipts: non-NULL ids vs. distinct ids.
res = cursor.execute("select count(id), count(distinct id) from receipts")
res.fetchall()

[(1119, 1119)]

In [16]:
# Same uniqueness check for brands: non-NULL ids vs. distinct ids.
res = cursor.execute("select count(id), count(distinct id) from brands")
res.fetchall()

[(1167, 1167)]

In [17]:
# Count user rows (cnt) and distinct user ids (users) whose id appears as
# userId on at least one receipt.
res = cursor.execute('''select count(id) as cnt, count(distinct id) as users from users
                     where id in (select distinct userId from receipts)
                     ''')
res.fetchall()

[(418, 141)]

In [18]:
# Count receipts (and distinct userIds) whose userId has no matching row in users.
# NOTE(review): `NOT IN` matches nothing if the subquery yields a NULL, so this
# check relies on users.id containing no NULLs — confirm if the data changes.
res = cursor.execute('''select count(userId), count(distinct userId) from receipts where userId not in 
                     (select distinct id from users)''')
res.fetchall()

[(148, 117)]

In [19]:
# Peek at the first 10 receipt rows to see how each column was stored.
res = cursor.execute('''select * from receipts limit 10''')
res.fetchall()

[('5ff1e1eb0a720f0523000575',
  500,
  'Receipt number 2 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)',
  1609687531000,
  1609687531000,
  1609687531000,
  1609687536000,
  1609687531000,
  500.0,
  1609632000000,
  5,
  '[{"barcode": "4011", "description": "ITEM NOT FOUND", "finalPrice": "26.00", "itemPrice": "26.00", "needsFetchReview": false, "partnerItemId": "1", "preventTargetGapPoints": true, "quantityPurchased": 5, "userFlaggedBarcode": "4011", "userFlaggedNewItem": true, "userFlaggedPrice": "26.00", "userFlaggedQuantity": 5}]',
  'FINISHED',
  26.0,
  '5ff1e1eacfcf6c399c274ae6'),
 ('5ff1e1bb0a720f052300056b',
  150,
  'Receipt number 5 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)',
  1609687483000,
  1609687483000,
  1609687483000,
  1609687488000,
  1609687483000,
  150.0,
  1609601083000,
  2,
  '[{"barcode": "4011", "description": "ITEM NOT FOUND", "finalPrice": "1", "itemPrice": "1", "partnerItemId": "1", "quantityPurchased": 1},