In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, udf, col, explode, from_unixtime
import datetime
from pyspark.sql.types import *


In [2]:
# Get (or reuse, if one is already running in this kernel) the Spark session
# that every later cell talks to through `spark`.
spark = (
    SparkSession.builder
    .appName('Fetch_Rewards_Analytics')
    .getOrCreate()
)

In [23]:
# Schema for one element of the rewardsReceiptItemList array. Every field is
# nullable; price-like fields are declared as strings and cast in the queries.
item_schema = (
    StructType()
    .add("rewardsGroup", StringType(), True)
    .add("barcode", StringType(), True)
    .add("competitiveProduct", StringType(), True)
    .add("description", StringType(), True)
    .add("finalPrice", StringType(), True)
    .add("itemPrice", StringType(), True)
    .add("needsFetchReview", BooleanType(), True)
    .add("partnerItemId", StringType(), True)
    .add("preventTargetGapPoints", BooleanType(), True)
    .add("quantityPurchased", IntegerType(), True)
    .add("userFlaggedBarcode", StringType(), True)
    .add("userFlaggedNewItem", BooleanType(), True)
    .add("userFlaggedPrice", StringType(), True)
    .add("userFlaggedQuantity", IntegerType(), True)
)


def _date_wrapper_type():
    """Return a new one-field struct type ``{"$date": long}``.

    Each date field in the receipts JSON is an object holding a single
    ``$date`` value; this notebook treats it as epoch milliseconds when the
    dates are formatted.
    """
    return StructType().add("$date", LongType(), True)


# Top-level receipt schema. `_id` and every date arrive wrapped in a
# one-field object ({"$oid": ...} / {"$date": ...}).
schema = (
    StructType()
    .add("_id", StructType().add("$oid", StringType(), True), True)
    .add("bonusPointsEarned", IntegerType(), True)
    .add("bonusPointsEarnedReason", StringType(), True)
    .add("createDate", _date_wrapper_type(), True)
    .add("dateScanned", _date_wrapper_type(), True)
    .add("finishedDate", _date_wrapper_type(), True)
    .add("modifyDate", _date_wrapper_type(), True)
    .add("pointsAwardedDate", _date_wrapper_type(), True)
    .add("pointsEarned", StringType(), True)
    .add("purchaseDate", _date_wrapper_type(), True)
    .add("purchasedItemCount", IntegerType(), True)
    .add("rewardsReceiptItemList", ArrayType(item_schema), True)
    .add("rewardsReceiptStatus", StringType(), True)
    .add("totalSpent", StringType(), True)
    .add("userId", StringType(), True)
)

# Read the raw receipts with the explicit schema (no schema-inference pass).
RECEIPTS_PATH = "/content/receipts.json"
receipts_raw = spark.read.schema(schema).json(RECEIPTS_PATH)

# Date fields whose {"$date": <epoch millis>} wrapper is replaced by a
# "<name>_formatted" column. from_unixtime returns a *string*
# ('yyyy-MM-dd HH:mm:ss' by default, in the session time zone).
date_columns = [
    "createDate", "dateScanned", "finishedDate",
    "modifyDate", "pointsAwardedDate", "purchaseDate"
]

# Build the flattened frame in ONE projection rather than withColumn/drop in a
# loop (the PySpark docs warn that repeated withColumn calls each add a
# projection to the plan). Column order is unchanged: untouched columns in
# schema order, then the formatted dates in date_columns order, then receipt_id.
passthrough_columns = [
    name for name in receipts_raw.columns
    if name != "_id" and name not in date_columns
]
df = receipts_raw.select(
    *passthrough_columns,
    *[
        # from_unixtime expects seconds while the wrapped value is
        # milliseconds; sub-second precision is truncated.
        from_unixtime(col(f"{date_col}.$date") / 1000).alias(f"{date_col}_formatted")
        for date_col in date_columns
    ],
    # Always flatten the {"$oid": ...} wrapper into a plain string key.
    col("_id.$oid").alias("receipt_id"),
)

