In [0]:
%run ../utils

In [0]:
dbutils.widgets.text("adhoc_date", "")


In [0]:
adhoc_date = dbutils.widgets.get("adhoc_date")
# display(adhoc_date)

In [0]:
date = genrate_yesterdays_date(adhoc_date)
print(f"current job is running for the date: {date}")

# create dataframe on input dataset of customer profile
df_profile = spark.read.csv(f"/mnt/bronze-layer/{date}/customer_profile.csv", header=True)
print(df_profile.show())
print(df_profile.printSchema())


In [0]:
# Defined window specification to get unique record based on customer_id and signup_date
window_spec = Window.partitionBy("customer_id", "signup_date").orderBy("signup_date")

# Apply row_number function above window specification
df_profile = df_profile.withColumn("row_no", row_number().over(window_spec))

# Storing bad record from remove duplicate transformation
duplicate_bad_df = df_profile.filter("row_no != 1").drop("row_no")

df_profile = df_profile.filter("row_no = 1").drop("row_no")

# Standardize category and email values.
# While validating email, consider email domain ends with “.com” and “.net”.
email_bad_df = df_profile.filter((~col("email").endswith(".com")) & (~col("email").endswith(".net")))

df_profile = df_profile.filter((col("email").endswith(".com")) | (col("email").endswith(".net")))

print(df_profile.show())


In [0]:

df_profile = df_profile.withColumn("signup_date", to_date("signup_date"))
df_profile = df_profile.withColumn("signup_year", year(col("signup_date"))) \
    .withColumn("signup_month", month(col("signup_date"))) \
    .withColumn("signup_day", dayofmonth(col("signup_date")))
print(df_profile.show())

In [0]:

# Write data to silver layer
df_profile.coalesce(1).write.mode("overwrite").parquet(f"/mnt/silver-layer/{date}/customer_profile")

# Write bad records to silver layer
duplicate_bad_df.union(email_bad_df).write.mode("overwrite").csv(f"/mnt/silver-layer/bad_record/{date}/customer_profile", header=True)

# List files in the bronze layer for the given date
dbutils.fs.ls(f"/mnt/bronze-layer/{date}/customer_profile.csv")
