In [None]:
print(r""""___________     __         .__      __________                                .___       ___________       __              ___ ___                        
\_   _____/____/  |_  ____ |  |__   \______   \ ______  _  _______ _______  __| _/______ \__    ___/____  |  | __ ____    /   |   \  ____   _____   ____  
 |    __)/ __ \   __\/ ___\|  |  \   |       _// __ \ \/ \/ /\__  \\_  __ \/ __ |/  ___/   |    |  \__  \ |  |/ // __ \  /    ~    \/  _ \ /     \_/ __ \ 
 |     \\  ___/|  | \  \___|   Y  \  |    |   \  ___/\     /  / __ \|  | \/ /_/ |\___ \    |    |   / __ \|    <\  ___/  \    Y    (  <_> )  Y Y  \  ___/ 
 \___  / \___  >__|  \___  >___|  /  |____|_  /\___  >\/\_/  (____  /__|  \____ /____  >   |____|  (____  /__|_ \\___  >  \___|_  / \____/|__|_|  /\___  >
     \/      \/          \/     \/          \/     \/             \/           \/    \/                 \/     \/    \/         \/              \/     \/ """)
     

In [None]:
#Required Packages
%pip install pandas

In [None]:
import gzip
import os
import json

gz_folder = 'source_files'
unzipped_data = {}

# Loop through all .gz files in the folder and unzip them into a dictionary that holds each respective file 
for filename in os.listdir(gz_folder):
    #Constructing the path to open the file
    gz_path = os.path.join(gz_folder, filename)
    #Opening the file with rt to convert automatically to text 
    with gzip.open(gz_path, 'rt') as file:
        base_name = filename[:-3]
        unzipped_data[base_name] = file.read()


print("Files extracted:", list(unzipped_data.keys()))


In [None]:
print("Looking at Unzipped Data: ", unzipped_data['brands.json'])

In [None]:
# This is a new dicitionary with JSON parsed as values and filenames as keys
pretty_printed_data = {}
for filename, file_content in unzipped_data.items():
    pretty_objects = []
    lines = file_content.strip().split('\n')
    
    for i, line in enumerate(lines):
        try:
            json_obj = json.loads(line)
            pretty_str = json.dumps(json_obj, indent=4)
            pretty_objects.append(pretty_str)
        except json.JSONDecodeError as e:
            print(f"Skipping line {i+1} in {filename}: {e}")

In [None]:
# Got some errors in the users.json file. Could be whitespace issues checking to see
users_lines = unzipped_data['users.json'].split('\n')

line_numbers_to_check = [0, 495]  # line 1 and line 496

for i in line_numbers_to_check:
    print(f"Line {i+1}: {repr(users_lines[i])}")
# The output suggests this is a TAR file probably someone used linux or mac to make it. Next cell down will redo it to account for this

In [71]:
import tarfile
import pandas
gz_folder = 'source_files'
unzipped_data = {}

# Loop through all .gz files in the folder and unzip them into a dictionary that holds each respective file 
for filename in os.listdir(gz_folder):
    #Constructing the path to open the file
    gz_path = os.path.join(gz_folder, filename)
    #Opening the file with rt to convert automatically to text 
    try:
        with tarfile.open(gz_path, 'r:gz') as tar:
            for member in tar.getmembers():
                if member.isfile():
                    extracted = tar.extractfile(member)
                    if extracted:
                        content = extracted.read().decode('utf-8')
                        lines = content.strip().split('\n')
                        pretty_lines = []
                        for line in lines:
                            if not line.strip():
                                continue
                            try:
                                json_obj = json.loads(line)
                                pretty_lines.append(json.dumps(json_obj, indent=4))
                            except json.JSONDecodeError as e:
                                print(f"Skipping line in {member.name}: {e}")
                        unzipped_data[member.name] = '\n\n'.join(pretty_lines)
    except tarfile.ReadError:
        with gzip.open(gz_path, 'rt') as file:
            base_name = filename[:-3]
# In case TAR unzip doesn't work
            unzipped_data[base_name] = file.read()

print("Files extracted:", list(unzipped_data.keys()))
# print(unzipped_data['brands.json'])
# print(unzipped_data['users.json'])
# print(unzipped_data['receipts.json'])



Files extracted: ['brands.json', 'receipts.json', 'users.json']


In [73]:

cleaned_dataframes = {}

# Explicitly define which files use NDJSON structure
ndjson_files = {'receipts.json', 'brands.json'}
pretty_json_files = {'users.json'}

for filename, content in unzipped_data.items():
    try:
        stripped = content.strip()

        if filename in ndjson_files:
            # NDJSON: each line is a standalone JSON object
            json_objects = [
                json.loads(line)
                for line in stripped.splitlines()
                if line.strip()
            ]
        elif filename in pretty_json_files:
            # Pretty-printed or compact JSON with blank-line separation
            blocks = stripped.split('\n\n')
            json_objects = [
                json.loads(block)
                for block in blocks
                if block.strip()
            ]
        else:
            raise ValueError(f"Unknown format for file: {filename}")

        df = pd.DataFrame(json_objects)
        cleaned_dataframes[filename] = df

    except Exception as e:
        print(f"Failed to load {filename} into DataFrame: {e}")
# print("Users json: ",cleaned_dataframes['users.json'].head())
# print("Brands json: ",cleaned_dataframes['brands.json'].head())
# print(cleaned_dataframes['brands.json']['cpg'].head())

