In [0]:
# Read the raw Delta table stored on DBFS into a Spark DataFrame.
raw_table_path = "/FileStore/delta/superstore_raw"
df = spark.read.format("delta").load(raw_table_path)

# Print the first five rows as a quick sanity check that the load worked.
df.show(5)


+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+
|Row_ID|      Order_ID|Order_Date| Ship_Date|     Ship_Mode|Customer_ID|  Customer_Name|  Segment|      Country|           City|     State|Postal_Code|Region|     Product_ID|       Category|Sub-Category|        Product_Name|   Sales|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+
|     1|CA-2017-152156|2017-11-08|2017-11-11|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420| South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset Col...|  261.96|
|     2|CA-2017-152156|2017-11-08|2017-11-11|  Second Class|   C

In [0]:
# Count missing values per column and print the result as a single-row table.
# "Missing" means NULL for every column, plus NaN for the columns in
# `numeric_cols` below.

from pyspark.sql.functions import col, isnan, count, when

# Load the Delta file from DBFS
df = spark.read.format("delta").load("/FileStore/delta/superstore_raw")

# Display initial rows to confirm correct loading
df.show(5)

# Split columns by their Spark dtype string. Only the dtypes listed here get
# the extra NaN check; every other dtype (including other numeric ones such as
# bigint or decimal) lands in `non_numeric_cols` and is checked for NULL only.
numeric_cols = [c for c, t in df.dtypes if t in ('double', 'float', 'int')]
non_numeric_cols = [c for c, t in df.dtypes if t not in ('double', 'float', 'int')]

# Missing-value counters for the NaN-checked columns (NaN or NULL).
# `when` yields a non-null value only for matching rows and `count` ignores
# nulls, so each aggregate is the number of matching rows.
numeric_null_counts = [
    count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c)
    for c in numeric_cols
]

# Missing-value counters for the remaining columns (NULL only).
non_numeric_null_counts = [
    count(when(col(c).isNull(), c)).alias(c)
    for c in non_numeric_cols
]

# Evaluate all counters in one pass and display them.
# Note: NaN-checked columns are listed first, so the output column order can
# differ from the table's schema order.
df.select(numeric_null_counts + non_numeric_null_counts).show()



+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+
|Row_ID|      Order_ID|Order_Date| Ship_Date|     Ship_Mode|Customer_ID|  Customer_Name|  Segment|      Country|           City|     State|Postal_Code|Region|     Product_ID|       Category|Sub-Category|        Product_Name|   Sales|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+
|     1|CA-2017-152156|2017-11-08|2017-11-11|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420| South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset Col...|  261.96|
|     2|CA-2017-152156|2017-11-08|2017-11-11|  Second Class|   C

In [0]:
# Build the cleaned DataFrame from `df` as it stands at this point:
#   1. replace missing 'Sales' values with 0 (adjust the fill value to your data),
#   2. discard rows missing either identifier ('Customer_ID' or 'Product_ID').
# NOTE(review): this runs before the duplicate-removal and date-conversion
# cells further down, and the same clean-and-save steps are repeated after them.
df_cleaned = (
    df
    .fillna({'Sales': 0})
    .dropna(subset=['Customer_ID', 'Product_ID'])
)

# Persist the result as a Delta table, replacing whatever is already at the path.
(
    df_cleaned.write
    .format("delta")
    .mode("overwrite")
    .save("/FileStore/delta/superstore_cleaned")
)


In [0]:
# Remove rows that are exact duplicates across all columns.
df = df.distinct()


In [0]:
from pyspark.sql.functions import col, to_date

# Convert 'Order_Date' and 'Ship_Date' to DateType.
# The values shown by df.show() are in ISO form (e.g. 2017-11-08), so no
# explicit pattern is passed: without a format, to_date follows Spark's cast
# rules to DateType, which accept 'yyyy-MM-dd' strings. Parsing such strings
# with 'MM/dd/yyyy' does not match and would null out the column.
# NOTE(review): assumes the stored values really are ISO strings or already
# dates — confirm against df.dtypes.
df = df.withColumn('Order_Date', to_date(col('Order_Date')))
df = df.withColumn('Ship_Date', to_date(col('Ship_Date')))


In [0]:
# Rebuild the cleaned DataFrame from the current `df`:
#   1. replace missing 'Sales' values with 0,
#   2. discard rows missing either identifier ('Customer_ID' or 'Product_ID').
df_cleaned = (
    df
    .fillna({'Sales': 0})
    .dropna(subset=['Customer_ID', 'Product_ID'])
)

# Persist the result as a Delta table, replacing whatever is already at the path.
# NOTE(review): if a column's type now differs from the table already stored at
# this path, a Delta overwrite may also need .option("overwriteSchema", "true")
# — confirm before relying on this cell.
(
    df_cleaned.write
    .format("delta")
    .mode("overwrite")
    .save("/FileStore/delta/superstore_cleaned")
)
