Read from databricks_poc.bronze.users

Mask Name

Partially mask email so only the domain is visible

Group ages into ranges

Write to databricks_poc.silver.users

In [0]:
from pyspark.sql.functions import regexp_replace, udf, col
from pyspark.sql.types import StringType

# Helper functions

def mask_email(email):
    """Mask the local part of an email address, leaving only the domain visible.

    Args:
        email: The address to mask, or None.

    Returns:
        None when ``email`` is None (so null inputs stay null),
        ``"******@<domain>"`` when the value contains an "@", and a bare
        ``"******"`` when it does not, so a malformed value is never passed
        through unmasked.
    """
    if email is None:
        return None

    # Split on the LAST "@": everything before it is the local part, which
    # may itself contain "@" and must not leak into the output.
    _local, separator, domain = email.rpartition("@")
    if not separator:
        # No "@" at all: there is no domain to keep, so mask the whole value.
        return "******"
    return "******@" + domain

# Expose mask_email to Spark as a column-level UDF that yields strings.
mask_email_udf = udf(mask_email, returnType=StringType())


def group_age(age):
    """Map a numeric age to a coarse age-range label.

    Args:
        age: The age as a number, or None.

    Returns:
        One of "under 18", "18-29", "30-49", "50-64", "65+", or "unknown"
        when the age is None or cannot be ordered against the bucket
        boundaries (e.g. NaN).
    """
    # Missing ages would otherwise raise TypeError on the first comparison.
    if age is None:
        return "unknown"

    # Guard clauses in ascending order; each upper bound is exclusive.
    if age < 18:
        return "under 18"
    if age < 30:
        return "18-29"
    if age < 50:
        return "30-49"
    if age < 65:
        return "50-64"
    if age >= 65:
        return "65+"
    # Reached only for values where every comparison is False (e.g. NaN).
    return "unknown"

# Expose group_age to Spark as a column-level UDF that yields strings.
group_age_udf = udf(group_age, returnType=StringType())

In [0]:
# Bronze to Silver transformations
#
# Reads the bronze users table, masks the name and email columns, replaces
# the raw age with an age-group label, and merges the result into
# databricks_poc.silver.users keyed on user_id.

# Set the session catalog/schema so the unqualified "users" below resolves
# to databricks_poc.bronze.users.
spark.sql("USE CATALOG databricks_poc")
spark.sql("USE SCHEMA bronze")

user_bronze_df = spark.sql("SELECT * FROM users")

users_transform_df = (
    user_bronze_df
    # (?s) makes "." match line terminators as well, so a name containing an
    # embedded newline is still replaced in full instead of slipping through
    # unmasked (without it "^.*$" does not match such values at all).
    .withColumn("name", regexp_replace("name", "(?s)^.*$", "******"))
    .withColumn("email", mask_email_udf(col("email")))
    # Cast first so the UDF always compares integers (or None).
    .withColumn("age_group", group_age_udf(col("age").cast("int")))
    # The raw age is not carried into silver; only the bucket is.
    .drop("age")
)

# Register the transformed rows under a name the MERGE statement can reference.
users_transform_df.createOrReplaceTempView("users_silver_updates_temp_vw")

# Switch to the silver schema so the unqualified CREATE TABLE targets
# databricks_poc.silver.users.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
spark.sql("USE SCHEMA silver")
spark.sql("""
        CREATE TABLE IF NOT EXISTS users
        (user_id STRING, name STRING, email STRING, age_group STRING, gender STRING, signup_date STRING, country STRING, _rescued_data STRING)
          """
          )

# Upsert on user_id: existing users are overwritten, new users are inserted.
# NOTE(review): "UPDATE SET *" / "INSERT *" presumably rely on the view's
# columns lining up with the table's columns by name, and MERGE is expected to
# fail if several source rows match one target row - confirm that bronze
# holds at most one row per user_id.
spark.sql("""
        MERGE INTO  databricks_poc.silver.users u
        USING users_silver_updates_temp_vw t
        ON u.user_id = t.user_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """)


In [0]:
%sql

-- Enrich every transaction with the (masked) attributes of its user.
-- LEFT JOIN keeps transactions that have no matching silver user; their
-- user attribute columns are NULL.
CREATE OR REPLACE TEMP VIEW transactions_enriched_temp_vw AS
SELECT
  -- user attributes
  t.user_id,
  u.name,
  u.email,
  u.age_group,
  u.gender,
  u.signup_date,
  u.country,
  u._rescued_data,
  -- transaction attributes
  t.transaction_id,
  t.transaction_date,
  t.amount,
  t.category,
  t.merchant,
  t.payment_method,
  t.credit_card_number
FROM databricks_poc_bigquery_catalog.databricks_poc.transactions AS t
LEFT JOIN databricks_poc.silver.users AS u
  ON u.user_id = t.user_id;

-- Total amount for every combination of the eight dimensions below.
-- The GROUP BY ordinals 1-8 refer to the first eight select-list columns
-- (everything except total_amount); a column rolled up by CUBE is NULL in
-- the corresponding subtotal rows.
CREATE OR REPLACE TABLE databricks_poc.silver.cube_user_transactions AS
SELECT
  user_id,
  age_group,
  gender,
  country,
  date_format(transaction_date, 'yyyy-MM') AS year_month,
  merchant,
  category,
  payment_method,
  sum(amount) AS total_amount
FROM transactions_enriched_temp_vw
GROUP BY CUBE (1, 2, 3, 4, 5, 6, 7, 8);
