In [1]:
import pandas as pd
from datetime import datetime, timezone
import json
import psycopg2
from psycopg2.extras import execute_values
import logging

### Compile and create DataFrames for `receipts.json`

In [3]:
json_file_path = "receipts.json"

try:
    # Read JSONL file directly into a DataFrame
    df_receipts_raw = pd.read_json(json_file_path, lines=True)

    # Clean and prepare receipt data
    receipts_data = []
    for idx, record in df_receipts_raw.iterrows():
        # Handle cases where '_id' might be NaN or not a dict
        id_dict = record.get('_id')
        if not isinstance(id_dict, dict) or '$oid' not in id_dict:
            continue

        # Extract timestamps safely, checking for NaN
        try:
            create_date = record.get('createDate')
            date_scanned = record.get('dateScanned')
            finished_date = record.get('finishedDate')
            modify_date = record.get('modifyDate')
            points_awarded_date = record.get('pointsAwardedDate')
            purchase_date = record.get('purchaseDate')

            receipt = {
                'receipt_id': id_dict['$oid'],
                'bonus_points_earned': record.get('bonusPointsEarned'),
                'bonus_points_earned_reason': record.get('bonusPointsEarnedReason'),
                'create_date': datetime.fromtimestamp(create_date['$date'] / 1000, tz=timezone.utc) if isinstance(create_date, dict) and '$date' in create_date else None,
                'date_scanned': datetime.fromtimestamp(date_scanned['$date'] / 1000, tz=timezone.utc) if isinstance(date_scanned, dict) and '$date' in date_scanned else None,
                'finished_date': datetime.fromtimestamp(finished_date['$date'] / 1000, tz=timezone.utc) if isinstance(finished_date, dict) and '$date' in finished_date else None,
                'modify_date': datetime.fromtimestamp(modify_date['$date'] / 1000, tz=timezone.utc) if isinstance(modify_date, dict) and '$date' in modify_date else None,
                'points_awarded_date': datetime.fromtimestamp(points_awarded_date['$date'] / 1000, tz=timezone.utc) if isinstance(points_awarded_date, dict) and '$date' in points_awarded_date else None,
                'points_earned': float(record.get('pointsEarned', '0')),
                'purchase_date': datetime.fromtimestamp(purchase_date['$date'] / 1000, tz=timezone.utc) if isinstance(purchase_date, dict) and '$date' in purchase_date else None,
                'purchased_item_count': record.get('purchasedItemCount', 0),
                'rewards_receipt_status': record.get('rewardsReceiptStatus'),
                'total_spent': float(record.get('totalSpent', '0')),
                'user_id': record.get('userId')
            }
            receipts_data.append(receipt)
        except (TypeError, ValueError, KeyError):
            continue

    # Create Receipts DataFrame
    df_receipts = pd.DataFrame(receipts_data)

    # Prepare items data from rewardsReceiptItemList
    items_data = []
    for idx, record in df_receipts_raw.iterrows():
        id_dict = record.get('_id')
        if not isinstance(id_dict, dict) or '$oid' not in id_dict:
            continue

        items_list = record.get('rewardsReceiptItemList', [])
        if not isinstance(items_list, list):
            continue

        for item in items_list:
            try:
                item_record = {
                    'receipt_id': id_dict['$oid'],
                    'barcode': item.get('barcode'),
                    'description': item.get('description'),
                    'final_price': float(item.get('finalPrice', '0')),
                    'item_price': float(item.get('itemPrice', '0')),
                    'needs_fetch_review': item.get('needsFetchReview', False),
                    'partner_item_id': item.get('partnerItemId'),
                    'prevent_target_gap_points': item.get('preventTargetGapPoints', False),
                    'quantity_purchased': item.get('quantityPurchased', 0),
                    'user_flagged_barcode': item.get('userFlaggedBarcode'),
                    'user_flagged_new_item': item.get('userFlaggedNewItem', False),
                    'user_flagged_price': float(item.get('userFlaggedPrice', '0')),
                    'user_flagged_quantity': item.get('userFlaggedQuantity', 0)
                }
                items_data.append(item_record)
            except (TypeError, ValueError):
                continue

    # Create Items DataFrame
    df_items = pd.DataFrame(items_data)

except FileNotFoundError:
    pass
except ValueError:
    pass
except Exception:
    pass

In [4]:
# Preview the first five parsed receipts
df_receipts.head(n=5)

