In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce

### **Bronze Layer**

In [0]:
# Bronze Layer
#
# Read the raw Yelp user JSON files with an explicit schema and persist them
# unchanged (plus an ingestion timestamp) as the Delta table "bronzeuser".
# Because the schema is explicit, any JSON field that is NOT listed here is
# dropped at read time, so every column used downstream must appear below.

user_schema = StructType([
    StructField("average_stars", DoubleType(), True),
    StructField("compliment_cool", LongType(), True),
    StructField("compliment_cute", LongType(), True),
    StructField("compliment_funny", LongType(), True),
    StructField("compliment_hot", LongType(), True),
    StructField("compliment_list", LongType(), True),
    StructField("compliment_more", LongType(), True),
    StructField("compliment_note", LongType(), True),
    StructField("compliment_photos", LongType(), True),
    StructField("compliment_plain", LongType(), True),
    StructField("compliment_profile", LongType(), True),
    StructField("compliment_writer", LongType(), True),
    StructField("cool", LongType(), True),
    StructField("elite", StringType(), True),
    StructField("fans", LongType(), True),
    StructField("friends", StringType(), True),
    # "funny" was missing from the schema although the silver-layer checks
    # (negative-value filter, quantiles) reference it.
    StructField("funny", LongType(), True),
    StructField("name", StringType(), True),
    StructField("review_count", LongType(), True),
    StructField("useful", LongType(), True),
    StructField("user_id", StringType()),
    StructField("yelping_since", StringType())
  ])

user_bronze = (
    spark.read
    .schema(user_schema)
    .json("/Volumes/workspace/default/yelp-reviews/user_*.json")
    .withColumn("_ingest_ts", current_timestamp())  # optional for dedup recency
)

# Overwrite the bronze table; mergeSchema lets newly added columns be written
# to an already existing table.
user_bronze.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.saveAsTable("bronzeuser")


### **Silver Layer**

In [0]:
from pyspark.sql import *

# Load the bronze user table as the starting point for the silver-layer
# cleaning, report how many rows it holds, and render it for inspection.
src = spark.table("bronzeuser")
bronze_row_count = src.count()
print("Row count:", bronze_row_count)
display(src)

#### Data Exploration

In [0]:
# Data schema: print the column names, types and nullability of the bronze
# user DataFrame before any cleaning is applied.
src.printSchema()

In [0]:
# Parse the string column "yelping_since" into a proper timestamp column.
# Values that do not match the pattern become null (checked in a later cell).
yelping_since_format = "yyyy-MM-dd HH:mm:ss"
yelping_since_parsed = to_timestamp(col("yelping_since"), yelping_since_format)
src = src.withColumn("yelping_since_ts", yelping_since_parsed)

In [0]:
# Normalising arrays
#
# "friends" and "elite" arrive as comma-separated strings (or "None" / "" /
# null when absent). Both are converted to cleaned string arrays with the same
# rules, and the array sizes are stored as friend_count / elite_count.
from pyspark.sql.functions import when, split, trim, size, expr
# NOTE: importing `filter` shadows Python's built-in filter() in this notebook.
from pyspark.sql.functions import transform, filter


def _csv_to_clean_array(column_name):
    """Return a Column expression turning a comma-separated string into an array.

    Null, "None" and "" become an empty array. Otherwise the string is split on
    commas (with optional following whitespace), each element is trimmed, and
    empty or null elements are removed.

    Parameters
    ----------
    column_name : str
        Name of the string column in ``src`` to convert.

    Returns
    -------
    pyspark.sql.Column
        Array-of-strings expression; it is never null.
    """
    raw = col(column_name)
    is_missing = raw.isNull() | (raw == "None") | (raw == "")
    parts = when(is_missing, expr("array()")).otherwise(split(raw, r",\s*"))
    trimmed = transform(parts, lambda x: trim(x))
    return filter(trimmed, lambda x: (x != "") & (x.isNotNull()))


# friends -> friends_arr + friend_count
src = src.withColumn("friends_arr", _csv_to_clean_array("friends"))
src = src.withColumn("friend_count", size("friends_arr"))

# elite -> elite_arr + elite_count (counted once, after cleaning)
src = src.withColumn("elite_arr", _csv_to_clean_array("elite"))
src = src.withColumn("elite_count", size(col("elite_arr")))


In [0]:
 # Check for nulls in the parsed timestamp column
from pyspark.sql.functions import col, sum

# Flag each row with 1 when yelping_since_ts is null (0 otherwise) and add the
# flags up, giving the number of rows whose timestamp could not be parsed.
timestamp_is_null = col("yelping_since_ts").isNull().cast("int")
null_count = src.select(sum(timestamp_is_null).alias("null_timestamps"))

null_count.show()

