In [0]:
# Load the ERP location table from the bronze layer into a Spark DataFrame.
df_erp_loc = spark.table("bronze.erp_loc_a101")

In [0]:
# Preview the rows exactly as loaded from bronze.erp_loc_a101.
df_erp_loc.display()

- Check the schema

In [0]:
# Print column names and data types of the loaded bronze table.
df_erp_loc.printSchema()

- Rename Column Names

In [0]:
# Bronze (source) column name -> silver column name, applied in this order.
mapping_column_names = dict(
    CID='customer_id_prefix',
    CNTRY='country',
)

In [0]:
# Helper: rename a single column and hand back the resulting DataFrame.

def rename_column(dataframe, old_column, new_column):
    """Rename ``old_column`` to ``new_column`` in ``dataframe``.

    Args:
        dataframe: the DataFrame whose column should be renamed.
        old_column: current name of the column.
        new_column: name the column should have afterwards.

    Returns:
        The DataFrame returned by ``dataframe.withColumnRenamed``; callers
        must keep this return value to see the rename.
    """
    renamed_frame = dataframe.withColumnRenamed(old_column, new_column)
    return renamed_frame

In [0]:
# Apply every bronze -> silver rename from mapping_column_names, one at a time.

for source_name, target_name in mapping_column_names.items():
    df_erp_loc = rename_column(
        dataframe=df_erp_loc,
        old_column=source_name,
        new_column=target_name,
    )

In [0]:
# Confirm the column list after the renames.
print(df_erp_loc.columns)

In [0]:
# Preview the data with the renamed columns.
df_erp_loc.display()

- Add a new customer_id column containing only the numeric part of the customer_id_prefix column

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Derive customer_id by deleting every character of customer_id_prefix that is
# not a digit 0-9. The result is a string column, so leading zeros are kept.
df_erp_loc = df_erp_loc.withColumn(
    "customer_id",
    regexp_replace(col("customer_id_prefix"), r"[^0-9]", ""),
)

In [0]:
# Preview the new customer_id column next to customer_id_prefix.
df_erp_loc.display()

- Find duplicates in the customer_id column

In [0]:
# List every customer_id that occurs on more than one row (duplicate keys).
(
    df_erp_loc
    .groupBy("customer_id")
    .count()
    .where(col("count") > 1)
    .display()
)

In [0]:
# Preview again; the duplicate check above only displays, it does not modify df_erp_loc.
df_erp_loc.display()

- Check country names

In [0]:
# Row count per distinct country value, to spot codes or spellings that need mapping.
df_erp_loc.groupBy("country").count().display()

- Map country codes (e.g. US, USA, DE) to their full country names

In [0]:
# for United States
# Expand the 'US' / 'USA' codes to 'United States'. The comparison is done on
# the trimmed value because this cell runs before the whitespace-trim step
# further down; an exact match would miss a padded code such as ' US'.
# Values that do not match (including NULL) are passed through unchanged.

from pyspark.sql.functions import col, trim, when

df_erp_loc = df_erp_loc.withColumn(
    'country',
    when(
        trim(col("country")).isin('US', 'USA'), 'United States'
    ).otherwise(col("country"))
)


In [0]:
# Re-check country value counts after the United States mapping.
df_erp_loc.groupBy("country").count().display()

In [0]:
# for Germany
# Expand the 'DE' code to 'Germany'. As with the US mapping, compare on the
# trimmed value because the trim step runs later in this notebook.
# Import locally so this cell does not depend on another cell's imports.
from pyspark.sql.functions import col, trim, when

df_erp_loc = df_erp_loc.withColumn(
    'country',
    when(trim(col("country")) == 'DE', 'Germany').otherwise(col("country"))
)

In [0]:
# Re-check country value counts after the Germany mapping.
df_erp_loc.groupBy("country").count().display()

In [0]:
# Preview the data after the country mappings.
df_erp_loc.display()

- Trim leading and trailing whitespace from string columns

In [0]:
# Remove leading/trailing spaces from every string-typed column.
# A single select() applies trim to all string columns in one projection and
# keeps the original column order; non-string columns pass through untouched.
# The result is reassigned to df_erp_loc so the trimmed values are actually
# carried forward to the rest of the notebook.
from pyspark.sql.functions import trim, col

df_erp_loc = df_erp_loc.select(
    [
        trim(col(name)).alias(name) if dtype == 'string' else col(name)
        for name, dtype in df_erp_loc.dtypes
    ]
)

In [0]:
# Preview the data after the trim step.
df_erp_loc.display()

- Check for Nulls

In [0]:
from pyspark.sql.functions import col, isnan, when, count

# Null count per column: when() yields a non-null literal only for NULL cells,
# and count() skips NULLs, so each aggregate equals that column's NULL count.
df_erp_loc.select(
    [
        count(when(col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in df_erp_loc.columns
    ]
).display()


In [0]:
%sql
-- Inspect the raw bronze rows whose country code (CNTRY) is NULL.
 SELECT *
 FROM bronze.erp_loc_a101
 WHERE CNTRY IS NULL;

In [0]:
# Convert missing country values to "unknown".
# "Missing" covers NULL and also empty or whitespace-only strings: a blank
# string passes isNull() and would otherwise land in the silver table as ''.
# Every other value is kept as-is.
from pyspark.sql.functions import col, trim, when

df_erp_loc = df_erp_loc.withColumn(
  'country',
  when(
    col("country").isNull() | (trim(col("country")) == ""), "unknown"
  ).otherwise(col("country"))
)

In [0]:
from pyspark.sql.functions import col, isnan, when, count

# Re-count NULLs per column after the fill above; country should now report 0.
# when() produces a non-null literal only for NULL cells, and count() ignores NULLs.
df_erp_loc.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in df_erp_loc.columns)
).display()


In [0]:
# Final preview before writing to the silver layer.
df_erp_loc.display()

- Store the result as a silver table

In [0]:

# Persist the cleaned frame as a Delta table in the silver layer, replacing
# any previous contents.
(
    df_erp_loc.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.erp_loc_a101")
)