In [None]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Location of the raw CSV in the raw-zone bucket.
# TODO: replace the placeholder with the real bucket/key. S3 URIs must use the
# "s3://" scheme (two slashes) followed by the bucket name.
RAW_INPUT_PATH = "s3://your-raw-bucket/raw/file.csv"

# Read with a header row; inferSchema makes Spark scan the file to guess
# column types, so the column dtypes depend on the file's contents.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(RAW_INPUT_PATH)
)

print("✅ Raw data loaded successfully.")
df.show(5)


In [None]:
# Profile data quality before cleaning: the total row count, then, per column,
# the number of "missing" cells. A cell counts as missing when it is null, or an
# empty string (string columns only), or NaN (float/double columns only).
# The checks are gated on dtype because Spark's isnan() only accepts
# float/double input: timestamp/date/boolean columns fail analysis, and string
# columns would be silently cast to double.
total_rows = df.count()
print(f"Total Rows: {total_rows}")

missing_counts = []
for name, dtype in df.dtypes:
    is_missing = col(name).isNull()
    if dtype == "string":
        is_missing = is_missing | (col(name) == "")
    elif dtype in ("float", "double"):
        is_missing = is_missing | isnan(col(name))
    # when() without otherwise() yields null when the condition is false, and
    # count() skips nulls, so this counts only rows where is_missing holds.
    missing_counts.append(count(when(is_missing, True)).alias(name))

null_summary = df.select(missing_counts)
null_summary.show(truncate=False)

In [None]:
# Keep only rows that can be attributed to a customer: rows whose CustomerID is
# null are discarded.
df = df.where(col("CustomerID").isNotNull())


In [None]:
# Null product descriptions get an explicit placeholder label instead of null.
df = df.na.fill({"Description": "Unknown Product"})


In [None]:
# Optional step: treat missing Quantity / UnitPrice as 0.
# NOTE(review): this makes "unknown" indistinguishable from a genuine zero;
# confirm that is acceptable for downstream aggregates. A numeric fill value
# only applies to columns that are numeric at this point.
numeric_defaults = {"Quantity": 0, "UnitPrice": 0}
df = df.na.fill(numeric_defaults)


In [None]:
# Re-profile after cleaning: per column, count cells that are null, empty
# strings (string columns only) or NaN (float/double columns only).
# The checks are gated on dtype because isnan() only accepts float/double
# input and fails analysis on e.g. timestamp/date/boolean columns.
missing_after = []
for name, dtype in df.dtypes:
    is_missing = col(name).isNull()
    if dtype == "string":
        is_missing = is_missing | (col(name) == "")
    elif dtype in ("float", "double"):
        is_missing = is_missing | isnan(col(name))
    # count() skips the nulls when() produces for non-matching rows.
    missing_after.append(count(when(is_missing, True)).alias(name))

null_summary_after = df.select(missing_after)
null_summary_after.show(truncate=False)


In [None]:
# Data standardization: force analysis-ready types on the key numeric columns.
# NOTE: with Spark's default (non-ANSI) casting, values that cannot be converted
# become null. This cast runs after the null check above, so any nulls it
# introduces are not reflected in that summary.
target_types = {
    "Quantity": "int",
    "UnitPrice": "double",
    "CustomerID": "int",
}
for column_name, spark_type in target_types.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))


In [None]:
# Standardize text columns: upper-case each value, then strip leading and
# trailing whitespace. trim() does not touch whitespace inside a value.
# (trim / upper / col come from the pyspark.sql.functions import at the top.)
for text_col in ("StockCode", "Description", "Country"):
    df = df.withColumn(text_col, trim(upper(col(text_col))))


In [None]:
# Remove exact duplicate rows (rows equal in every column) and report how many
# were dropped. Each count() is a separate Spark job over the full lineage.
before = df.count()
df = df.distinct()
after = df.count()
print(f"Removed {before - after} duplicate rows.")


In [None]:
# Final sanity check of the cleaned frame: print its schema, then a small
# sample of rows.
df.printSchema()
df.show(n=5)


In [None]:
# Save the cleaned & standardized data to S3 as Parquet.
# TODO: point this at the processed/curated-zone prefix. It must use the
# "s3://" scheme, and it must NOT be the raw-zone bucket root or any prefix
# containing the raw CSV: mode("overwrite") clears existing data under
# output_path before writing. Because Spark evaluates lazily, the source file
# is only read during this write, so it could be deleted before it is read.
output_path = "s3://your-processed-bucket/cleaned/"

df.write.mode("overwrite").parquet(output_path)

print(f"✅ Cleaned and standardized data written to {output_path}")