# One row per receipt line item: receipt-level fields plus the item struct
# as `items`. A single select is enough; there is no need to list the
# columns twice.
# NOTE: explode() emits no rows for a receipt whose rewardsReceiptItemList is
# NULL or empty, so such receipts are absent from df_receipt_lines (use
# explode_outer if they must be kept).
df_receipt_lines = df.select(
    "receipt_id",
    "purchaseDate_formatted",
    "totalSpent",
    "rewardsReceiptStatus",
    "purchasedItemCount",
    "pointsEarned",
    "bonusPointsEarned",
    explode("rewardsReceiptItemList").alias("items"),
)

# Show the results
df_receipt_lines.show(truncate=False)

+------------------------+----------------------+----------+--------------------+------------------+------------+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|receipt_id              |purchaseDate_formatted|totalSpent|rewardsReceiptStatus|purchasedItemCount|pointsEarned|bonusPointsEarned|items                                                                                                                                                                                             |
+------------------------+----------------------+----------+--------------------+------------------+------------+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|5ff1e1eb0a720f0523

In [28]:
# Register the flattened receipts DataFrame as a temporary view so the checks
# below can query it with Spark SQL under the name `receipts`.
df.createOrReplaceTempView("receipts")

# 1. Count NULLs in a handful of key receipt columns (not every column).
#    SUM(CASE ... THEN 1 ELSE 0 END) gives one null count per column next to
#    the total row count, all in a single aggregation.
null_check = spark.sql("""
    SELECT
        COUNT(*) as total_records,
        SUM(CASE WHEN receipt_id IS NULL THEN 1 ELSE 0 END) as null_receipt_ids,
        SUM(CASE WHEN userId IS NULL THEN 1 ELSE 0 END) as null_user_ids,
        SUM(CASE WHEN totalSpent IS NULL THEN 1 ELSE 0 END) as null_total_spent,
        SUM(CASE WHEN purchaseDate_formatted IS NULL THEN 1 ELSE 0 END) as null_purchase_dates,
        SUM(CASE WHEN rewardsReceiptStatus IS NULL THEN 1 ELSE 0 END) as null_status
    FROM receipts
""")

# 2. Check for data consistency - total spent vs items.
#    Sums finalPrice over each receipt's exploded item list and reports
#    receipts whose totalSpent differs from that sum by more than 0.01.
#    NOTE(review): assumes finalPrice is already the line total (not a unit
#    price to be multiplied by quantityPurchased) — TODO confirm.
#    Caveats of the SQL as written:
#      - LATERAL VIEW explode (no OUTER) drops receipts with a NULL/empty item
#        list, so those receipts are never compared.
#      - A NULL totalSpent makes the difference NULL, so that receipt is not
#        reported; items whose finalPrice is NULL (or, with non-ANSI casting,
#        not numeric) are skipped by SUM.
price_consistency = spark.sql("""
    SELECT
        receipt_id,
        totalSpent,
        SUM(CAST(items.finalPrice AS DOUBLE)) as sum_item_prices,
        ABS(CAST(totalSpent AS DOUBLE) - SUM(CAST(items.finalPrice AS DOUBLE))) as price_difference
    FROM receipts
    LATERAL VIEW explode(rewardsReceiptItemList) as items
    GROUP BY receipt_id, totalSpent
    HAVING ABS(CAST(totalSpent AS DOUBLE) - SUM(CAST(items.finalPrice AS DOUBLE))) > 0.01
""")

# 3. Check for date validity: receipts with a purchase date after their create
#    date, or with points awarded before the purchase date.
#    The *_formatted columns are strings produced by from_unixtime
#    ('yyyy-MM-dd HH:mm:ss' by default). These are therefore lexicographic
#    string comparisons, which match chronological order for that fixed-width
#    format. A comparison involving a NULL date is NULL and never flags a row
#    on its own.
date_validity = spark.sql("""
    SELECT
        receipt_id,
        purchaseDate_formatted,
        createDate_formatted,
        pointsAwardedDate_formatted
    FROM receipts
    WHERE
        purchaseDate_formatted > createDate_formatted
        OR pointsAwardedDate_formatted < purchaseDate_formatted
""")

