In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, explode, lit, to_timestamp, when
from pyspark.sql.types import DecimalType, StringType, TimestampType

In [0]:
# Obtain the Spark session used by every cell below. getOrCreate() hands back an
# already-active session if there is one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Fetch Rewards Data Transformation")
    .getOrCreate()
)

In [0]:
# Load JSON files into DataFrames
# All three raw extracts live in one folder; change INPUT_DIR to point the notebook at a
# different drop. No explicit schema is passed, so spark.read.json infers one per file
# (by default it expects newline-delimited JSON, one record per line).
INPUT_DIR = "/FileStore/tables/DataFiles"

users_df = spark.read.json(f"{INPUT_DIR}/users.json")
receipts_df = spark.read.json(f"{INPUT_DIR}/receipts.json")
brands_df = spark.read.json(f"{INPUT_DIR}/brands.json")

In [0]:
# Process Users Table
# Flatten the raw user records: unwrap the "_id.$oid" key into a string user_id and turn
# the "$date" values into Spark timestamps. They are divided by 1000 first, i.e. treated as
# epoch milliseconds and converted to seconds (assumption — TODO confirm against the export).
# NOTE(review): users_df is rebound to its own projection, so re-running this cell without
# first re-running the load cell fails (the raw "_id" column no longer exists).
users_df = users_df.select(
    col("_id.$oid").cast(StringType()).alias("user_id"),
    "state",
    to_timestamp(col("createdDate.$date") / 1000).alias("createdDate"),
    to_timestamp(col("lastLogin.$date") / 1000).alias("lastLogin"),
    "role",
    "active",
    "signUpSource",
)

In [0]:
# Process Receipts Table
# Flatten the raw receipt records into one row per receipt, then explode the nested
# rewardsReceiptItemList array into a separate one-row-per-item frame (receipt_items_df).
# NOTE(review): receipts_df is rebound to its own projection, so re-running this cell
# without first re-running the load cell fails (the raw "_id" column no longer exists).
# NOTE(review): DecimalType(10, 2) tops out at 99,999,999.99; larger values will not fit.


def _epoch_ms_to_timestamp(field):
    """Return ``<field>.$date`` as a Spark timestamp column aliased back to ``field``.

    The raw value is divided by 1000 before ``to_timestamp`` so it is interpreted as
    seconds, i.e. the ``$date`` values are assumed to be epoch milliseconds
    (TODO confirm against the source export).
    """
    return to_timestamp(col(f"{field}.$date") / 1000).alias(field)


receipts_df = receipts_df.select(
    col("_id.$oid").cast(StringType()).alias("receipt_id"),
    col("userId").cast(StringType()).alias("user_id"),
    col("bonusPointsEarned"),
    col("bonusPointsEarnedReason"),
    _epoch_ms_to_timestamp("createDate"),
    _epoch_ms_to_timestamp("dateScanned"),
    _epoch_ms_to_timestamp("finishedDate"),
    _epoch_ms_to_timestamp("modifyDate"),
    _epoch_ms_to_timestamp("pointsAwardedDate"),
    col("pointsEarned").cast(DecimalType(10, 2)).alias("pointsEarned"),
    _epoch_ms_to_timestamp("purchaseDate"),
    col("purchasedItemCount"),
    col("rewardsReceiptStatus"),
    col("totalSpent").cast(DecimalType(10, 2)).alias("totalSpent"),
    col("rewardsReceiptItemList"),
)

# Explode the nested rewardsReceiptItemList array: explode() emits one row per element
# and emits nothing for a null or empty array, so receipts without items add no rows here.
exploded_df = receipts_df.withColumn("receiptItem", explode(col("rewardsReceiptItemList")))
receipt_items_df = exploded_df.select(
    col("receipt_id"),
    col("receiptItem.barcode"),
    col("receiptItem.description"),
    col("receiptItem.finalPrice").cast(DecimalType(10, 2)).alias("finalPrice"),
    col("receiptItem.itemPrice").cast(DecimalType(10, 2)).alias("itemPrice"),
    col("receiptItem.needsFetchReview"),
    col("receiptItem.partnerItemId").cast(StringType()).alias("partnerItemId"),
    col("receiptItem.preventTargetGapPoints"),
    col("receiptItem.quantityPurchased"),
    col("receiptItem.userFlaggedBarcode"),
    col("receiptItem.userFlaggedNewItem"),
    col("receiptItem.userFlaggedPrice").cast(DecimalType(10, 2)).alias("userFlaggedPrice"),
    col("receiptItem.userFlaggedQuantity"),
    col("receiptItem.needsFetchReviewReason"),
    col("receiptItem.pointsNotAwardedReason"),
    # Missing payer ids become the "-1" sentinel — presumably to line up with the
    # cpgId of the default brand row; confirm with the data model.
    coalesce(col("receiptItem.pointsPayerId").cast(StringType()), lit("-1")).alias("pointsPayerId"),
    col("receiptItem.rewardsGroup"),
    col("receiptItem.rewardsProductPartnerId").cast(StringType()).alias("rewardsProductPartnerId"),
    col("receiptItem.userFlaggedDescription"),
)


