# source_crm

## crm_cust_info

### Reading From The Bronze Delta Table

In [0]:
# Load the bronze CRM customer table as the working DataFrame.
bronze_table = "workspace.bronze.crm_cust_info"
df = spark.table(bronze_table)


### Data Transformation on Bronze Delta Table

Trimming

In [0]:


import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col


# Collect the string-typed column names once, then strip leading/trailing
# whitespace from each of them in place.
string_columns = [
    f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))
     



Normalization

In [0]:

# Expand the single-letter marital-status and gender codes (matched
# case-insensitively) into readable labels; anything else becomes "n/a".
marital_code = F.upper(F.col("cst_marital_status"))
gender_code = F.upper(F.col("cst_gndr"))

marital_label = (
    F.when(marital_code == "S", "Single")
    .when(marital_code == "M", "Married")
    .otherwise("n/a")
)
gender_label = (
    F.when(gender_code == "F", "Female")
    .when(gender_code == "M", "Male")
    .otherwise("n/a")
)

df = df.withColumn("cst_marital_status", marital_label)
df = df.withColumn("cst_gndr", gender_label)
     


Remove Records with Missing Customer ID

In [0]:


# Keep only rows that carry a customer id.
has_customer_id = col("cst_id").isNotNull()
df = df.filter(has_customer_id)
     


Renaming Columns

In [0]:


# Source column name -> silver-layer column name.
RENAME_MAP = {
    "cst_id": "customer_id",
    "cst_key": "customer_number",
    "cst_firstname": "first_name",
    "cst_lastname": "last_name",
    "cst_marital_status": "marital_status",
    "cst_gndr": "gender",
    "cst_create_date": "created_date",
}
# Apply the renames one column at a time, in mapping order.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])
     


Sanity checks of dataframe

In [0]:
# Show up to ten rows of the transformed frame for a visual check.
preview_rows = df.limit(10)
preview_rows.display()

### Write Into Silver Delta Table

In [0]:
# Replace the silver customer table with the transformed frame.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_cust_info")
)
 


## crm_prd_info

### Read From Bronze Delta Table

In [0]:
# Load the bronze CRM product table as the working DataFrame.
bronze_table = "workspace.bronze.crm_prd_info"
df = spark.table(bronze_table)

### Data Transformation on Bronze Delta Table

Trimming

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col


# Collect the string-typed column names once, then strip leading/trailing
# whitespace from each of them in place.
string_columns = [
    f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

Product Key Parsing

In [0]:

# Split the raw product key into two columns:
#   * cat_id  - characters 1-5 of the key, with "-" replaced by "_";
#   * prd_key - everything from character 7 to the end of the key.
# cat_id must be derived first, because the second step overwrites prd_key.
raw_prd_key = col("prd_key")
df = df.withColumn(
    "cat_id",
    F.regexp_replace(raw_prd_key.substr(1, 5), "-", "_"),
)
# Column.substr accepts Column arguments for start and length. F.substring
# only accepts a Column length on newer PySpark releases and fails on older
# ones, so substr keeps this cell portable. Passing the full string length as
# the count simply means "through the end of the string".
df = df.withColumn(
    "prd_key",
    raw_prd_key.substr(F.lit(7), F.length(raw_prd_key)),
)

Cost Cleanup

In [0]:
# Treat a missing product cost as zero.
cost_or_zero = F.coalesce(col("prd_cost"), F.lit(0))
df = df.withColumn("prd_cost", cost_or_zero)

Product Line Normalization

In [0]:

# Map the one-letter product line code (matched case-insensitively) to its
# label; codes outside the list, including NULL, become "n/a".
line_code = F.upper(col("prd_line"))
line_label = F.when(line_code == "M", "Mountain")
for code, label in (("R", "Road"), ("S", "Other Sales"), ("T", "Touring")):
    line_label = line_label.when(line_code == code, label)

df = df.withColumn("prd_line", line_label.otherwise("n/a"))
     


Date Casting

In [0]:
from pyspark.sql.types import DateType

# Store the product start date as a DATE column.
start_date = col("prd_start_dt").cast(DateType())
df = df.withColumn("prd_start_dt", start_date)

Renaming Columns

In [0]:

# Source column name -> silver-layer column name.
RENAME_MAP = {
    "prd_id": "product_id",
    "cat_id": "category_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "start_date",
    "prd_end_dt": "end_date",
}
# Apply the renames one column at a time, in mapping order.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])
     

Sanity checks of dataframe

In [0]:
# Show up to ten rows of the transformed frame for a visual check.
preview_rows = df.limit(10)
preview_rows.display()
     

### Writing Silver Table

In [0]:


# Replace the silver product table with the transformed frame.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_products")
)
     