# 4. Check for duplicate receipts: receipt_id values (taken from _id.$oid)
#    that occur on more than one row. An empty result means receipt_id is
#    unique in this load.
duplicate_check = spark.sql("""
    SELECT
        receipt_id,
        COUNT(*) as occurrence_count
    FROM receipts
    GROUP BY receipt_id
    HAVING COUNT(*) > 1
""")

# 5. Check for unusual values in numeric fields: receipts whose totalSpent or
#    pointsEarned lies more than 3 standard deviations ABOVE the mean. Only
#    the upper tail is checked, and STDDEV is the sample standard deviation
#    in Spark SQL.
#    `stats` is a single-row CTE, so `FROM receipts, stats` is a cross join
#    that attaches the same thresholds to every receipt.
#    Both fields are strings in the schema and are cast to DOUBLE; NULLs are
#    ignored by AVG/STDDEV and never flagged.
numeric_outliers = spark.sql("""
    WITH stats AS (
        SELECT
            AVG(CAST(totalSpent AS DOUBLE)) as avg_spent,
            STDDEV(CAST(totalSpent AS DOUBLE)) as stddev_spent,
            AVG(CAST(pointsEarned AS DOUBLE)) as avg_points,
            STDDEV(CAST(pointsEarned AS DOUBLE)) as stddev_points
        FROM receipts
    )
    SELECT
        receipt_id,
        totalSpent,
        pointsEarned
    FROM receipts, stats
    WHERE
        CAST(totalSpent AS DOUBLE) > stats.avg_spent + 3 * stats.stddev_spent
        OR CAST(pointsEarned AS DOUBLE) > stats.avg_points + 3 * stats.stddev_points
""")

# 6. Check for receipts whose status is not FINISHED, PENDING or REJECTED,
#    together with the status of the receipt's previous row (ordered by
#    modifyDate_formatted) when a receipt_id occurs more than once.
#    The LAG window is evaluated in a CTE over ALL rows and filtered only
#    afterwards. In a single SELECT, WHERE runs before window functions, so
#    LAG could only see rows already restricted to non-standard statuses,
#    and previous_status was wrong whenever history existed.
#    NOTE: if receipt_id is unique, every receipt has exactly one row and
#    previous_status is always NULL.
status_transitions = spark.sql("""
    WITH ordered AS (
        SELECT
            receipt_id,
            rewardsReceiptStatus AS current_status,
            modifyDate_formatted,
            LAG(rewardsReceiptStatus) OVER (
                PARTITION BY receipt_id ORDER BY modifyDate_formatted
            ) AS previous_status
        FROM receipts
    )
    SELECT
        receipt_id,
        current_status,
        modifyDate_formatted,
        previous_status
    FROM ordered
    WHERE current_status NOT IN ('FINISHED', 'PENDING', 'REJECTED')
""")

# 7. Check item count consistency: the declared purchasedItemCount vs. the
#    number of non-null entries in rewardsReceiptItemList.
#    LATERAL VIEW OUTER keeps receipts whose item list is NULL or empty: they
#    get one row with a NULL item, so COUNT(items) = 0. A plain LATERAL VIEW
#    drops them, which hid receipts that declare items but carry none.
#    Receipts with a NULL purchasedItemCount still never match, because
#    NULL != n is NULL.
item_count_check = spark.sql("""
    SELECT
        receipt_id,
        purchasedItemCount as declared_count,
        COUNT(items) as actual_count
    FROM receipts
    LATERAL VIEW OUTER explode(rewardsReceiptItemList) as items
    GROUP BY receipt_id, purchasedItemCount
    HAVING purchasedItemCount != COUNT(items)
""")