# print("Receipts json: ",cleaned_dataframes['receipts.json'].head())
# Looks like I need to parse out the dates which were sent as nested objects and the ids

In [74]:
from datetime import datetime

#Cleaning up id objects and dates. 
for df in cleaned_dataframes.values():
    # 1. Normalize _id.$oid to a string
    if '_id' in df.columns:
        df['_id'] = df['_id'].apply(
            lambda x: x.get('$oid') if isinstance(x, dict) and '$oid' in x else x
        )

    # 2. Normalize all columns that include "date" (case-insensitive)
    for col in df.columns:
        if 'date' or 'lastlogin' in col.lower():
            df[col] = df[col].apply(
                lambda x: datetime.fromtimestamp(x['$date'] / 1000.0)
                if isinstance(x, dict) and '$date' in x else x
            )
print("Users json: ",cleaned_dataframes['users.json'].head())
print("Brands json: ",cleaned_dataframes['brands.json'].head())
print("Receipts json: ",cleaned_dataframes['receipts.json'].head())
# Looks like I can't just key on date adding lastLogin from Users as well. 

# is_unique = cleaned_dataframes['brands.json']['brandCode'].is_unique
# print("Is 'name' column unique in brands.json?:", is_unique)

Users json:                          _id  active             createdDate  \
0  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
1  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
2  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
3  5ff1e1eacfcf6c399c274ae6    True 2021-01-03 09:25:30.554   
4  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   

                lastLogin      role signUpSource state  
0 2021-01-03 09:25:37.858  consumer        Email    WI  
1 2021-01-03 09:25:37.858  consumer        Email    WI  
2 2021-01-03 09:25:37.858  consumer        Email    WI  
3 2021-01-03 09:25:30.597  consumer        Email    WI  
4 2021-01-03 09:25:37.858  consumer        Email    WI  
Brands json:                          _id       barcode        category      categoryCode  \
0  601ac115be37ce2ead437551  511111019862          Baking            BAKING   
1  601c5460be37ce2ead43755f  511111519928       Beverages         BEVERAGES   
2  601ac142be37

In [75]:
# users.json
print("users.json columns:", list(cleaned_dataframes['users.json'].columns))
print(cleaned_dataframes['users.json'].head(2))

# brands.json
print("brands.json columns:", list(cleaned_dataframes['brands.json'].columns))
print(cleaned_dataframes['brands.json'].head(2))

# receipts.json
print("receipts.json columns:", list(cleaned_dataframes['receipts.json'].columns))
print(cleaned_dataframes['receipts.json'].head(2))



users.json columns: ['_id', 'active', 'createdDate', 'lastLogin', 'role', 'signUpSource', 'state']
                        _id  active             createdDate  \
0  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
1  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   

                lastLogin      role signUpSource state  
0 2021-01-03 09:25:37.858  consumer        Email    WI  
1 2021-01-03 09:25:37.858  consumer        Email    WI  
brands.json columns: ['_id', 'barcode', 'category', 'categoryCode', 'cpg', 'name', 'topBrand', 'brandCode']
                        _id       barcode   category categoryCode  \
0  601ac115be37ce2ead437551  511111019862     Baking       BAKING   
1  601c5460be37ce2ead43755f  511111519928  Beverages    BEVERAGES   

                                                 cpg  \
