In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, datediff, current_date, lit, current_timestamp

# Reuse the already-active SparkSession if there is one (e.g. in a notebook),
# otherwise start a new one named "Users Transform".
_session_builder = SparkSession.builder.appName("Users Transform")
spark = _session_builder.getOrCreate()

# Unity Catalog catalog and schema that hold the silver output table.
# Change these two values to target a different catalog/schema.
catalog_name = "main_catalog"
schema_name = "silver_users_schema"

# Create the catalog and schema if they don't exist, then make them the
# session defaults. Identifiers are backtick-quoted so that names containing
# characters such as '-' remain valid SQL.
# NOTE(review): CREATE CATALOG may require elevated metastore privileges —
# confirm the job's principal has them, or drop that statement if the catalog
# is provisioned elsewhere.
spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog_name}`")
spark.sql(f"USE CATALOG `{catalog_name}`")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{schema_name}`")
spark.sql(f"USE SCHEMA `{schema_name}`")

# Fully qualified (catalog.schema.table) Unity Catalog names for the bronze
# source and the silver target. Both are derived from `catalog_name` so that
# changing the catalog keeps input and output in the same catalog.
# NOTE(review): assumes bronze always lives in the same catalog as silver —
# if it is pinned to a different catalog, hard-code that catalog here instead.
bronze_schema_name = "bronze_users_schema"
input_table_name = f"{catalog_name}.{bronze_schema_name}.users_raw"
output_table_name = f"{catalog_name}.{schema_name}.users_transformed"

# Load the raw bronze users table into a DataFrame.
df = spark.table(input_table_name)

# --- SILVER TRANSFORMATIONS ---
# Expected columns of the raw DataFrame `df`:
#   user_id, name, email, address, created_at
# `address` is not selected into the silver table.

transformed_df = (
    df
    # 1. Parse `created_at` into registration_ts. Assumes `created_at` holds
    #    ISO 8601 strings, which to_timestamp parses without an explicit format.
    .withColumn("registration_ts", to_timestamp("created_at"))
    # 2. Membership tenure in whole days. Computed from the parsed
    #    registration_ts (not the raw string) so both derived columns share
    #    the same parsing and time-zone interpretation.
    #    NOTE: current_date() is evaluated when the query runs, so this is a
    #    snapshot as of this load and goes stale until the next run.
    .withColumn(
        "days_as_member", datediff(current_date(), col("registration_ts"))
    )
    # 3. Metadata column recording when this load ran.
    .withColumn("_load_timestamp", current_timestamp())
    # 4. Select and order the columns of the silver table.
    .select(
        col("user_id"),
        col("name").alias("full_name"),  # rename 'name' -> 'full_name'
        col("email"),
        col("days_as_member"),
        col("registration_ts"),
        col("_load_timestamp"),
    )
)

# Write the transformed data to the Unity Catalog silver table as a full
# refresh. `overwriteSchema` (instead of `mergeSchema`) makes the table's
# schema match `transformed_df` exactly: with `mergeSchema`, an overwrite keeps
# columns from earlier table versions that are no longer produced (as nulls)
# and rejects incompatible column type changes.
(
    transformed_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(output_table_name)
)

print(f"Successfully transformed data and saved to {output_table_name}")

# Read the table back and render it; `display` is presumably the Databricks
# notebook built-in, which renders a limited preview of the rows.
display(spark.read.table(output_table_name))