# 8. Check for suspicious points allocation among receipts with a positive
#    totalSpent: more than 100 points per unit of totalSpent, or
#    bonusPointsEarned greater than twice pointsEarned.
#    NOTE(review): the 100 and 2x thresholds are hard-coded heuristics with no
#    stated source — confirm them against the rewards rules.
#    A receipt with NULL pointsEarned makes both conditions NULL, so it is not
#    flagged.
points_check = spark.sql("""
    SELECT
        receipt_id,
        totalSpent,
        pointsEarned,
        bonusPointsEarned,
        CAST(pointsEarned AS DOUBLE) / CAST(totalSpent AS DOUBLE) as points_ratio
    FROM receipts
    WHERE
        CAST(totalSpent AS DOUBLE) > 0
        AND (
            CAST(pointsEarned AS DOUBLE) / CAST(totalSpent AS DOUBLE) > 100
            OR bonusPointsEarned > CAST(pointsEarned AS DOUBLE) * 2
        )
""")

# 9. Check for items with missing or invalid data. Returns one row per item
#    (items.* expands every item field) whose barcode or finalPrice is NULL,
#    whose finalPrice is <= 0, or whose quantityPurchased is <= 0.
#    Not caught:
#      - a NULL quantityPurchased (NULL <= 0 is NULL);
#      - an empty or non-numeric finalPrice string, which casts to NULL
#        under non-ANSI casting and so only fails the IS NULL test if the
#        field itself is NULL.
#    Receipts with no items are dropped by the plain LATERAL VIEW.
item_data_quality = spark.sql("""
    SELECT
        receipt_id,
        items.*
    FROM receipts
    LATERAL VIEW explode(rewardsReceiptItemList) as items
    WHERE
        items.barcode IS NULL
        OR items.finalPrice IS NULL
        OR CAST(items.finalPrice AS DOUBLE) <= 0
        OR items.quantityPurchased <= 0
""")

# Run each receipt-level check in order and print its result under a
# "=== <name> ===" banner. show() is the action that triggers execution.
receipt_level_checks = (
    ("Null Check", null_check),
    ("Price Consistency", price_consistency),
    ("Date Validity", date_validity),
    ("Duplicates", duplicate_check),
    ("Numeric Outliers", numeric_outliers),
    ("Status Transitions", status_transitions),
    ("Item Count", item_count_check),
    ("Points Check", points_check),
    ("Item Quality", item_data_quality),
)
for check_name, check_df in receipt_level_checks:
    banner = f"\n=== {check_name} ==="
    print(banner)
    check_df.show(truncate=False)


=== Null Check ===
+-------------+----------------+-------------+----------------+-------------------+-----------+
|total_records|null_receipt_ids|null_user_ids|null_total_spent|null_purchase_dates|null_status|
+-------------+----------------+-------------+----------------+-------------------+-----------+
|1119         |0               |0            |435             |448                |0          |
+-------------+----------------+-------------+----------------+-------------------+-----------+


=== Price Consistency ===
+------------------------+----------+---------------+----------------+
|receipt_id              |totalSpent|sum_item_prices|price_difference|
+------------------------+----------+---------------+----------------+
|6011f39c0a720f05350000b4|1.00      |3.56           |2.56            |
|5ffc9d9c0a7214adca00004b|10.00     |24.0           |14.0            |
|602558a90a720f05a8000240|1.00      |3.56           |2.56            |
|5ff1e1d20a7214ada1000561|1.00      |3.56     

In [29]:
# Register the exploded line-item DataFrame (one row per element of
# rewardsReceiptItemList) as `receipt_lines` for the line-level checks below.
df_receipt_lines.createOrReplaceTempView("receipt_lines")

# 1. Check for missing or invalid values in critical fields. Counts lines
#    with a NULL receipt_id, barcode or quantityPurchased, and lines whose
#    finalPrice or description is NULL or blank after TRIM.
#    Receipts with no items produced no rows in explode(), so they are not
#    counted here at all.
missing_values = spark.sql("""
    SELECT
        COUNT(*) as total_records,
        SUM(CASE WHEN receipt_id IS NULL THEN 1 ELSE 0 END) as null_receipt_ids,
        SUM(CASE WHEN items.barcode IS NULL THEN 1 ELSE 0 END) as null_barcodes,
        SUM(CASE WHEN items.finalPrice IS NULL OR TRIM(items.finalPrice) = '' THEN 1 ELSE 0 END) as null_prices,
        SUM(CASE WHEN items.quantityPurchased IS NULL THEN 1 ELSE 0 END) as null_quantities,
        SUM(CASE WHEN items.description IS NULL OR TRIM(items.description) = '' THEN 1 ELSE 0 END) as null_descriptions
    FROM receipt_lines
""")

