In [2]:
!pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, count, to_date, length, expr, size, when, from_unixtime
from pyspark.sql.types import *

# Start Spark session
spark = SparkSession.builder \
    .appName("DataQualityEvaluation") \
    .getOrCreate()


In [None]:
# Load users JSON
raw_users_df = spark.read.option("multiline", "false").json("users.json")

# Check schema & count
raw_users_df.printSchema()
print(f"Total users: {raw_users_df.count()}")

users_df = raw_users_df.select(
    col("_id.$oid").alias("user_id"),
    col("active"),
    from_unixtime(col("createdDate.$date") / 1000).cast("timestamp").alias("createdDate"),
    from_unixtime(col("lastLogin.$date") / 1000).cast("timestamp").alias("lastLogin"),
    col("role"),
    col("signUpSource"),
    col("state")
)

# Null checks
users_df.select([count(when(col(c).isNull(), c)).alias(c) for c in users_df.columns]).show()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- active: boolean (nullable = true)
 |-- createdDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- lastLogin: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- role: string (nullable = true)
 |-- signUpSource: string (nullable = true)
 |-- state: string (nullable = true)

Total users: 495
+-------+------+-----------+---------+----+------------+-----+
|user_id|active|createdDate|lastLogin|role|signUpSource|state|
+-------+------+-----------+---------+----+------------+-----+
|      0|     0|          0|       62|   0|          48|   56|
+-------+------+-----------+---------+----+------------+-----+



In [27]:
# Load and flatten receipts
receipts_df = spark.read.option("multiline", "false").json("receipts.json")

receipts_flat = receipts_df.select(
    col("_id.$oid").alias("receipt_id"),
    col("userId").alias("user_id"),
    col("bonusPointsEarned"),
    col("bonusPointsEarnedReason"),
    from_unixtime(col("createDate.$date")/1000).cast("timestamp").alias("create_date"),
    from_unixtime(col("dateScanned.$date")/1000).cast("timestamp").alias("date_scanned"),
    from_unixtime(col("finishedDate.$date")/1000).cast("timestamp").alias("finished_date"),
    from_unixtime(col("modifyDate.$date")/1000).cast("timestamp").alias("modify_date"),
    from_unixtime(col("pointsAwardedDate.$date")/1000).cast("timestamp").alias("points_awarded_date"),
    col("pointsEarned"),
    from_unixtime(col("purchaseDate.$date")/1000).cast("timestamp").alias("purchase_date"),
    col("purchasedItemCount"),
    col("rewardsReceiptStatus"),
    col("totalSpent"),
    col("rewardsReceiptItemList")
)

# Data Quality Checks
print(f"Total receipts: {receipts_flat.count()}")

# Null/invalid checks
receipts_flat.select([count(when(col(c).isNull(), c)).alias(c) for c in receipts_flat.columns]).show()

# Negative or null spending
receipts_flat.filter((col("totalSpent") <= 0) | col("totalSpent").isNull()).show()

# Date logic errors
receipts_flat.filter(col("date_scanned") < col("purchase_date")).show()

# Rewards receipt statuses
receipts_flat.groupBy("rewardsReceiptStatus").count().show()

# Receipt item list issue
receipts_flat.select(size("rewardsReceiptItemList").alias("item_count")).groupBy("item_count").count().orderBy("item_count").show()


Total receipts: 1119
+----------+-------+-----------------+-----------------------+-----------+------------+-------------+-----------+-------------------+------------+-------------+------------------+--------------------+----------+----------------------+
|receipt_id|user_id|bonusPointsEarned|bonusPointsEarnedReason|create_date|date_scanned|finished_date|modify_date|points_awarded_date|pointsEarned|purchase_date|purchasedItemCount|rewardsReceiptStatus|totalSpent|rewardsReceiptItemList|
+----------+-------+-----------------+-----------------------+-----------+------------+-------------+-----------+-------------------+------------+-------------+------------------+--------------------+----------+----------------------+
|         0|      0|              575|                    575|          0|           0|          551|          0|                582|         510|          448|               484|                   0|       435|                   440|
+----------+-------+-----------------+-

In [35]:
# Load and flatten brands
brands_df = spark.read.option("multiline", "false").json("brands.json")

brands_flat = brands_df.select(
    col("_id.$oid").alias("brand_id"),
    col("barcode"),
    col("brandCode"),
    col("category"),
    col("categoryCode"),
    col("cpg.$id.$oid").alias("cpg_id"),
    col("topBrand"),
    col("name")
)

# Data Quality Checks
print(f"Total brands: {brands_flat.count()}")

# Null checks
brands_flat.select([count(when(col(c).isNull(), c)).alias(c) for c in brands_flat.columns]).show()

# Top brand inconsistency
brands_flat.groupBy("topBrand").count().show()

# Name quality
brands_flat.filter((col("name").isNull()) | (col("name") == "")).show()

# Duplicates by brand_id
brands_flat.groupBy("brand_id").count().filter("count > 1").show()


Total brands: 1167
+--------+-------+---------+--------+------------+------+--------+----+
|brand_id|barcode|brandCode|category|categoryCode|cpg_id|topBrand|name|
+--------+-------+---------+--------+------------+------+--------+----+
|       0|      0|      234|     155|         650|     0|     612|   0|
+--------+-------+---------+--------+------------+------+--------+----+

+--------+-----+
|topBrand|count|
+--------+-----+
|    NULL|  612|
|    true|   31|
|   false|  524|
+--------+-----+

+--------+-------+---------+--------+------------+------+--------+----+
|brand_id|barcode|brandCode|category|categoryCode|cpg_id|topBrand|name|
+--------+-------+---------+--------+------------+------+--------+----+
+--------+-------+---------+--------+------------+------+--------+----+

+--------+-----+
|brand_id|count|
+--------+-----+
+--------+-----+