##crm_sales_details

### Read From Bronze Delta table

In [0]:
# Load the bronze CRM sales-details table as the working DataFrame.
bronze_table = "workspace.bronze.crm_sales_details"
df = spark.table(bronze_table)

### Data Transformation On Bronze Delta Table

Date Parsing

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, length

# Parse the three sales date columns from yyyyMMdd values into dates.
# A value equal to 0, or whose length is not 8, is replaced with NULL
# instead of being parsed.
for date_column in ("sls_order_dt", "sls_ship_dt", "sls_due_dt"):
    raw_value = col(date_column)
    is_unusable = (raw_value == 0) | (length(raw_value) != 8)
    parsed_date = F.to_date(raw_value.cast("string"), "yyyyMMdd")
    df = df.withColumn(
        date_column,
        F.when(is_unusable, None).otherwise(parsed_date),
    )

Sales and Price Corrections

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, length
# Repair unusable prices: when sls_price is NULL or not positive, derive it
# as sls_sales / sls_quantity, or NULL when the quantity is zero.
# NOTE(review): the section heading also mentions sales corrections, but only
# sls_price is adjusted here - confirm whether sls_sales needs fixing too.
current_price = col("sls_price")
price_is_unusable = (current_price.isNull()) | (current_price <= 0)
derived_price = F.when(
    col("sls_quantity") != 0,
    col("sls_sales") / col("sls_quantity"),
).otherwise(None)

df = df.withColumn(
    "sls_price",
    F.when(price_is_unusable, derived_price).otherwise(current_price),
)

     


Renaming Columns

In [0]:

# Source column name -> silver-layer column name.
RENAME_MAP = {
    "sls_ord_num": "order_number",
    "sls_prd_key": "product_number",
    "sls_cust_id": "customer_id",
    "sls_order_dt": "order_date",
    "sls_ship_dt": "ship_date",
    "sls_due_dt": "due_date",
    "sls_sales": "sales_amount",
    "sls_quantity": "quantity",
    "sls_price": "price",
}
# Apply the renames one column at a time, in mapping order.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])
     


Sanity checks of dataframe

In [0]:


# Show up to ten rows of the transformed frame for a visual check.
preview_rows = df.limit(10)
preview_rows.display()
     


### Writing Silver Table

In [0]:


# Replace the silver sales table with the transformed frame.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_sales")
)
     


# source_erp

## erp_cust_az12

### Read Data From Bronze Delta Table

In [0]:
# Load the bronze ERP customer table as the working DataFrame.
bronze_table = "workspace.bronze.erp_cust_az12"
df = spark.table(bronze_table)

### Silver Transformations

Trimming

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