# 2. Check for price consistency and negative values. Reports lines whose
#    finalPrice or itemPrice is <= 0, whose quantityPurchased is <= 0, or whose
#    finalPrice and itemPrice differ by more than 0.01. Any such gap is
#    reported, whatever its cause.
#    quantityPurchased is already an IntegerType field, so its CAST AS INT is
#    a no-op. Prices are strings cast to DOUBLE; comparisons that evaluate to
#    NULL never flag a line.
price_issues = spark.sql("""
    SELECT
        receipt_id,
        items.barcode,
        items.finalPrice,
        items.itemPrice,
        items.quantityPurchased,
        CAST(items.finalPrice AS DOUBLE) as final_price_num,
        CAST(items.itemPrice AS DOUBLE) as item_price_num,
        ABS(CAST(items.finalPrice AS DOUBLE) - CAST(items.itemPrice AS DOUBLE)) as price_difference
    FROM receipt_lines
    WHERE
        CAST(items.finalPrice AS DOUBLE) <= 0
        OR CAST(items.itemPrice AS DOUBLE) <= 0
        OR CAST(items.quantityPurchased AS INT) <= 0
        OR ABS(CAST(items.finalPrice AS DOUBLE) - CAST(items.itemPrice AS DOUBLE)) > 0.01
""")

# 3. Check for receipt total consistency at line level. Per receipt, compares
#    totalSpent with the sum of line finalPrices, and purchasedItemCount with
#    the number of lines.
#    receipt_lines comes from explode() (not explode_outer), so receipts
#    without any items never appear here.
#    A NULL totalSpent or purchasedItemCount makes that comparison NULL, so
#    only the other condition can flag the receipt.
total_consistency = spark.sql("""
    SELECT
        receipt_id,
        totalSpent as receipt_total,
        SUM(CAST(items.finalPrice AS DOUBLE)) as sum_line_items,
        COUNT(*) as line_item_count,
        purchasedItemCount as declared_item_count,
        ABS(CAST(totalSpent AS DOUBLE) - SUM(CAST(items.finalPrice AS DOUBLE))) as total_difference
    FROM receipt_lines
    GROUP BY receipt_id, totalSpent, purchasedItemCount
    HAVING
        ABS(CAST(totalSpent AS DOUBLE) - SUM(CAST(items.finalPrice AS DOUBLE))) > 0.01
        OR COUNT(*) != purchasedItemCount
""")

# 4. Check for duplicate line items within receipts: the same
#    (barcode, description) pair appearing on more than one line of a receipt.
#    GROUP BY treats NULLs as equal. Without a filter, every line with neither
#    a barcode nor a description would collapse into one bogus "duplicate"
#    group per receipt. Such unidentifiable lines are excluded here; they
#    already show up in the null_barcodes / null_descriptions counts.
duplicate_items = spark.sql("""
    SELECT
        receipt_id,
        items.barcode,
        items.description,
        COUNT(*) as occurrence_count
    FROM receipt_lines
    WHERE items.barcode IS NOT NULL OR items.description IS NOT NULL
    GROUP BY receipt_id, items.barcode, items.description
    HAVING COUNT(*) > 1
""")

# 5. Check for unusual quantities or prices (statistical outliers) relative to
#    other lines with the same barcode. A line is reported when its finalPrice
#    or quantityPurchased is more than 3 sample standard deviations from that
#    barcode's mean.
#    - Lines with a NULL barcode never satisfy the JOIN condition, so they are
#      not checked.
#    - A barcode seen on a single line has a NULL stddev, so it is never
#      flagged.
outliers = spark.sql("""
    WITH stats AS (
        SELECT
            items.barcode,
            AVG(CAST(items.finalPrice AS DOUBLE)) as avg_price,
            STDDEV(CAST(items.finalPrice AS DOUBLE)) as stddev_price,
            AVG(items.quantityPurchased) as avg_quantity,
            STDDEV(items.quantityPurchased) as stddev_quantity
        FROM receipt_lines
        GROUP BY items.barcode
    )
    SELECT
        r.receipt_id,
        r.items.barcode,
        r.items.description,
        r.items.finalPrice,
        r.items.quantityPurchased,
        stats.avg_price,
        stats.avg_quantity
    FROM receipt_lines r
    JOIN stats ON r.items.barcode = stats.barcode
    WHERE
        ABS(CAST(r.items.finalPrice AS DOUBLE) - stats.avg_price) > 3 * stats.stddev_price
        OR ABS(r.items.quantityPurchased - stats.avg_quantity) > 3 * stats.stddev_quantity
""")

