In [0]:
# Load the bronze-layer CRM product table; every later cell transforms `df`.
bronze_reader = spark.read.format("delta")
df = bronze_reader.table("workspace.bronze.crm_prd_info")

In [0]:
# Preview the DataFrame as loaded, before any transformation is applied.
display(df)

In [0]:
from pyspark.sql.functions import substring

# Derive `cut_id` from the first five characters of `prd_key`.
df = df.withColumn("cut_id", substring("prd_key", 1, 5))

# Position `cut_id` immediately before `prd_key`.
# `withColumn` appends a brand-new column at the end but replaces an existing
# one in place, so `cut_id` is removed from the list wherever it sits (not only
# when it is last). Otherwise re-running this cell would select it twice and
# produce a duplicate column.
cols = [c for c in df.columns if c != "cut_id"]
prd_key_idx = cols.index("prd_key")  # raises ValueError if `prd_key` is missing
new_cols = cols[:prd_key_idx] + ["cut_id"] + cols[prd_key_idx:]
df = df.select(*new_cols)


In [0]:
from pyspark.sql.functions import col, length, lit

# Keep `prd_key` from its 7th character onward (positions are 1-based), i.e.
# drop the five-character prefix plus the separator that follows it.
# `Column.substr` is used with two Column arguments because older PySpark
# releases only accept plain ints for the `pos`/`len` arguments of
# `functions.substring`, so passing `length(...)` there fails on those versions.
# Using the full string length as `len` simply means "through the end".
df = df.withColumn("prd_key", col("prd_key").substr(lit(7), length("prd_key")))

In [0]:
from pyspark.sql.functions import when

# Default missing product costs to 0; non-null costs pass through unchanged.
prd_cost = df["prd_cost"]
df = df.withColumn(
    "prd_cost",
    when(prd_cost.isNull(), 0).otherwise(prd_cost),
)


In [0]:
from pyspark.sql.functions import upper, trim

# Translate the single-letter product-line code into a readable label.
# The code is trimmed and upper-cased once so matching ignores case and
# surrounding whitespace; anything unmatched (including null) becomes "n/a".
# NOTE(review): "S" maps to "touring" and there is no "T" branch — confirm
# against the source system's code list that this mapping is intended.
prd_line_code = upper(trim(df["prd_line"]))
prd_line_labels = [
    ("R", "road"),
    ("M", "mountain"),
    ("S", "touring"),
    ("O", "other sales"),
]

first_code, first_label = prd_line_labels[0]
prd_line_expr = when(prd_line_code == first_code, first_label)
for code, label in prd_line_labels[1:]:
    prd_line_expr = prd_line_expr.when(prd_line_code == code, label)

df = df.withColumn("prd_line", prd_line_expr.otherwise("n/a"))


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import date_sub, lead

# Rebuild `prd_end_dt` so each product version ends the day before the next
# version of the same `prd_key` starts, giving non-overlapping validity ranges.
# The previous expression took lead("prd_end_dt"), which copied the *next*
# version's end date onto the current row instead of closing the current range.
# The latest version of each key has no following row, so `lead` yields null
# and its end date stays open.
# NOTE(review): `date_sub` returns a date; confirm downstream consumers do not
# rely on `prd_end_dt` keeping its bronze-layer type.
w = Window.partitionBy("prd_key").orderBy("prd_start_dt")
df = df.withColumn("prd_end_dt", date_sub(lead("prd_start_dt").over(w), 1))

In [0]:
# Persist the transformed DataFrame as the silver table, replacing whatever
# the table currently holds.
# NOTE(review): the target name has no catalog qualifier (the read uses
# `workspace.`), so it resolves against the session's current catalog — confirm.
silver_writer = df.write.mode("overwrite")
silver_writer.saveAsTable("silver.crm_prd_info")