# Collect the string-typed column names once, then strip leading/trailing
# whitespace from each of them in place.
string_columns = [
    f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

Customer ID Cleanup

In [0]:

# Drop the leading "NAS" prefix from customer ids that carry it. Ids without
# the prefix, and NULL ids, pass through unchanged.
raw_cid = col("cid")
df = df.withColumn(
    "cid",
    F.when(
        raw_cid.startswith("NAS"),
        # Column.substr accepts Column arguments for start and length.
        # F.substring only accepts a Column length on newer PySpark releases
        # and fails on older ones, so substr keeps this cell portable.
        # Start at character 4 and take up to the full length, i.e. the rest
        # of the string after the prefix.
        raw_cid.substr(F.lit(4), F.length(raw_cid)),
    ).otherwise(raw_cid),
)

Birthdate Validation

In [0]:
# A birthdate later than today is blanked out; other values are kept.
birth_date = col("bdate")
is_in_future = birth_date > F.current_date()
df = df.withColumn(
    "bdate",
    F.when(is_in_future, None).otherwise(birth_date),
)
    

Gender Normalization

In [0]:

# Collapse the gender spellings (matched case-insensitively) into
# "Female" / "Male"; any other value, including NULL, becomes "n/a".
gender_upper = F.upper(col("gen"))
gender_label = (
    F.when(gender_upper.isin("F", "FEMALE"), "Female")
    .when(gender_upper.isin("M", "MALE"), "Male")
    .otherwise("n/a")
)
df = df.withColumn("gen", gender_label)

Renaming Columns

In [0]:

# Source column name -> silver-layer column name.
RENAME_MAP = {
    "cid": "customer_number",
    "bdate": "birth_date",
    "gen": "gender",
}
# Apply the renames one column at a time, in mapping order.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])

Sanity checks of dataframe

In [0]:


# Show up to ten rows of the transformed frame for a visual check.
preview_rows = df.limit(10)
preview_rows.display()
     


### Writing Silver Table

In [0]:

# Replace the silver ERP customer table with the transformed frame.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.erp_customers")
)

## erp_loc_a101

### Read Data From Bronze Delta table

In [0]:

# Load the bronze ERP location table as the working DataFrame.
bronze_table = "workspace.bronze.erp_loc_a101"
df = spark.table(bronze_table)

### Silver Transformations

Trimming

In [0]:

# Collect the string-typed column names once, then strip leading/trailing
# whitespace from each of them in place.
string_columns = [
    f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

Customer ID Cleanup

In [0]:
# Remove every hyphen from the customer id.
cid_without_hyphens = F.regexp_replace(col("cid"), "-", "")
df = df.withColumn("cid", cid_without_hyphens)

Country Normalization


In [0]:
# Expand known country codes to full names and label blank/NULL values
# as "n/a"; every other value is kept as-is.
country = col("cntry")
is_blank = (country == "") | country.isNull()
country_label = (
    F.when(country == "DE", "Germany")
    .when(country.isin("US", "USA"), "United States")
    .when(is_blank, "n/a")
    .otherwise(country)
)
df = df.withColumn("cntry", country_label)

Renaming Columns

In [0]:


# Source column name -> silver-layer column name.
RENAME_MAP = {
    "cid": "customer_number",
    "cntry": "country",
}
# Apply the renames one column at a time, in mapping order.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])
     


Sanity checks of dataframe

In [0]:

# Show up to ten rows of the transformed frame for a visual check.
preview_rows = df.limit(10)
preview_rows.display()

### Writing Silver Table

In [0]:
# Replace the silver customer-location table with the transformed frame.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.erp_customer_location")
)

## erp_px_cat_g1v2

### Read Data From Bronze Delta table

In [0]:

# Load the bronze ERP product-category table as the working DataFrame.
bronze_table = "workspace.bronze.erp_px_cat_g1v2"
df = spark.table(bronze_table)

### Silver Transformations

Trimming

In [0]:
# Collect the string-typed column names once, then strip leading/trailing
# whitespace from each of them in place.
string_columns = [
    f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

Normalize Maintenance Flag to Boolean

In [0]:


# Turn the "Yes"/"No" maintenance text (matched case-insensitively) into a
# boolean; any other value becomes NULL.
maintenance_upper = F.upper(col("maintenance"))
maintenance_flag = (
    F.when(maintenance_upper == "YES", F.lit(True))
    .when(maintenance_upper == "NO", F.lit(False))
    .otherwise(None)
)
df = df.withColumn("maintenance", maintenance_flag)
  


Renaming Columns

In [0]:

# Source column name -> silver-layer column name.
RENAME_MAP = {
    "id": "category_id",
    "cat": "category",
    "subcat": "subcategory",
    "maintenance": "maintenance_flag",
}
# Apply the renames one column at a time, in mapping order.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])

Sanity checks of dataframe

In [0]:

# Show up to ten rows of the transformed frame for a visual check.
preview_rows = df.limit(10)
preview_rows.display()

### Writing Silver Table


In [0]:
# Replace the silver product-category table with the transformed frame.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.erp_product_category")
)