In [0]:
# Process Brands Table
# Flatten the raw brand records: unwrap the "_id.$oid" key into a string brand_id and lift
# the nested CPG reference ("cpg.$id.$oid" and "cpg.$ref") up to top-level columns.
# NOTE(review): brands_df is rebound to its own projection, so re-running this cell without
# first re-running the load cell fails (the raw "_id" / "cpg" columns no longer exist).
brands_df = brands_df.select(
    col("_id.$oid").cast(StringType()).alias("brand_id"),
    "barcode",
    "brandCode",
    "category",
    "categoryCode",
    col("cpg.$id.$oid").cast(StringType()).alias("cpgId"),
    col("cpg.$ref").alias("cpgRef"),
    "topBrand",
    "name",
)

In [0]:
# Add default entry for Brands table
# A single sentinel brand row; its cpgId "-1" matches the value that missing
# pointsPayerId entries are filled with in receipt_items_df (presumably so those items
# still join to a brand — confirm with the data model).
DEFAULT_BRAND_ID = "default_brand_id"

default_brand_df = spark.createDataFrame(
    [(DEFAULT_BRAND_ID, "", "DEFAULT_BRAND", "", "", "-1", "", False, "Default Brand")],
    ["brand_id", "barcode", "brandCode", "category", "categoryCode", "cpgId", "cpgRef", "topBrand", "name"],
)

# Remove any default row appended by an earlier run of this cell so re-running it does not
# accumulate duplicates. eqNullSafe never yields null, so brands with a null brand_id are
# kept. unionByName aligns columns by name rather than position.
brands_df = (
    brands_df
    .filter(~col("brand_id").eqNullSafe(DEFAULT_BRAND_ID))
    .unionByName(default_brand_df)
)

In [0]:
# Print schemas to verify: dump each output frame's schema (users, receipts, brands,
# receipt items — in that order) as a quick sanity check before writing.
for _frame in (users_df, receipts_df, brands_df, receipt_items_df):
    _frame.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- createdDate: timestamp (nullable = true)
 |-- lastLogin: timestamp (nullable = true)
 |-- role: string (nullable = true)
 |-- active: boolean (nullable = true)
 |-- signUpSource: string (nullable = true)

root
 |-- receipt_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- bonusPointsEarned: long (nullable = true)
 |-- bonusPointsEarnedReason: string (nullable = true)
 |-- createDate: timestamp (nullable = true)
 |-- dateScanned: timestamp (nullable = true)
 |-- finishedDate: timestamp (nullable = true)
 |-- modifyDate: timestamp (nullable = true)
 |-- pointsAwardedDate: timestamp (nullable = true)
 |-- pointsEarned: decimal(10,2) (nullable = true)
 |-- purchaseDate: timestamp (nullable = true)
 |-- purchasedItemCount: long (nullable = true)
 |-- rewardsReceiptStatus: string (nullable = true)
 |-- totalSpent: decimal(10,2) (nullable = true)
 |-- rewardsReceiptItemList: array (nul

In [0]:
# Save DataFrames as CSV
# On Databricks, Spark resolves scheme-less paths against DBFS, so the output root must be
# a DBFS path ("/FileStore/..."), like the "/FileStore/tables/..." paths the inputs are
# read from. "/dbfs/..." is the local (FUSE) mount meant for non-Spark file APIs; handed to
# the Spark writer it would land under dbfs:/dbfs/FileStore/... instead.
OUTPUT_DIR = "/FileStore/fr1"

# Columns exported for each table, in output order (the nested item array is not
# exported with receipts; it is written separately as receipt items).
RECEIPT_CSV_COLUMNS = [
    "receipt_id",
    "user_id",
    "bonusPointsEarned",
    "bonusPointsEarnedReason",
    "createDate",
    "dateScanned",
    "finishedDate",
    "modifyDate",
    "pointsAwardedDate",
    "pointsEarned",
    "purchaseDate",
    "purchasedItemCount",
    "rewardsReceiptStatus",
    "totalSpent",
]
BRAND_CSV_COLUMNS = [
    "brand_id",
    "barcode",
    "brandCode",
    "category",
    "categoryCode",
    "name",
    "topBrand",
    "cpgId",
    "cpgRef",
]


def _write_csv(frame, name):
    """Overwrite ``<OUTPUT_DIR>/<name>.csv`` with ``frame`` as CSV including a header row.

    Spark writes this as a directory of part files named ``<name>.csv``, not a single file.
    """
    frame.write.mode("overwrite").csv(f"{OUTPUT_DIR}/{name}.csv", header=True)


_write_csv(users_df, "users")
_write_csv(receipts_df.select(*RECEIPT_CSV_COLUMNS), "receipts")
_write_csv(receipt_items_df, "receiptitems")
_write_csv(brands_df.select(*BRAND_CSV_COLUMNS), "brands")