Unnamed: 0,receipt_id,bonus_points_earned,bonus_points_earned_reason,create_date,date_scanned,finished_date,modify_date,points_awarded_date,points_earned,purchase_date,purchased_item_count,rewards_receipt_status,total_spent,user_id
0,5ff1e1eb0a720f0523000575,500.0,"Receipt number 2 completed, bonus point schedu...",2021-01-03 15:25:31+00:00,2021-01-03 15:25:31+00:00,2021-01-03 15:25:31+00:00,2021-01-03 15:25:36+00:00,2021-01-03 15:25:31+00:00,500.0,2021-01-03 00:00:00+00:00,5.0,FINISHED,26.0,5ff1e1eacfcf6c399c274ae6
1,5ff1e1bb0a720f052300056b,150.0,"Receipt number 5 completed, bonus point schedu...",2021-01-03 15:24:43+00:00,2021-01-03 15:24:43+00:00,2021-01-03 15:24:43+00:00,2021-01-03 15:24:48+00:00,2021-01-03 15:24:43+00:00,150.0,2021-01-02 15:24:43+00:00,2.0,FINISHED,11.0,5ff1e194b6a9d73a3a9f1052
2,5ff1e1f10a720f052300057a,5.0,All-receipts receipt bonus,2021-01-03 15:25:37+00:00,2021-01-03 15:25:37+00:00,NaT,2021-01-03 15:25:42+00:00,NaT,5.0,2021-01-03 00:00:00+00:00,1.0,REJECTED,10.0,5ff1e1f1cfcf6c399c274b0b
3,5ff1e1ee0a7214ada100056f,5.0,All-receipts receipt bonus,2021-01-03 15:25:34+00:00,2021-01-03 15:25:34+00:00,2021-01-03 15:25:34+00:00,2021-01-03 15:25:39+00:00,2021-01-03 15:25:34+00:00,5.0,2021-01-03 00:00:00+00:00,4.0,FINISHED,28.0,5ff1e1eacfcf6c399c274ae6
4,5ff1e1d20a7214ada1000561,5.0,All-receipts receipt bonus,2021-01-03 15:25:06+00:00,2021-01-03 15:25:06+00:00,2021-01-03 15:25:11+00:00,2021-01-03 15:25:11+00:00,2021-01-03 15:25:06+00:00,5.0,2021-01-02 15:25:06+00:00,2.0,FINISHED,1.0,5ff1e194b6a9d73a3a9f1052


In [5]:
# Preview the first five parsed receipt items
df_items.head(n=5)

Unnamed: 0,receipt_id,barcode,description,final_price,item_price,needs_fetch_review,partner_item_id,prevent_target_gap_points,quantity_purchased,user_flagged_barcode,user_flagged_new_item,user_flagged_price,user_flagged_quantity
0,5ff1e1eb0a720f0523000575,4011.0,ITEM NOT FOUND,26.0,26.0,False,1,True,5,4011.0,True,26.0,5
1,5ff1e1bb0a720f052300056b,4011.0,ITEM NOT FOUND,1.0,1.0,False,1,False,1,,False,0.0,0
2,5ff1e1bb0a720f052300056b,28400642255.0,DORITOS TORTILLA CHIP SPICY SWEET CHILI REDUCE...,10.0,10.0,True,2,True,1,28400642255.0,True,10.0,1
3,5ff1e1f10a720f052300057a,,,0.0,0.0,False,1,True,0,4011.0,True,26.0,3
4,5ff1e1ee0a7214ada100056f,4011.0,ITEM NOT FOUND,28.0,28.0,False,1,True,4,4011.0,True,28.0,4


### Compile and create DataFrame for `users.json`

In [7]:
users_file_path = "users.json"