0  {'$id': {'$oid': '601ac114be37ce2ead437550'}, ...   
1  {'$id': {'$oid': '5332f5fbe4b03c9a25efd0ba'}, ...   

                        name topBrand  brandCode  
0  t

In [76]:
# Creating a database and tables
import sqlite3
import pandas as pd
from datetime import datetime, timedelta

# Create in-memory SQLite DB
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Create table
create_statements = [
    """
    CREATE TABLE dates (
        id INTEGER PRIMARY KEY AUTOINCREMENT,          -- surrogate key
        calendar_date DATE UNIQUE,
        year INTEGER,
        month INTEGER,
        day INTEGER,
        day_of_week INTEGER,
        week_of_year INTEGER,
        is_weekend BOOLEAN
    );
    """,
    """
    CREATE TABLE roles (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        role_name TEXT UNIQUE
    );
    """,
    """
    CREATE TABLE sign_up_sources (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source_name TEXT UNIQUE
    );
    """,
    """
    CREATE TABLE states (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        state_code TEXT UNIQUE
    );
    """,
    """
    CREATE TABLE users (
        id TEXT PRIMARY KEY UNIQUE,
        active BOOLEAN,
        createdDate DATETIME,
        lastLogin DATETIME,
        created_date_id INTEGER,
        last_login_date_id INTEGER,
        role_id INTEGER,
        signup_source_id INTEGER,
        state_id INTEGER,
        FOREIGN KEY (created_date_id) REFERENCES dates(id),
        FOREIGN KEY (last_login_date_id) REFERENCES dates(id),
        FOREIGN KEY (role_id) REFERENCES roles(id),
        FOREIGN KEY (signup_source_id) REFERENCES sign_up_sources(id),
        FOREIGN KEY (state_id) REFERENCES states(id)
    );
    """,
    """
    CREATE TABLE categories (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        category TEXT,
        category_code TEXT,
        UNIQUE(category, category_code)
    );
    """,
    """
    CREATE TABLE cpgs (
        id TEXT PRIMARY KEY,
        ref TEXT UNIQUE
    );
    """,
    """
    CREATE TABLE brands (
        id TEXT PRIMARY KEY UNIQUE,
        name TEXT,
        barcode TEXT,
        top_brand BOOLEAN,
        brand_code TEXT,
        category_id INTEGER,
        cpg_id TEXT,
        FOREIGN KEY (category_id) REFERENCES categories(id),
        FOREIGN KEY (cpg_id) REFERENCES cpgs(id)
    );
    """,
    """
    CREATE TABLE receipt_statuses (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        status TEXT UNIQUE
    );
    """,
    """
    CREATE TABLE receipts (
        id TEXT PRIMARY KEY UNIQUE,
        user_id INTEGER,
        bonus_points_earned INTEGER,
        bonus_points_reason TEXT,
        create_date_id INTEGER,
        date_scanned_id INTEGER,
        finished_date_id INTEGER,
        modify_date_id INTEGER,
        points_awarded_date_id INTEGER,
        purchase_date_id INTEGER,
        points_earned REAL,
        purchased_item_count INTEGER,
        total_spent REAL,
        receipt_status_id INTEGER,
        FOREIGN KEY (user_id) REFERENCES users(id),
        FOREIGN KEY (create_date_id) REFERENCES dates(id),
        FOREIGN KEY (date_scanned_id) REFERENCES dates(id),
        FOREIGN KEY (finished_date_id) REFERENCES dates(id),
        FOREIGN KEY (modify_date_id) REFERENCES dates(id),
        FOREIGN KEY (points_awarded_date_id) REFERENCES dates(id),
        FOREIGN KEY (purchase_date_id) REFERENCES dates(id),
        FOREIGN KEY (receipt_status_id) REFERENCES receipt_statuses(id)
    );
    """,
    """
    CREATE TABLE receipt_items (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        receipt_id TEXT,
        barcode TEXT,
        description TEXT,
        final_price REAL,
        item_price REAL,
        quantity_purchased INTEGER,
        user_flagged_new_item BOOLEAN,
        user_flagged_barcode TEXT,
        user_flagged_price REAL,
        user_flagged_quantity INTEGER,
        needs_fetch_review BOOLEAN,
        partner_item_id TEXT,
        points_not_awarded_reason TEXT,
        points_payer_id TEXT,
        rewards_group TEXT,
        rewards_product_partner_id TEXT,
        user_flagged_description TEXT,
        FOREIGN KEY (receipt_id) REFERENCES receipts(id)
        FOREIGN KEY (points_payer_id) REFERENCES users(id)
        FOREIGN KEY (rewards_product_partner_id) REFERENCES users(id)
    );
    """]


#creating tables
for stmt in create_statements:
    cur.execute(stmt)


# Populate with 20 years of dates
start_date = datetime(2005, 1, 1)
end_date = datetime(2025, 7, 31)
delta = end_date - start_date

for i in range(delta.days + 1):
    date_val = start_date + timedelta(days=i)
    cur.execute("""
        INSERT INTO dates
              (calendar_date, year, month, day,
               day_of_week, week_of_year, is_weekend)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (
        date_val.strftime('%Y-%m-%d'),
        date_val.year,
        date_val.month,
        date_val.day,
        date_val.weekday(),
        date_val.isocalendar()[1],
        int(date_val.weekday() >= 5)
    ))




In [77]:
# Main ETL script
import json, math
from datetime import datetime
import pandas as pd

#helper functions to handle nulls etc.
def safe_ts(ts):
    """Return ISO 'YYYY‑MM‑DD HH:MM:SS' or None."""
    if pd.isna(ts) or ts in ("", None):
        return None
    return pd.to_datetime(ts).strftime("%Y-%m-%d %H:%M:%S")

def safe_dt(ts):
    """Return a date object or None."""
    if pd.isna(ts) or ts in ("", None):
        return None
    return pd.to_datetime(ts).date()

def safe_int(val, default=None):
    """Convert to int, handling NaN / None / '' gracefully."""
    if val in ("", None) or (isinstance(val, float) and math.isnan(val)):
        return default
    try:
        return int(val)
    except (ValueError, TypeError):
        return default

def safe_float(val, default=None):
    """Return float or default (for NaN / None / '')."""
    if val in ("", None) or (isinstance(val, float) and math.isnan(val)):
        return default
    try:
        return float(val)
    except (ValueError, TypeError):
        return default
# CPGs  – extract $oid
def extract_cpg_id(val):
    if isinstance(val, str):
        try:
            val = json.loads(val)
        except Exception:
            return None
    if isinstance(val, dict):
        return val.get("$oid") or val.get("$id", {}).get("$oid")
    return None
def ensure_date(conn, d):
    """Insert d into dates dimension if not present."""
    cur = conn.cursor()
    cur.execute("SELECT 1 FROM dates WHERE calendar_date = ?", (d,))
    if cur.fetchone():
        return
    cur.execute(
        """
        INSERT INTO dates
              (calendar_date, year, month, day,
               day_of_week, week_of_year, is_weekend)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (d.isoformat(), d.year, d.month, d.day,
         d.weekday(), d.isocalendar()[1], int(d.weekday() >= 5))
    )
    conn.commit()
def get_date_id(conn, d):
    """
    Ensure d exists in the dates dimension and return its surrogate id.
    Returns None if d is None.
    """
    if d is None:
        return None

    cur = conn.cursor()
    cur.execute("SELECT id FROM dates WHERE calendar_date = ?", (d.isoformat(),))
    row = cur.fetchone()
    if row:
        return row[0]

    cur.execute(
        """
        INSERT INTO dates
              (calendar_date, year, month, day,
               day_of_week, week_of_year, is_weekend)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (d.isoformat(), d.year, d.month, d.day,
         d.weekday(), d.isocalendar()[1], int(d.weekday() >= 5))
    )
    return cur.lastrowid

def fk_id(series, key):
    """Look up surrogate key → int or None."""
    if key is None or key == "" or (isinstance(key, float) and math.isnan(key)):
        return None
    val = series.get(key)
    return int(val) if val is not None else None

#staging data frames
users_df    = cleaned_dataframes["users.json"].copy()
brands_df   = cleaned_dataframes["brands.json"].copy()
receipts_df = cleaned_dataframes["receipts.json"].copy()

#converting objects in json to text to comply with inserting it into the table
for df in (users_df, brands_df, receipts_df):
    for col in df.columns:
        if df[col].dtype == "object":
            df[col] = df[col].apply(
                lambda x: json.dumps(x) if isinstance(x, (dict, list)) else x
            )

# DB cursor
cur = conn.cursor()
#deriving roles from the user table and dropping empty values
for role in users_df["role"].dropna().unique():
    cur.execute("INSERT OR IGNORE INTO roles (role_name) VALUES (?)", (role,))
#deriving sign up sources and dropping nulls
for src in users_df["signUpSource"].dropna().unique():
    cur.execute("INSERT OR IGNORE INTO sign_up_sources (source_name) VALUES (?)", (src,))
#getting all state codes and inserting them instead of deriving
US_STATE_CODES = {
    "AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS",
    "KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY",
    "NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV",
    "WI","WY","DC","PR","VI","GU","MP","AS"
}
cur.executemany(
    "INSERT OR IGNORE INTO states (state_code) VALUES (?)",
    [(code,) for code in US_STATE_CODES]
)
# deriving receipt statuses and dropping nulls
for status in receipts_df["rewardsReceiptStatus"].dropna().unique():
    cur.execute("INSERT OR IGNORE INTO receipt_statuses (status) VALUES (?)", (status,))

# deriving distinct categories
cat_pairs = brands_df[["category", "categoryCode"]].drop_duplicates()
cur.executemany(
    "INSERT OR IGNORE INTO categories (category, category_code) VALUES (?, ?)",
    [(r["category"], r["categoryCode"]) for _, r in cat_pairs.iterrows()]
)



brands_df["cpg_extracted"] = brands_df["cpg"].apply(extract_cpg_id)
for cpg_id in brands_df["cpg_extracted"].dropna().unique():
    cur.execute("INSERT OR IGNORE INTO cpgs (id) VALUES (?)", (cpg_id,))

conn.commit()



# Look‑up Series for FK mapping
role_ids   = pd.read_sql("SELECT id, role_name FROM roles", conn).set_index("role_name")["id"]
src_ids    = pd.read_sql("SELECT id, source_name FROM sign_up_sources", conn).set_index("source_name")["id"]
state_ids  = pd.read_sql("SELECT id, state_code FROM states", conn).set_index("state_code")["id"]
status_ids = pd.read_sql("SELECT id, status FROM receipt_statuses", conn).set_index("status")["id"]

cat_df = pd.read_sql("SELECT id, category, category_code FROM categories", conn)
cat_df["key"] = cat_df["category"] + "||" + cat_df["category_code"]
cat_lookup = cat_df.set_index("key")["id"]

# converting users df into tuples that can be dynamically inserted
user_rows = [
    (
        r["_id"],
        safe_int(r["active"], 0),
        safe_ts(r["createdDate"]),
        safe_ts(r["lastLogin"]),
        get_date_id(conn, safe_dt(r["createdDate"])),  
        get_date_id(conn, safe_dt(r["lastLogin"])),    
        fk_id(role_ids,   r["role"]),
        fk_id(src_ids,    r["signUpSource"]),
        fk_id(state_ids,  r["state"])
    )
    for _, r in users_df.iterrows()
]


cur.executemany(
    """
    INSERT OR IGNORE INTO users
          (id, active, createdDate, lastLogin,
           created_date_id, last_login_date_id,
           role_id, signup_source_id, state_id)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """,
    user_rows
)
conn.commit()

#converting brand rows into tuples so they can be inserted
brand_rows = [
    (
        r["_id"],
        r["name"],
        r["barcode"],
        safe_int(r["topBrand"], 0),
        r["brandCode"],
        fk_id(cat_lookup, f"{r['category']}||{r['categoryCode']}"),
        r["cpg_extracted"]
    )
    for _, r in brands_df.iterrows()
]

cur.executemany(
    """
    INSERT OR IGNORE INTO brands
          (id, name, barcode, top_brand, brand_code,
           category_id, cpg_id)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    """,
    brand_rows
)
conn.commit()

# receipts and receipt_items
receipts_rows, receipt_items_rows = [], []
user_ids_set = set(pd.read_sql("SELECT id FROM users", conn)["id"])

for _, r in receipts_df.iterrows():
    # skip receipts whose user wasn’t loaded
    if r["userId"] not in user_ids_set:
        continue

    receipts_rows.append(
        (
            r["_id"],
            r["userId"],
            safe_int(r.get("bonusPointsEarned"), 0),
            r.get("bonusPointsEarnedReason"),
            get_date_id(conn, safe_dt(r["createDate"])),        # ← surrogate id
            get_date_id(conn, safe_dt(r["dateScanned"])),       # ← surrogate id
            get_date_id(conn, safe_dt(r["finishedDate"])),      # ← surrogate id
            get_date_id(conn, safe_dt(r["modifyDate"])),        # ← surrogate id
            get_date_id(conn, safe_dt(r["pointsAwardedDate"])), # ← surrogate id
            get_date_id(conn, safe_dt(r["purchaseDate"])),      # ← surrogate id
            safe_float(r.get("pointsEarned")),
            safe_int(r.get("purchasedItemCount")),
            safe_float(r.get("totalSpent")),
            fk_id(status_ids, r["rewardsReceiptStatus"])
        )
    )

    raw_items = r["rewardsReceiptItemList"]
    try:
        items = raw_items if isinstance(raw_items, list) else json.loads(raw_items or "[]")
    except Exception:
        items = []

    for itm in items:
        receipt_items_rows.append(
            (
                r["_id"],
                itm.get("barcode"),
                itm.get("description"),
                safe_float(itm.get("finalPrice")),
                safe_float(itm.get("itemPrice")),
                safe_int(itm.get("quantityPurchased")),
                safe_int(itm.get("userFlaggedNewItem"), 0),
                itm.get("userFlaggedBarcode"),
                safe_float(itm.get("userFlaggedPrice")),
                safe_int(itm.get("userFlaggedQuantity")),
                safe_int(itm.get("needsFetchReview"), 0),
                itm.get("partnerItemId"),
                itm.get("pointsNotAwardedReason"),
                itm.get("pointsPayerId"),
                itm.get("rewardsGroup"),
                itm.get("rewardsProductPartnerId"),
                itm.get("userFlaggedDescription")
            )
        )

cur.executemany(
    """
    INSERT OR IGNORE INTO receipts
          (id, user_id, bonus_points_earned, bonus_points_reason,
           create_date_id, date_scanned_id, finished_date_id,
           modify_date_id, points_awarded_date_id, purchase_date_id,
           points_earned, purchased_item_count, total_spent,
           receipt_status_id)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """,
    receipts_rows
)
cur.executemany(
    """
    INSERT INTO receipt_items
        (receipt_id, barcode, description, final_price, item_price,
         quantity_purchased, user_flagged_new_item, user_flagged_barcode,
         user_flagged_price, user_flagged_quantity, needs_fetch_review,
         partner_item_id, points_not_awarded_reason, points_payer_id,
         rewards_group, rewards_product_partner_id, user_flagged_description)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """,
    receipt_items_rows
)
conn.commit()



In [78]:
df_receipt_statuses = pd.read_sql_query("""
    SELECT  *
    FROM     receipt_statuses 

""", conn)
print(df_receipt_statuses)
df_quer = pd.read_sql_query("""
  SELECT *
  FROM   users
  -- WHERE  createdDate >= date('now', '-6 months')

""", conn)
print(df_quer)
# looks like the statuses don't match the question prompt.
# er diagram: https://www.dbdiagram.io/d/fetch-6824047e5b2fc4582f7cbc41

   id     status
0   1   FINISHED
1   2   REJECTED
2   3    FLAGGED
3   4  SUBMITTED
4   5    PENDING
                           id  active          createdDate  \
0    5ff1e194b6a9d73a3a9f1052       1  2021-01-03 09:24:04   
1    5ff1e1eacfcf6c399c274ae6       1  2021-01-03 09:25:30   
2    5ff1e1e8cfcf6c399c274ad9       1  2021-01-03 09:25:28   
3    5ff1e1b7cfcf6c399c274a5a       1  2021-01-03 09:24:39   
4    5ff1e1f1cfcf6c399c274b0b       1  2021-01-03 09:25:37   
..                        ...     ...                  ...   
207  5fc961c3b8cfca11a077dd33       1  2020-12-03 16:08:03   
208  5fa41775898c7a11a6bcef3e       1  2020-11-05 09:17:09   
209  5fa32b4d898c7a11a6bcebce       1  2020-11-04 16:29:33   
210  5964eb07e4b03efd0c0f267b       1  2017-07-11 10:13:11   
211  54943462e4b07e684157a532       1  2014-12-19 08:21:22   

               lastLogin  created_date_id  last_login_date_id  role_id  \
0    2021-01-03 09:25:37             5847              5847.0        1   
1    

In [79]:
##

#Problem 1
top5_recent = pd.read_sql_query("""
WITH brand_receipts AS (
  SELECT
      strftime('%Y-%m', dates.calendar_date)              AS year_month,
      brands.id                                           AS brand_id,
      brands.name                                         AS brand_name,
      COUNT(DISTINCT receipts.id)                         AS receipt_count
  FROM receipts
  JOIN dates          ON dates.id        = receipts.purchase_date_id
  JOIN receipt_items  ON receipt_items.receipt_id = receipts.id
  JOIN brands         ON brands.barcode = receipt_items.barcode
  GROUP BY year_month, brands.id
),
latest_month AS (
  SELECT MAX(year_month) AS year_month FROM brand_receipts
)
SELECT brand_name, receipt_count
FROM   brand_receipts
WHERE  year_month = (SELECT year_month FROM latest_month)
ORDER  BY receipt_count DESC
LIMIT  5;
""", conn)
print("Question 1.")
print("Top 5 Brands from the most recent month")
print(top5_recent)
# Problem 2 , this one I wasn't entirely sure the best way to display this. It's awkward but it fulfills the requirement
rank_compare = pd.read_sql_query("""
WITH brand_receipts AS (
    SELECT
        strftime('%Y-%m', dates.calendar_date) AS year_month,
        brands.name                            AS brand_name,
        COUNT(DISTINCT receipts.id)            AS receipt_count
    FROM receipts
    JOIN dates         ON dates.id       = receipts.purchase_date_id
    JOIN receipt_items ON receipt_items.receipt_id = receipts.id
    JOIN brands        ON brands.barcode = receipt_items.barcode
    GROUP BY year_month, brands.name
),
two_latest_months AS (
    SELECT year_month
    FROM   brand_receipts
    GROUP  BY year_month
    ORDER  BY year_month DESC
    LIMIT  2
),
ranked AS (
    SELECT
        year_month,
        brand_name,
        DENSE_RANK() OVER (PARTITION BY year_month
                           ORDER BY receipt_count DESC) AS rank_position
    FROM brand_receipts
    WHERE year_month IN (SELECT year_month FROM two_latest_months)
)

-- ▸ wrap the UNION in an outer SELECT so we can order by an expression
SELECT *
FROM (
    -- recent month (top‑5 dense ranks)
    SELECT
        rank_position     AS Rank,
        'Recent_Month'    AS MonthType,
        brand_name        AS Brand
    FROM ranked
    WHERE year_month   = (SELECT MAX(year_month) FROM two_latest_months)
      AND rank_position <= 5

    UNION ALL

    -- previous month (top‑5 dense ranks)
    SELECT
        rank_position,
        'Previous_Month',
        brand_name
    FROM ranked
    WHERE year_month   = (SELECT MIN(year_month) FROM two_latest_months)
      AND rank_position <= 5
) AS combined
ORDER BY
    CASE MonthType WHEN 'Recent_Month' THEN 0 ELSE 1 END,  -- recent first
    Rank,
    Brand;


""", conn)
print("Question 2.")
print("Top 5 Brands most recent month vs. previous month")
print(rank_compare)
#Problem 3.
avg_spend = pd.read_sql_query("""
SELECT
    receipt_statuses.status,
    ROUND(AVG(receipts.total_spent), 2) AS average_spend,
    COUNT(*)                            AS receipt_count
FROM receipts
JOIN receipt_statuses ON receipt_statuses.id = receipts.receipt_status_id
WHERE receipt_statuses.status IN ('FINISHED', 'REJECTED')
GROUP BY receipt_statuses.status;
""", conn)
print("Question 3.")
print("Average spend by receipt status:")
print(avg_spend, "\n")
higher_avg_status = avg_spend.loc[avg_spend.average_spend.idxmax(), "status"]
print(f"Higher average spend: {higher_avg_status}\n")

# Question 4. 
total_items = pd.read_sql_query("""
SELECT
    receipt_statuses.status,
    SUM(receipts.purchased_item_count) AS total_items
FROM receipts
JOIN receipt_statuses ON receipt_statuses.id = receipts.receipt_status_id
WHERE receipt_statuses.status IN ('FINISHED', 'REJECTED')
GROUP BY receipt_statuses.status;
""", conn)
print("Question 4.")
print("Total items purchased by receipt status:")
print(total_items, "\n")
higher_items_status = total_items.loc[total_items.total_items.idxmax(), "status"]
print(f"Higher total items: {higher_items_status}\n")

# Question 5. The question asked users in the last 6 months but the most recent user is 2021 so going off the 6 months prior to the max
brand_most_spend = pd.read_sql_query("""
WITH new_users AS (
  SELECT users.id
  FROM   users
  JOIN   dates on dates.id = users.created_date_id
  WHERE  dates.calendar_date >= date((SELECT MAX(DATE(createdDate)) from users), '-6 months')
),
receipts_new_users AS (
  SELECT id, total_spent
  FROM   receipts
  WHERE  user_id IN (SELECT id FROM new_users)
),
brand_spend AS (
  SELECT
      brands.name                     AS brand_name,
      SUM(receipts_new_users.total_spent) AS total_spend
  FROM receipts_new_users
  JOIN receipt_items ON receipt_items.receipt_id = receipts_new_users.id
  JOIN brands        ON brands.barcode         = receipt_items.barcode
  GROUP BY brand_name
)
SELECT brand_name, total_spend
FROM   brand_spend
ORDER  BY total_spend DESC
LIMIT  1;
""", conn)
print("Question 5.")
print("Brand with the most spend (new users, last 6 months):")
print(brand_most_spend, "\n")

#Question 6.Again going off the max user date now real life 6 months
brand_most_tx = pd.read_sql_query("""
WITH new_users AS (
  SELECT id
  FROM   users
  WHERE  createdDate >= date((SELECT MAX(DATE(createdDate)) from users), '-6 months')
),
receipts_new_users AS (
  SELECT id
  FROM   receipts
  WHERE  user_id IN (SELECT id FROM new_users)
),
brand_transactions AS (
  SELECT
      brands.name                    AS brand_name,
      COUNT(DISTINCT receipt_items.receipt_id) AS transaction_count
  FROM receipts_new_users
  JOIN receipt_items ON receipt_items.receipt_id = receipts_new_users.id
  JOIN brands        ON brands.barcode          = receipt_items.barcode
  GROUP BY brand_name
)
SELECT brand_name, transaction_count
FROM   brand_transactions
ORDER  BY transaction_count DESC
LIMIT  1;
""", conn)
print("Question 6.")
print("Brand with the most transactions (new users, last 6 months):")
print(brand_most_tx)



Question 1.
Top 5 Brands from the most recent month
              brand_name  receipt_count
0               Tostitos             10
1                Swanson             10
2           Kettle Brand              3
3  Cracker Barrel Cheese              2
4                 Jell-O              2
Question 2.
Top 5 Brands most recent month vs. previous month
    Rank       MonthType                  Brand
0      1    Recent_Month                Swanson
1      1    Recent_Month               Tostitos
2      2    Recent_Month           Kettle Brand
3      3    Recent_Month  Cracker Barrel Cheese
4      3    Recent_Month                 Jell-O
5      4    Recent_Month                Cheetos
6      4    Recent_Month        Diet Chris Cola
7      4    Recent_Month        Pepperidge Farm
8      4    Recent_Month                  Prego
9      4    Recent_Month                 Quaker
10     4    Recent_Month                     V8
11     1  Previous_Month            Grey Poupon
12     1  Previous_Mon

In [80]:
#Dataset EDA Third: Evaluate Data Quality Issues in the Data Provided. Ill start with extracted but non-normalized data 
#I first would look at structural issues with the files
import tarfile
import pandas
gz_folder = 'source_files'
unzipped_data = {}

# Loop through all .gz files in the folder and unzip them into a dictionary that holds each respective file 
for filename in os.listdir(gz_folder):
    #Constructing the path to open the file
    gz_path = os.path.join(gz_folder, filename)
    #Opening the file with rt to convert automatically to text 
    try:
        with tarfile.open(gz_path, 'r:gz') as tar:
            for member in tar.getmembers():
                if member.isfile():
                    extracted = tar.extractfile(member)
                    if extracted:
                        content = extracted.read().decode('utf-8')
                        lines = content.strip().split('\n')
                        pretty_lines = []
                        for line in lines:
                            if not line.strip():
                                continue
                            try:
                                json_obj = json.loads(line)
                                pretty_lines.append(json.dumps(json_obj, indent=4))
                            except json.JSONDecodeError as e:
                                print(f"Skipping line in {member.name}: {e}")
                        unzipped_data[member.name] = '\n\n'.join(pretty_lines)
    except tarfile.ReadError:
        with gzip.open(gz_path, 'rt') as file:
            base_name = filename[:-3]
# In case TAR unzip doesn't work
            unzipped_data[base_name] = file.read()

print("Files extracted:", list(unzipped_data.keys()))
# print(unzipped_data['brands.json'])
# print(unzipped_data['users.json'])
# print(unzipped_data['receipts.json'])



Files extracted: ['brands.json', 'receipts.json', 'users.json']


In [90]:
#Next checking if all files are valid NDJSON. users.json seems to have some issue with non-printing characters
import json

for file_name, content in unzipped_data.items():
    print(f"Validating {file_name}...")
    lines = content.strip().split('\n')
    for idx, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            json.loads(line)
        except json.JSONDecodeError as e:
            print(f"Invalid JSON in {file_name} at line {idx}: {e}")
            print(f"     Line content: {line[:200]}{'...' if len(line) > 200 else ''}")
            break  # Optionally stop at the first error per file
    else:
        print(f"{file_name} is valid NDJSON.\n")


Validating brands.json...
brands.json is valid NDJSON.

Validating receipts.json...
receipts.json is valid NDJSON.

Validating users.json...
Invalid JSON in users.json at line 1: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
     Line content: {


In [84]:
#Cleaning up the pretty print issue
cleaned_dataframes = {}

# Explicitly define which files use NDJSON structure
ndjson_files = {'receipts.json', 'brands.json'}
pretty_json_files = {'users.json'}

for filename, content in unzipped_data.items():
    try:
        stripped = content.strip()

        if filename in ndjson_files:
            # NDJSON: each line is a standalone JSON object
            json_objects = [
                json.loads(line)
                for line in stripped.splitlines()
                if line.strip()
            ]
        elif filename in pretty_json_files:
            # Pretty-printed or compact JSON with blank-line separation
            blocks = stripped.split('\n\n')
            json_objects = [
                json.loads(block)
                for block in blocks
                if block.strip()
            ]
        else:
            raise ValueError(f"Unknown format for file: {filename}")

        df = pd.DataFrame(json_objects)
        cleaned_dataframes[filename] = df

    except Exception as e:
        print(f"Failed to load {filename} into DataFrame: {e}")


In [85]:
#Checking for nested objects
print("Users json: ",cleaned_dataframes['users.json'].head())
print("Brands json: ",cleaned_dataframes['brands.json'].head())
print(cleaned_dataframes['brands.json']['cpg'].head())

Users json:                                      _id  active               createdDate  \
0  {'$oid': '5ff1e194b6a9d73a3a9f1052'}    True  {'$date': 1609687444800}   
1  {'$oid': '5ff1e194b6a9d73a3a9f1052'}    True  {'$date': 1609687444800}   
2  {'$oid': '5ff1e194b6a9d73a3a9f1052'}    True  {'$date': 1609687444800}   
3  {'$oid': '5ff1e1eacfcf6c399c274ae6'}    True  {'$date': 1609687530554}   
4  {'$oid': '5ff1e194b6a9d73a3a9f1052'}    True  {'$date': 1609687444800}   

                  lastLogin      role signUpSource state  
0  {'$date': 1609687537858}  consumer        Email    WI  
1  {'$date': 1609687537858}  consumer        Email    WI  
2  {'$date': 1609687537858}  consumer        Email    WI  
3  {'$date': 1609687530597}  consumer        Email    WI  
4  {'$date': 1609687537858}  consumer        Email    WI  
Brands json:                                      _id       barcode        category  \
0  {'$oid': '601ac115be37ce2ead437551'}  511111019862          Baking   
1  {'$oid'

In [86]:
#Cleaning up nested objects. Looking at the data I can see cpg has ref and id columns I would flatten those (but won't for the data validation excersize.
# In general I wouldn't want to have nested objects if it's not required (i.e nested objects with single members)
from datetime import datetime
#Cleaning up nested objects

for df in cleaned_dataframes.values():
    # 1. Normalize _id.$oid to a string
    if '_id' in df.columns:
        df['_id'] = df['_id'].apply(
            lambda x: x.get('$oid') if isinstance(x, dict) and '$oid' in x else x
        )

    # 2. Normalize all columns that include "date" (case-insensitive)
    for col in df.columns:
        if 'date' or 'lastlogin' in col.lower():
            df[col] = df[col].apply(
                lambda x: datetime.fromtimestamp(x['$date'] / 1000.0)
                if isinstance(x, dict) and '$date' in x else x
            )
print("Users json: ",cleaned_dataframes['users.json'].head())
print("Brands json: ",cleaned_dataframes['brands.json'].head())
print("Receipts json: ",cleaned_dataframes['receipts.json'].head())


Users json:                          _id  active             createdDate  \
0  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
1  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
2  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   
3  5ff1e1eacfcf6c399c274ae6    True 2021-01-03 09:25:30.554   
4  5ff1e194b6a9d73a3a9f1052    True 2021-01-03 09:24:04.800   

                lastLogin      role signUpSource state  
0 2021-01-03 09:25:37.858  consumer        Email    WI  
1 2021-01-03 09:25:37.858  consumer        Email    WI  
2 2021-01-03 09:25:37.858  consumer        Email    WI  
3 2021-01-03 09:25:30.597  consumer        Email    WI  
4 2021-01-03 09:25:37.858  consumer        Email    WI  
Brands json:                          _id       barcode        category      categoryCode  \
0  601ac115be37ce2ead437551  511111019862          Baking            BAKING   
1  601c5460be37ce2ead43755f  511111519928       Beverages         BEVERAGES   
2  601ac142be37

In [89]:
# I'm not sure it's necessary to fully write out all tests after this. In general here's a non-exhaustive list of things I would check
# after JSON structural tests.
"""
1. Enforce foreign keys between users, receipts, brands and receipt items
2. Check for missing dates (such as a missing pointsAwardedDate but filled in points earned
3. Check for amounts, quantities that deviate significantly from the mean
4. Check for non valid states.
5. Check types (for example a non numeric barcode etc.)
6. Check duplicate receipts (i.e a user purchasing the exact same thing twice on the same date)
7. Look for things like  test brand @1612366146827 and ensure they are valid for the use case
8. Check that a purchase date is older than the user that bought it
"""

'\n1. Enforce foreign keys between users, receipts, brands and receipt items\n2. Check for missing dates (such as a missing pointsAwardedDate but filled in points earned\n3. Check for amounts, quantities that deviate significantly from the mean\n4. Check for non valid states.\n5. Check types (for example a non numeric barcode etc.)\n6. Check duplicate receipts (i.e a user purchasing the exact same thing twice on the same date)\n7. Look for things like  test brand @1612366146827 and ensure they are valid for the use case\n8. Check that a purchase date is older than the user that bought it\n'

In [96]:
"""Subject: Data Alignment and Open Questions

Hi [Stakeholder's Name],

I hope you're doing well.

As we begin working with the dataset, I want to make sure we're aligned on expectations and priorities. Below are several key questions and observations that will help guide next steps and ensure the data is optimized for your use cases.

Questions about the data:

What are “partner items” and how should they be interpreted in analysis?

The current “brands” feed appears to list individual items rather than brand groupings. Should there be a separate feed for brands?

We’ve identified what appears to be test data (e.g., users with the role "fetch-staff"). Is this data intended for production use? Are there any valid purchases from non-consumer roles?

What are the expected use cases for this data (e.g., reporting, data science, real-time applications)?

What is the anticipated data volume on a daily, monthly, and annual basis?

Will the data be incrementally loaded or fully refreshed?

How frequently will the reports or dashboards built on this data be accessed?

How many users will be interacting with the final outputs?

What are the expectations around data freshness?

Are there any known pain points we should be aware of (e.g., slow report refreshes, volume-related issues)?

Initial data quality issues:

We encountered issues with the user dataset due to formatting inconsistencies. Some fields were missing or empty, which interfered with loading. We'll need to assess the scale of these issues and determine which fields are critical versus optional for the target workloads.

To resolve the quality issues, we’ll need:

Information about the source system

Access to the original data source, if possible

Clarification on which fields are essential for the end use cases

Additional considerations for performance and scalability:

Depending on data volume and query performance, we may need to consider strategies like denormalization or alternate modeling approaches

The ETL tools and database platform in use will influence how we structure and optimize the data

For larger or more complex workloads, we may need to implement partitioning or evaluate distributed computing strategies

It’s also important to understand team capabilities so we can design solutions that align with existing skill sets

Please let me know your thoughts on the above, or if you'd prefer to set up a quick meeting to walk through the details.

Best regards,
Harrison Reid
Senior Data Engineer
"""


_IncompleteInputError: incomplete input (4237455029.py, line 35)