In [1]:
# Inspect the S3 mount: one row per entry with its path, name and size.
s3_mount_listing = dbutils.fs.ls("/mnt/s3data")
display(s3_mount_listing)

path,name,size
dbfs:/mnt/s3data/Superstore.csv,Superstore.csv,2287742
dbfs:/mnt/s3data/returns.csv,returns.csv,16019


In [2]:
# Load the Superstore order lines from the S3 mount.
# Reader settings live in named variables so they are easy to adjust.
file_type = "csv"
file_location = "dbfs:/mnt/s3data/Superstore.csv"

# CSV parsing options: first row holds the column names, column types are inferred,
# fields are comma-separated.
first_row_is_header = "true"
infer_schema = "true"
delimiter = ","

df_superstore = (
    spark.read.format(file_type)
    .options(header=first_row_is_header, inferSchema=infer_schema, sep=delimiter)
    .load(file_location)
)

In [3]:
# Load the returns file from the S3 mount.
file_type = "csv"
file_location = "dbfs:/mnt/s3data/returns.csv"

# Same CSV parsing options as the Superstore file.
first_row_is_header = "true"
infer_schema = "true"
delimiter = ","

# dropDuplicates() with no arguments collapses rows that are identical across ALL columns.
df_returns = (
    spark.read.format(file_type)
    .options(header=first_row_is_header, inferSchema=infer_schema, sep=delimiter)
    .load(file_location)
    .dropDuplicates()
)


In [4]:
# Attach return information to every order line with a left join on "Order ID".
# Order lines with no matching return keep nulls in the returns columns.
df_superstore_merge = df_superstore.join(df_returns, "Order ID", how="left")

# A left join must not add rows. If it does, some "Order ID" occurs more than once in
# df_returns (its dropDuplicates() only removes fully identical rows). The fan-out would
# silently double-count Sales/Quantity in the aggregation cell.
_merged_row_count = df_superstore_merge.count()
_source_row_count = df_superstore.count()
assert _merged_row_count == _source_row_count, (
    f"left join with returns changed row count: {_source_row_count} -> {_merged_row_count}; "
    "df_returns has duplicate 'Order ID' values"
)

In [5]:
# Clean column headers (replace spaces and special characters with "_"), then keep only
# US order lines.
import re

from pyspark.sql.types import DateType,IntegerType
from pyspark.sql.functions import *


def _clean_column_name(name):
    """Return `name` with every character other than letters, digits and "_" replaced by "_".

    Examples: "Order ID" -> "Order_ID", "Sub-Category" -> "Sub_Category",
    "Country/Region" -> "Country_Region".
    """
    return re.sub(r"[^0-9A-Za-z_]", "_", name)


cleaned_columns = [_clean_column_name(name) for name in df_superstore_merge.columns]
# Two raw headers must not collapse onto the same cleaned name.
assert len(set(cleaned_columns)) == len(cleaned_columns), (
    f"column name collision after cleaning: {cleaned_columns}"
)

# toDF renames positionally. Unlike col(name), it never interprets "." or other
# characters in the raw header names.
df_superstore_cln = (
    df_superstore_merge
    .toDF(*cleaned_columns)
    .filter(col("Country_Region") == "United States")
)

In [6]:
# Parse order/ship date strings into dates and compute the shipping duration in days.
# "M/d/yyyy" accepts both zero-padded ("01/05/2016") and unpadded ("1/5/2016") month/day
# values. With Spark 3's datetime parser, "MM/dd/yyyy" requires exactly two digits and
# fails on unpadded values.
# NOTE(review): assumes the raw Order_Date/Ship_Date columns are month/day/year strings,
# as the original pattern implied — confirm against the source file.
DATE_FORMAT = "M/d/yyyy"

df_superstore_cln = (
    df_superstore_cln
    .withColumn("OrderDateClean", to_date(col("Order_Date"), DATE_FORMAT))
    .withColumn("ShipDateClean", to_date(col("Ship_Date"), DATE_FORMAT))
    # datediff(end, start): number of days from order date to ship date.
    .withColumn("duration", datediff(col("ShipDateClean"), col("OrderDateClean")))
)

In [7]:
# Aggregate to the State x Category x Sub_Category x Ship_Mode grain:
#   - total Sales and Quantity, average ship duration, and a count of non-null Order_ID
#     values. The dict-style agg names result columns like "sum(Sales)", "avg(duration)".
#   - the number of distinct customers (column "count(DISTINCT Customer_ID)").
grain_columns = ["State", "Category", "Sub_Category", "Ship_Mode"]
sales_aggregations = {
    'Sales': 'sum',
    'Quantity': 'sum',
    'duration': 'avg',
    'Order_ID': 'count',
}

df_superstore_sales = df_superstore_cln.groupBy(*grain_columns).agg(sales_aggregations)
df_uniq_customer = df_superstore_cln.groupBy(*grain_columns).agg(countDistinct("Customer_ID"))

# Joining on a list of column names keeps a single copy of each grain column.
df_superstore_agg = df_superstore_sales.join(df_uniq_customer, grain_columns)

In [8]:
# Prepare the final dataframe: friendly column names, measures rounded to 2 decimals,
# and the distinct-customer count per grain.
# Functions are referenced through F so pyspark's round is never confused with the builtin.
from pyspark.sql import functions as F

df_superstore_final = df_superstore_agg.select(
    "State",
    "Category",
    "Sub_Category",
    "Ship_Mode",
    F.round(F.col("avg(duration)"), 2).alias("Avg_Duration"),
    F.round(F.col("sum(Sales)"), 2).alias("SalesAmt"),
    F.round(F.col("sum(Quantity)"), 2).alias("Quantity"),
    # Unique customers come from countDistinct("Customer_ID") in the aggregation cell.
    # "count(Order_ID)" counts rows, not customers, so it must not feed this column.
    F.col("count(DISTINCT Customer_ID)").cast(IntegerType()).alias("UniqUserCount"),
)

df_superstore_final.printSchema()


In [9]:
# Persist the curated aggregate as a Parquet-backed table, overwriting any previous contents.
superstore_table_name = "superstore_curated"

# Legacy flag, presumably set so saveAsTable can (re)create the managed table when its
# storage location still holds files from an earlier run.
# NOTE(review): legacy confs can be removed/ignored on newer Spark runtimes — confirm it
# still has an effect here.
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")

(
    df_superstore_final.write
    .mode('overwrite')
    .format("parquet")
    .saveAsTable(superstore_table_name)
)