# 6. Check for mismatched user-flagged data: lines where the user supplied a
#    barcode, price or quantity that differs from the recorded value.
#    The null-safe `<=>` makes a user-flagged value on a line whose recorded
#    value is NULL count as a mismatch. With plain `!=` that comparison is
#    NULL, and such lines were silently skipped.
#    Prices are strings compared as DOUBLE; under non-ANSI casting, a value
#    that cannot be cast becomes NULL.
flagged_mismatches = spark.sql("""
    SELECT
        receipt_id,
        items.barcode,
        items.userFlaggedBarcode,
        items.finalPrice,
        items.userFlaggedPrice,
        items.quantityPurchased,
        items.userFlaggedQuantity
    FROM receipt_lines
    WHERE
        (items.userFlaggedBarcode IS NOT NULL
            AND NOT (items.barcode <=> items.userFlaggedBarcode))
        OR (items.userFlaggedPrice IS NOT NULL
            AND NOT (CAST(items.finalPrice AS DOUBLE) <=> CAST(items.userFlaggedPrice AS DOUBLE)))
        OR (items.userFlaggedQuantity IS NOT NULL
            AND NOT (items.quantityPurchased <=> items.userFlaggedQuantity))
""")

# 7. Check competitive product and rewards group patterns. For each
#    (rewardsGroup, competitiveProduct) combination, reports the number of
#    lines, the number of distinct receipts and the average finalPrice.
#    Lines where both fields are NULL are excluded.
rewards_analysis = spark.sql("""
    SELECT
        items.rewardsGroup,
        items.competitiveProduct,
        COUNT(*) as item_count,
        COUNT(DISTINCT receipt_id) as receipt_count,
        AVG(CAST(items.finalPrice AS DOUBLE)) as avg_price
    FROM receipt_lines
    WHERE items.rewardsGroup IS NOT NULL
        OR items.competitiveProduct IS NOT NULL
    GROUP BY items.rewardsGroup, items.competitiveProduct
""")

# Run each line-level check in order and print its result under a
# "=== <name> ===" banner. show() is the action that triggers execution.
line_level_checks = (
    ("Missing Values Summary", missing_values),
    ("Price Issues", price_issues),
    ("Total Consistency Issues", total_consistency),
    ("Duplicate Items", duplicate_items),
    ("Statistical Outliers", outliers),
    ("Flagged Data Mismatches", flagged_mismatches),
    ("Rewards Group Analysis", rewards_analysis),
)
for check_name, check_df in line_level_checks:
    banner = f"\n=== {check_name} ==="
    print(banner)
    check_df.show(truncate=False)


=== Missing Values Summary ===
+-------------+----------------+-------------+-----------+---------------+-----------------+
|total_records|null_receipt_ids|null_barcodes|null_prices|null_quantities|null_descriptions|
+-------------+----------------+-------------+-----------+---------------+-----------------+
|6941         |0               |3851         |174        |174            |381              |
+-------------+----------------+-------------+-----------+---------------+-----------------+


=== Price Issues ===
+------------------------+-------+----------+---------+-----------------+---------------+--------------+-------------------+
|receipt_id              |barcode|finalPrice|itemPrice|quantityPurchased|final_price_num|item_price_num|price_difference   |
+------------------------+-------+----------+---------+-----------------+---------------+--------------+-------------------+
|600260210a720f05f300008f|NULL   |2.88      |4.99     |1                |2.88           |4.99          |2