In [0]:
# Render the DataFrame with the newly derived columns for a visual check.
display(src)

In [0]:
# Drop the raw string columns now that their cleaned array versions
# (friends_arr, elite_arr) exist.
raw_string_columns = ("friends", "elite")
src = src.drop(*raw_string_columns)

In [0]:
from pyspark.sql.functions import col, count, when

# Count rows and duplicates
total_rows = src.count()
distinct_users = src.select("user_id").distinct().count()
duplicates = total_rows - distinct_users

# Column name -> Spark type string, built once instead of once per column.
column_types = dict(src.dtypes)


def _is_missing(column_name):
    """Return a boolean Column that is true where the value counts as missing.

    A value is missing when it is null; for string columns an empty string is
    treated as missing as well.
    """
    missing = col(column_name).isNull()
    if column_types[column_name] == "string":
        missing = missing | (col(column_name) == "")
    return missing


# Null / empty counts per column, computed in ONE aggregation pass over the
# data rather than one Spark job per column. count() only counts non-null
# values, and when() without otherwise() yields null for non-matching rows.
null_counts = (
    src.select([count(when(_is_missing(c), 1)).alias(c) for c in src.columns])
    .first()
    .asDict()
)

print(f"Total rows: {total_rows}")
print(f"Distinct user_id: {distinct_users}")
print(f"Duplicates: {duplicates}")

In [0]:
# Show the current column list, then the null / empty count of every column
# for which a count was computed in the previous cell.
print(src.columns)
print("Null / empty counts :")
reported_columns = [name for name in src.columns if name in null_counts]
for name in reported_columns:
    print(f"  {name}: {null_counts[name]}")

# Keep a single row per user_id. NOTE(review): dropDuplicates does not promise
# which of the duplicate rows survives - confirm that an arbitrary pick is OK.
src = src.dropDuplicates(["user_id"])

In [0]:
# Sanity checks on value ranges:
#   * counter columns must never be negative
#   * average_stars must lie within [0, 5]
# For each check we keep the TRUE number of offending rows plus a 5-row sample.
from pyspark.sql.functions import col, lit

counter_columns = ["review_count", "fans", "useful", "funny", "cool"]
checked_columns = [c for c in counter_columns if c in src.columns]
skipped_columns = [c for c in counter_columns if c not in src.columns]
if skipped_columns:
    # Be explicit instead of failing with an unresolved-column error.
    print(f"Columns not present in src, skipped: {skipped_columns}")

# OR together "column < 0" for every counter column that exists.
any_negative = lit(False)
for name in checked_columns:
    any_negative = any_negative | (col(name) < 0)

negative_rows = src.filter(any_negative)
# Count BEFORE limiting; counting the limited sample would cap the result at 5.
neg_count = negative_rows.count()
negatives = negative_rows.limit(5)

stars_out_of_range = (col("average_stars") < 0) | (col("average_stars") > 5)
invalid_star_rows = src.filter(stars_out_of_range)
invalid_stars_count = invalid_star_rows.count()
invalid_stars = invalid_star_rows.limit(5)

print(f"Rows with a negative counter: {neg_count}")
print(f"Rows with average_stars outside [0, 5]: {invalid_stars_count}")

In [0]:
# Cell: outlier detection (percentiles) for numeric columns
#
# All quantiles are computed with a single multi-column approxQuantile call
# (one pass over the data) instead of one Spark job per column.
numeric_cols = ['average_stars', 'compliment_cool', 'compliment_cute', 'compliment_funny', 'compliment_hot', 'compliment_list', 'compliment_more', 'compliment_note', 'compliment_photos', 'compliment_plain', 'compliment_profile', 'compliment_writer', 'cool', 'fans', 'funny', 'review_count', 'useful', 'friend_count', 'elite_count']
quantile_probs = [0.5, 0.9, 0.99, 0.999]
relative_error = 0.001

# Only ask Spark for columns that exist; a single missing name would otherwise
# fail the whole multi-column call.
available_cols = [c for c in numeric_cols if c in src.columns]

# Start with a placeholder for every requested column (keeps the original
# ordering), then overwrite the entries that could be computed.
quantiles = {c: "column not found in src" for c in numeric_cols}
if available_cols:
    # With a list of columns, approxQuantile returns one list per column,
    # in the same order as the input list.
    computed = src.approxQuantile(available_cols, quantile_probs, relative_error)
    quantiles.update(zip(available_cols, computed))

print("Quantiles (median, 90, 99, 99.9):")
for k, v in quantiles.items():
    print(f"  {k}: {v}")

In [0]:
# Silver: persist the cleaned user DataFrame as the Delta table "silveruser",
# overwriting existing data and allowing schema merging on write.
silver_writer = (
    src.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
silver_writer.saveAsTable("silveruser")