try:
    # Read and parse JSON file line-by-line
    data = []
    with open(users_file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if line:
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    continue

    if not data:
        raise ValueError("No valid JSON data found in users file")

    # Clean and prepare user data
    users_data = []
    for record in data:
        # Handle '_id' (user Id)
        id_dict = record.get('_id')
        if not isinstance(id_dict, dict) or '$oid' not in id_dict:
            continue

        try:
            user = {
                'user_id': id_dict['$oid'],
                'state': record.get('state'),
                'created_date': datetime.fromtimestamp(record.get('createdDate', {}).get('$date', 0) / 1000, tz=timezone.utc) if isinstance(record.get('createdDate'), dict) and '$date' in record.get('createdDate') else None,
                'last_login': datetime.fromtimestamp(record.get('lastLogin', {}).get('$date', 0) / 1000, tz=timezone.utc) if isinstance(record.get('lastLogin'), dict) and '$date' in record.get('lastLogin') else None,
                'role': record.get('role', 'CONSUMER'),
                'active': record.get('active', True),
                'sign_up_source': record.get('signUpSource')
            }
            users_data.append(user)
        except (TypeError, ValueError, KeyError):
            continue

    # Create Users DataFrame
    df_users = pd.DataFrame(users_data)

except FileNotFoundError:
    pass
except ValueError:
    pass
except Exception:
    pass

In [8]:
# Preview the first five parsed users
df_users.head(n=5)

Unnamed: 0,user_id,state,created_date,last_login,role,active,sign_up_source
0,5ff1e194b6a9d73a3a9f1052,WI,2021-01-03 15:24:04.800000+00:00,2021-01-03 15:25:37.858000+00:00,consumer,True,Email
1,5ff1e194b6a9d73a3a9f1052,WI,2021-01-03 15:24:04.800000+00:00,2021-01-03 15:25:37.858000+00:00,consumer,True,Email
2,5ff1e194b6a9d73a3a9f1052,WI,2021-01-03 15:24:04.800000+00:00,2021-01-03 15:25:37.858000+00:00,consumer,True,Email
3,5ff1e1eacfcf6c399c274ae6,WI,2021-01-03 15:25:30.554000+00:00,2021-01-03 15:25:30.597000+00:00,consumer,True,Email
4,5ff1e194b6a9d73a3a9f1052,WI,2021-01-03 15:24:04.800000+00:00,2021-01-03 15:25:37.858000+00:00,consumer,True,Email


### Compile and create DataFrame for `brands.json`

In [10]:
brands_file_path = "brands.json"

try:
    # Read and parse JSON file line-by-line
    data = []
    with open(brands_file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if line:
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    continue

    if not data:
        raise ValueError("No valid JSON data found in brands file")

    # Clean and prepare brand data
    brands_data = []
    for record in data:
        # Handle '_id' (brand UUID)
        id_dict = record.get('_id')
        if not isinstance(id_dict, dict) or '$oid' not in id_dict:
            continue

        # Handle 'cpg' (reference to CPG collection)
        cpg_dict = record.get('cpg', {})
        cpg_id_dict = cpg_dict.get('$id', {})
        cpg_id = cpg_id_dict.get('$oid') if isinstance(cpg_id_dict, dict) else None
        cpg_ref = cpg_dict.get('$ref')

        try:
            brand = {
                'brand_id': id_dict['$oid'],
                'barcode': record.get('barcode'),
                'brand_code': record.get('brandCode'),
                'category': record.get('category'),
                'category_code': record.get('categoryCode'),
                'cpg_id': cpg_id,
                'cpg_ref': cpg_ref,
                'top_brand': record.get('topBrand', False),
                'name': record.get('name')
            }
            brands_data.append(brand)
        except (TypeError, ValueError, KeyError):
            continue

    # Create Brands DataFrame
    df_brands = pd.DataFrame(brands_data)

except FileNotFoundError:
    pass
except ValueError:
    pass
except Exception:
    pass

In [11]:
# Preview the first five parsed brands
df_brands.head(n=5)

Unnamed: 0,brand_id,barcode,brand_code,category,category_code,cpg_id,cpg_ref,top_brand,name
0,601ac115be37ce2ead437551,511111019862,,Baking,BAKING,601ac114be37ce2ead437550,Cogs,False,test brand @1612366101024
1,601c5460be37ce2ead43755f,511111519928,STARBUCKS,Beverages,BEVERAGES,5332f5fbe4b03c9a25efd0ba,Cogs,False,Starbucks
2,601ac142be37ce2ead43755d,511111819905,TEST BRANDCODE @1612366146176,Baking,BAKING,601ac142be37ce2ead437559,Cogs,False,test brand @1612366146176
3,601ac142be37ce2ead43755a,511111519874,TEST BRANDCODE @1612366146051,Baking,BAKING,601ac142be37ce2ead437559,Cogs,False,test brand @1612366146051
4,601ac142be37ce2ead43755e,511111319917,TEST BRANDCODE @1612366146827,Candy & Sweets,CANDY_AND_SWEETS,5332fa12e4b03c9a25efd1e7,Cogs,False,test brand @1612366146827


### Create tables and insert data from the DataFrames into them

In [13]:
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load secrets from secret.json. The file is closed as soon as it is parsed;
# the values are read from the in-memory dict afterwards.
with open("secret.json", "r", encoding="utf-8") as file:
    secret = json.load(file)

AWS_REGION = secret.get("AWS_REGION")
REDSHIFT_CLUSTER = secret.get("REDSHIFT_CLUSTER")  # RDS endpoint
DATABASE_NAME = secret.get("DATABASE_NAME")
DB_USER = secret.get("DB_USER")
DB_PASSWORD = secret.get("DB_PASSWORD")

# Surface absent settings now; otherwise they silently become None and only
# show up later as a hard-to-read connection error. Only key names are logged,
# never their values.
_missing_settings = [
    key
    for key in ("REDSHIFT_CLUSTER", "DATABASE_NAME", "DB_USER", "DB_PASSWORD")
    if not secret.get(key)
]
if _missing_settings:
    logger.warning("secret.json has no value for: %s", ", ".join(_missing_settings))

# Database connection parameters
# Default PostgreSQL port, overridable with an optional "DB_PORT" entry in secret.json
DB_PORT = int(secret.get("DB_PORT") or 5432)

# SQL statements to create tables, keyed by table name.
# Every statement uses CREATE TABLE IF NOT EXISTS, so a table that already
# exists is left untouched (its columns are not altered to match this DDL).
# Only `receipts` and `brands` declare a PRIMARY KEY; `items` and `users`
# declare no key or uniqueness constraint, and no FOREIGN KEYs are declared.
create_tables_sql = {
    "receipts": """
        CREATE TABLE IF NOT EXISTS receipts (
            receipt_id VARCHAR(24) PRIMARY KEY,
            bonus_points_earned INTEGER,
            bonus_points_earned_reason TEXT,
            create_date TIMESTAMP WITH TIME ZONE,
            date_scanned TIMESTAMP WITH TIME ZONE,
            finished_date TIMESTAMP WITH TIME ZONE,
            modify_date TIMESTAMP WITH TIME ZONE,
            points_awarded_date TIMESTAMP WITH TIME ZONE,
            points_earned FLOAT,
            purchase_date TIMESTAMP WITH TIME ZONE,
            purchased_item_count INTEGER,
            rewards_receipt_status VARCHAR(50),
            total_spent FLOAT,
            user_id VARCHAR(24)
        );
    """,
    "items": """
        CREATE TABLE IF NOT EXISTS items (
            receipt_id VARCHAR(24),
            barcode VARCHAR(50),
            description TEXT,
            final_price FLOAT,
            item_price FLOAT,
            needs_fetch_review BOOLEAN,
            partner_item_id VARCHAR(50),
            prevent_target_gap_points BOOLEAN,
            quantity_purchased INTEGER,
            user_flagged_barcode VARCHAR(50),
            user_flagged_new_item BOOLEAN,
            user_flagged_price FLOAT,
            user_flagged_quantity INTEGER
        );
    """,
    "users": """
        CREATE TABLE IF NOT EXISTS users (
            user_id VARCHAR(24),
            state VARCHAR(2),
            created_date TIMESTAMP WITH TIME ZONE,
            last_login TIMESTAMP WITH TIME ZONE,
            role VARCHAR(20),
            active BOOLEAN,
            sign_up_source VARCHAR(50)
        );
    """,
    "brands": """
        CREATE TABLE IF NOT EXISTS brands (
            brand_id VARCHAR(24) PRIMARY KEY,
            barcode VARCHAR(50),
            brand_code VARCHAR(50),
            category VARCHAR(50),
            category_code VARCHAR(50),
            cpg_id VARCHAR(24),
            cpg_ref VARCHAR(20),
            top_brand BOOLEAN,
            name TEXT
        );
    """
}

# SQL insert statements (column order matches DataFrame columns), keyed by the
# same table names as create_tables_sql.
# Each statement has a single `%s` after VALUES; it is passed to
# psycopg2.extras.execute_values together with the list of row tuples.
# Rows are supplied positionally, so the column list in each statement must
# stay in the same order as the corresponding DataFrame's columns.
insert_queries = {
    "receipts": """
        INSERT INTO receipts (receipt_id, bonus_points_earned, bonus_points_earned_reason, create_date, date_scanned, 
                              finished_date, modify_date, points_awarded_date, points_earned, purchase_date, 
                              purchased_item_count, rewards_receipt_status, total_spent, user_id)
        VALUES %s;

    """,
    "items": """
        INSERT INTO items (receipt_id, barcode, description, final_price, item_price, needs_fetch_review, 
                           partner_item_id, prevent_target_gap_points, quantity_purchased, user_flagged_barcode, 
                           user_flagged_new_item, user_flagged_price, user_flagged_quantity)
        VALUES %s;
    """,
    "users": """
        INSERT INTO users (user_id, state, created_date, last_login, role, active, sign_up_source)
        VALUES %s;
    """,
    "brands": """
        INSERT INTO brands (brand_id, barcode, brand_code, category, category_code, cpg_id, cpg_ref, top_brand, name)
        VALUES %s;
    """
}


# Bind the handles up front. Checking `'conn' in locals()` is unreliable in a
# notebook: after an earlier run the names still exist (pointing at a closed
# connection), so a failed connect would try to roll back / close stale objects.
conn = None
cursor = None

try:
    # Connect to the PostgreSQL database on RDS
    conn = psycopg2.connect(
        host=REDSHIFT_CLUSTER,
        port=DB_PORT,
        dbname=DATABASE_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )
    conn.autocommit = False  # Disable autocommit for transaction control
    cursor = conn.cursor()

    # Create tables
    for table_name, sql in create_tables_sql.items():
        logger.info(f"Creating table {table_name}...")
        cursor.execute(sql)
        logger.info(f"Table {table_name} created successfully")

    # Insert data from DataFrames
    dataframes = {
        "receipts": df_receipts,
        "items": df_items,
        "users": df_users,
        "brands": df_brands
    }

    for table_name, df in dataframes.items():
        if df is None or df.empty:
            logger.warning(f"No data to insert into {table_name}")
            continue

        logger.info(f"Inserting data into {table_name}...")
        # Convert DataFrame to a list of plain tuples; pd.isna() is true for
        # NaN, NaT and None, all of which are sent to SQL as NULL.
        rows = [
            tuple(None if pd.isna(value) else value for value in row)
            for row in df.itertuples(index=False, name=None)
        ]
        execute_values(cursor, insert_queries[table_name], rows)
        logger.info(f"Inserted {len(rows)} rows into {table_name}")

    # Commit the transaction: DDL and all inserts succeed or fail together
    conn.commit()
    logger.info("All changes committed successfully")

except psycopg2.Error as e:
    logger.error(f"Database error: {e}")
    # Only roll back a connection that was actually opened and is still open
    if conn is not None and not conn.closed:
        conn.rollback()
        logger.info("Transaction rolled back due to error")
except Exception as e:
    # logger.exception keeps the traceback so non-database failures
    # (e.g. a DataFrame that was never built) can be diagnosed
    logger.exception(f"Unexpected error: {e}")
    if conn is not None and not conn.closed:
        conn.rollback()
        logger.info("Transaction rolled back due to error")
finally:
    # Clean up
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
        logger.info("Database connection closed")

2025-03-10 21:53:08,595 - INFO - Creating table receipts...
2025-03-10 21:53:08,722 - INFO - Table receipts created successfully
2025-03-10 21:53:08,723 - INFO - Creating table items...
2025-03-10 21:53:08,780 - INFO - Table items created successfully
2025-03-10 21:53:08,782 - INFO - Creating table users...
2025-03-10 21:53:08,847 - INFO - Table users created successfully
2025-03-10 21:53:08,848 - INFO - Creating table brands...
2025-03-10 21:53:08,904 - INFO - Table brands created successfully
2025-03-10 21:53:08,905 - INFO - Inserting data into receipts...
2025-03-10 21:53:09,822 - INFO - Inserted 1119 rows into receipts
2025-03-10 21:53:09,824 - INFO - Inserting data into items...
2025-03-10 21:53:14,290 - INFO - Inserted 6941 rows into items
2025-03-10 21:53:14,296 - INFO - Inserting data into users...
2025-03-10 21:53:14,644 - INFO - Inserted 495 rows into users
2025-03-10 21:53:14,645 - INFO - Inserting data into brands...
2025-03-10 21:53:15,376 - INFO - Inserted 1167 rows into 