In [0]:
%pip install openpyxl --upgrade
# openpyxl is the engine passed to pd.read_excel further down in this notebook.
# NOTE(review): the install is unpinned (--upgrade), so the resolved version can
# differ between runs — consider pinning once a known-good version is confirmed.
# Restart the Python process after the install; presumably so the upgraded
# package is the one imported by the following cells. Variables defined before
# this call do not survive the restart, so keep this cell first.
dbutils.library.restartPython()

In [0]:
import pandas as pd

# Bronze ingest: download the UCI "Online Retail" workbook and land it as-is
# in the Bronze zone. Every column is read as text (dtype=str) so nothing is
# coerced before the raw copy is persisted; typing happens in the Silver step.
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx"
raw_pd = pd.read_excel(url, engine='openpyxl', dtype=str)

# Even with dtype=str, blank cells are represented as NaN (a float) inside the
# otherwise-string columns. Swap them for None so Spark receives a real NULL
# in a string column instead of depending on the Arrow conversion path to
# reconcile a str/float mix during schema inference.
df_pd = raw_pd.where(raw_pd.notna(), None)

# Convert to Spark DataFrame
df_spark = spark.createDataFrame(df_pd)

# Save to Bronze Zone in ADLS. mode("overwrite") makes this a full refresh:
# whatever was previously stored at this path is replaced.
df_spark.write.format("delta").mode("overwrite").save("abfss://bronze@rgdbronzestretail0001.dfs.core.windows.net/online_retail")



In [0]:
from pyspark.sql.functions import col, to_date, expr

# Silver step: read the raw Bronze Delta table, apply types and basic row
# filters, and publish the result as the cleaned Silver Delta table.
bronze_path = "abfss://bronze@rgdbronzestretail0001.dfs.core.windows.net/online_retail"
silver_path = "abfss://silver@rgdbronzestretail0001.dfs.core.windows.net/online_retail_clean"

bronze_df = spark.read.format("delta").load(bronze_path)

silver_df = (
    bronze_df
    # try_cast yields NULL instead of raising when the value is not an integer.
    # NOTE(review): together with the not-null filter below this drops every
    # row whose StockCode is not purely numeric — confirm that is intended.
    .withColumn("StockCode", expr("try_cast(StockCode as int)"))
    .filter(col("InvoiceNo").isNotNull() & col("StockCode").isNotNull())
    # Cast Quantity before comparing so the "> 0" test runs on an integer
    # column rather than relying on implicit string-to-number coercion.
    .withColumn("Quantity", col("Quantity").cast("int"))
    .filter(col("Quantity") > 0)
    # to_date keeps only the calendar date; any time-of-day part is discarded.
    .withColumn("InvoiceDate", to_date(col("InvoiceDate")))
    .withColumn("UnitPrice", col("UnitPrice").cast("float"))
    .withColumn("CustomerID", col("CustomerID").cast("int"))
)

# Replace the Silver table in a single Delta commit. Deleting the directory
# first (dbutils.fs.rm) would leave no table at all if the write then failed,
# and would also throw away the Delta transaction log / table history.
# overwriteSchema lets the overwrite succeed when the column types differ
# from what an earlier run stored at this path.
(
    silver_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_path)
)

print("Silver (cleaned) data saved to ADLS")

In [0]:
# Re-read the cleaned Silver Delta table (silver_path is defined in the
# Silver cleaning cell, which must have run first).
df = spark.read.format("delta").load(silver_path)

# Export a plain-Parquet copy. coalesce(1) collapses the data to a single
# partition before writing, to avoid too many small files and empty files.
silver_parquet_path = "abfss://silver@rgdbronzestretail0001.dfs.core.windows.net/silver_parquet_clean"
single_partition_df = df.coalesce(1)
single_partition_df.write.parquet(silver_parquet_path, mode="overwrite")
