In [0]:
from pyspark.sql import functions as F

# Load bronze table
bronze_df = spark.table("etl_catalog.etl_db.remoteok_bronze")

# Transformations
silver_df = bronze_df \
    .withColumn("date", F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ssXXX")) \
    .withColumn("salary_min", F.col("salary_min").cast("double")) \
    .withColumn("salary_max", F.col("salary_max").cast("double")) \
    .withColumn("tags", F.concat_ws(", ", "tags")) \
    .withColumn("company", F.when(F.col("company").isNull() | (F.col("company") == ""), "Unknown")
                             .otherwise(F.col("company"))) \
    .withColumn("position", F.when(F.col("position").isNull() | (F.col("position") == ""), "Unknown")
                              .otherwise(F.col("position"))) \
    .withColumn("location", F.when(F.col("location").isNull() | (F.col("location") == ""), "Remote")
                              .otherwise(F.col("location"))) \
    .withColumn("job_age_days", F.datediff(F.current_date(), F.col("date"))) \
    .withColumn("salary_range", F.col("salary_max") - F.col("salary_min")) \
    .drop("logo", "apply_url", "epoch")

# Reorder columns so id is first
cols = ["id"] + [c for c in silver_df.columns if c != "id"]
silver_df = silver_df.select(cols)

# Save to silver table
silver_df.write.format("delta").mode("overwrite").saveAsTable("etl_catalog.etl_db.remoteok_silver")

print("Silver table created: etl_db.remoteok_silver")


In [0]:
%sql
Select * from etl_catalog.etl_db.remoteok_silver order by id limit 5

In [0]:
%sql
describe etl_catalog.etl_db.remoteok_silver