In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, lit, coalesce

In [0]:
# Reuse the active Spark session if there is one; otherwise build a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [0]:
# Fully qualified name of the raw table that this notebook cleans.
SOURCE_TABLE = "workspace.google_drive.global_superstore"

# Load the raw table as a DataFrame; every later cell transforms `df`.
df = spark.read.table(SOURCE_TABLE)

In [0]:
# df.printSchema()

root
 |-- _line: long (nullable = true)
 |-- _fivetran_synced: timestamp (nullable = true)
 |-- row_id: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discount: double (nullable = true)
 |-- ship_date: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- shipping_cost: double (nullable = true)
 |-- segment: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- profit: double (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- city: string (nullable = true)
 |-- market: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- order_priority: string (nullable = true)
 |-- country: st

Delete records without an Order ID (treated as noise)

In [0]:
# Keep only the rows that carry an order_id; rows where it is null are removed.
df = df.dropna(
    how="any",
    subset=["order_id"],
)

Delete records without a Product ID (not relevant for analysis)

In [0]:
# Keep only the rows that carry a product_id; rows where it is null are removed.
df = df.dropna(
    how="any",
    subset=["product_id"],
)

Delete records without a Customer ID (not relevant for analysis)

In [0]:
# Keep only the rows that carry a customer_id; rows where it is null are removed.
df = df.dropna(
    how="any",
    subset=["customer_id"],
)

Add Profitability column

In [0]:
# df.filter(df['profit'] < 0).count()

12544

In [0]:
# Label each row by the sign of its profit: negative -> "Loss", zero or
# positive -> "Profit".
# `profit` is a nullable column. With a single when(...).otherwise("Profit"),
# a NULL profit makes the comparison NULL and the row falls through to
# "Profit". The second explicit `when` (with no `otherwise`) leaves
# Profitability NULL for those rows instead of mislabelling them.
df = df.withColumn(
    "Profitability",
    when(col("profit") < 0, "Loss").when(col("profit") >= 0, "Profit"),
)

In [0]:
# df.select('Profitability').show(3)

+-------------+
|Profitability|
+-------------+
|       Profit|
|       Profit|
|       Profit|
+-------------+
only showing top 3 rows


Postal code logic: replace missing postal codes with 0

In [0]:
# Replace missing postal codes with 0.
# coalesce returns its first non-null argument, so this yields the existing
# postal_code when present and 0 otherwise. It is the same construct this
# notebook uses to back-fill customer_name and product_name.
df = df.withColumn(
    "postal_code",
    coalesce(col("postal_code"), lit(0)),
)

Customer name and product name logic: fill a missing name from another record with the same ID; if none exists, use "NO NAME"

In [0]:
# Build a customer_id -> customer_name lookup from the rows that have a name,
# keeping a single row per customer_id.
# NOTE(review): if one customer_id appears with several different names, which
# name dropDuplicates keeps is arbitrary — confirm each ID maps to one name.
rows_with_customer_name = df.filter(col("customer_name").isNotNull())
customer_id_name_pairs = rows_with_customer_name.select("customer_id", "customer_name")
lookup_df_customer = customer_id_name_pairs.dropDuplicates(["customer_id"])

In [0]:
# Build a product_id -> product_name lookup from the rows that have a name,
# keeping a single row per product_id.
# NOTE(review): if one product_id appears with several different names, which
# name dropDuplicates keeps is arbitrary — confirm each ID maps to one name.
rows_with_product_name = df.filter(col("product_name").isNotNull())
product_id_name_pairs = rows_with_product_name.select("product_id", "product_name")
lookup_df_product = product_id_name_pairs.dropDuplicates(["product_id"])

In [0]:
# Attach the looked-up customer name to every row. The lookup column is renamed
# to lookup_name so it does not clash with df's own customer_name, and the left
# join keeps every row of df even when the lookup has no entry for its ID.
customer_lookup = lookup_df_customer.withColumnRenamed("customer_name", "lookup_name")
df = df.join(customer_lookup, "customer_id", "left")

In [0]:
# Resolve customer_name in priority order: the row's own value, then the
# looked-up value, then the "NO NAME" placeholder. The helper column
# lookup_name is dropped once it has been consumed.
resolved_customer_name = coalesce(
    col("customer_name"),
    col("lookup_name"),
    lit("NO NAME"),
)
df = df.withColumn("customer_name", resolved_customer_name).drop("lookup_name")

In [0]:
# Attach the looked-up product name to every row. The lookup column is renamed
# to lookup_name so it does not clash with df's own product_name, and the left
# join keeps every row of df even when the lookup has no entry for its ID.
product_lookup = lookup_df_product.withColumnRenamed("product_name", "lookup_name")
df = df.join(product_lookup, "product_id", "left")

In [0]:
# Resolve product_name in priority order: the row's own value, then the
# looked-up value, then the "NO NAME" placeholder. The helper column
# lookup_name is dropped once it has been consumed.
resolved_product_name = coalesce(
    col("product_name"),
    col("lookup_name"),
    lit("NO NAME"),
)
df = df.withColumn("product_name", resolved_product_name).drop("lookup_name")

In [0]:
# df.select("order_priority").distinct().show()

+--------------+
|order_priority|
+--------------+
|        Medium|
|          High|
|      Critical|
|           Low|
+--------------+



Set Priority to Medium if Priority is NULL

In [0]:
# Default a missing order_priority to "Medium".
# coalesce returns its first non-null argument, so existing priorities are kept
# unchanged. It is the same construct this notebook uses to back-fill
# customer_name and product_name.
df = df.withColumn(
    "order_priority",
    coalesce(col("order_priority"), lit("Medium")),
)

Write the cleaned data to a table in the sales schema

In [0]:
# Persist the cleaned rows by appending them to sales.global_superstore.
# NOTE(review): this notebook reads the full source table on every run, so
# re-running it in append mode adds the same rows to the target again —
# confirm append (rather than overwrite) is intended.
df.write.saveAsTable("sales.global_superstore", mode="append")