# Explore & process sales data using Apache Spark in Microsoft Fabric


**Read the raw CSV files into a Spark DataFrame without headers**

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Load every file under Files/Raw/ as CSV using Spark's default options: no header
# handling and no schema, so columns come back as _c0, _c1, ... (strings) and each
# file's header line is read as an ordinary data row.
df_br = spark.read.format("csv").load("Files/Raw/")
#display(df_br)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 4, Finished, Available, Finished)

**Define column headers and data types for all columns programmatically (explicit Spark schema)**

In [3]:
# Explicit schema shared by the three yearly CSV files: column names and data types.
# NOTE(review): Spark file sources generally treat a user-supplied schema as nullable,
# so nullable=False on sales_order_no is likely not enforced — confirm if it matters.
# NOTE(review): order_date is parsed with the CSV reader's default dateFormat
# (yyyy-MM-dd); values in another format would load as null — confirm the source format.
my_schema = StructType([
    StructField("sales_order_no", StringType(), False),
    StructField("sales_order_line_no", IntegerType()),
    StructField("order_date", DateType()),
    StructField("customer_name", StringType()),
    StructField("email", StringType()),
    StructField("item", StringType()),
    StructField("quantity", IntegerType()),
    StructField("unit_price", DecimalType(10, 2)),
    StructField("tax", DecimalType(10, 2)),
])


def read_sales_csv(path, schema=my_schema):
    """Read one yearly sales CSV file that has a header row.

    The header line is skipped and columns are typed with ``schema`` instead of
    being inferred, so every yearly file loads with identical column names/types.

    Args:
        path: Lakehouse-relative path of the CSV file, e.g. "Files/Raw/2019.csv".
        schema: StructType to apply; defaults to ``my_schema``.

    Returns:
        A Spark DataFrame with the columns of ``schema``.
    """
    return (
        spark.read.format("csv")
        .schema(schema)
        .option("header", "true")
        .load(path)
    )


# One DataFrame per yearly file, all read with the same options and schema.
df_2019 = read_sales_csv("Files/Raw/2019.csv")
#display(df_2019)
df_2020 = read_sales_csv("Files/Raw/2020.csv")
#display(df_2020)
df_2021 = read_sales_csv("Files/Raw/2021.csv")
#display(df_2021)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 5, Finished, Available, Finished)

**Combine the data from all three CSV files into a single DataFrame**

In [4]:
# Append the 2020 and 2021 rows to the 2019 rows. unionByName lines columns up by
# name (all three frames were read with my_schema), so the result keeps that schema.
df = (
    df_2019
    .unionByName(df_2020)
    .unionByName(df_2021)
)
#display(df)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 6, Finished, Available, Finished)

**Split the customer_name column into first_name and last_name**

In [5]:
# Split customer_name on single spaces once and reuse the resulting array column:
# word 0 becomes first_name, word 1 becomes last_name; any further words are not kept.
name_parts = split(col("customer_name"), " ")
df = (
    df.withColumn("first_name", name_parts.getItem(0))
      .withColumn("last_name", name_parts.getItem(1))
)
#display(df.head(10))

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 7, Finished, Available, Finished)

**a. Select the customer_name and email columns from the DataFrame
b. Identify the customers with repeat orders (more than one order)
c. Identify the customers with exactly one order**

In [6]:
# a. Keep only the customer name and e-mail columns.
df_filter1 = df.select("customer_name", "email")
#display(df_filter1)

# Number of distinct orders per customer. The data has a sales_order_line_no column,
# so one order can span several rows; counting rows would flag a customer with a single
# multi-line order as a "repeat" customer. Count distinct sales_order_no values instead.
# The result column keeps the name `count` used by the earlier row-count version.
df_orders_per_cust = df.groupBy("customer_name")\
                    .agg(countDistinct("sales_order_no").alias("count"))

# b. Customers who placed more than one order.
df_repeat_cust = df_orders_per_cust.filter(col("count") > 1)
#display(df_repeat_cust)

# c. Customers who placed exactly one order.
df_unique_cust = df_orders_per_cust.filter(col("count") == 1)
#display(df_unique_cust)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 8, Finished, Available, Finished)


**Display the total quantity sold for each item**


In [7]:
# Total units sold per item: group the order lines by item and add up their quantities.
df_item_qty = df.groupBy("item").agg(
    sum(col("quantity")).alias("total_qty")
)
#display(df_item_qty)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 9, Finished, Available, Finished)

**Display total sales for each year**

In [8]:
# Yearly revenue: derive the calendar year from order_date, then sum quantity * unit_price
# over all order lines of that year (tax is not included). groupBy output has no defined
# row order, so sort by year to list the totals chronologically.
df_yearly_sales = (
    df.withColumn("year", year(col("order_date")))
      .groupBy("year")
      .agg(sum(col("quantity") * col("unit_price")).alias("total_yearly_sales"))
      .orderBy("year")
)
#display(df_yearly_sales)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 10, Finished, Available, Finished)

**Display monthly sales for every year**

In [9]:
# Monthly revenue: sum quantity * unit_price for every (year, month) present in the data,
# listed chronologically. `ascending` is the keyword DataFrame.orderBy recognises; an
# `asc=` keyword is not an orderBy option and has no effect.
df_monthly_sales = df.groupBy(year(col("order_date")).alias("year"),
                              month(col("order_date")).alias("month"))\
                    .agg(sum(col("quantity") * col("unit_price")).alias("total_monthly_sales"))\
                    .orderBy("year", "month", ascending=True)
#display(df_monthly_sales)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 11, Finished, Available, Finished)

**Top 10 items by sales**

In [10]:
# Revenue per item: unit_price * quantity summed over all order lines, rounded to
# 2 decimals and sorted from highest to lowest (take the first 10 rows for the top 10).
item_revenue = sum(col("unit_price") * col("quantity")).alias("total_sales")
df_top_sales = (
    df.groupBy("item")
      .agg(item_revenue)
      .withColumn("total_sales", round(col("total_sales"), 2))
      .orderBy(col("total_sales").desc())
)
#display(df_top_sales.head(10))

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 12, Finished, Available, Finished)

**Tax contribution analysis**

In [11]:
# Tax contribution per item: total tax collected on each item's order lines, and that
# total as a percentage of the tax collected across all items.
# Summing per-row tax/unit_price percentages across rows (the earlier approach) is not a
# share of anything: it grows with the number of rows and quantity per item.

# Single-row frame holding the overall tax total, used as the denominator below.
df_total_tax = df.agg(sum("tax").alias("total_tax_all_items"))

df_tax_by_item = (
    df.groupBy("item")
      .agg(sum("tax").alias("total_tax"))
      .crossJoin(df_total_tax)
      # Guard the division: yields null instead of dividing by a zero overall total.
      .withColumn(
          "taxcontri_by_item",
          when(col("total_tax_all_items") != 0,
               round(col("total_tax") * 100 / col("total_tax_all_items"), 2)),
      )
      .drop("total_tax_all_items")
      .orderBy(col("taxcontri_by_item").desc())
)
#display(df_tax_by_item.head(10))

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 13, Finished, Available, Finished)

**Top 10 Customers by Total Purchase**

In [12]:
# Revenue per customer: group order lines by the (first_name, last_name) pair derived from
# customer_name, sum unit_price * quantity, and sort highest first.
# NOTE(review): last_name is only the second word of customer_name, so distinct customers
# whose first two words match are merged here — consider grouping by customer_name/email.
customer_keys = ["first_name", "last_name"]
df_top_cust = (
    df.groupBy(*customer_keys)
      .agg(sum(col("unit_price") * col("quantity")).alias("total_sales"))
      .orderBy(col("total_sales").desc())
)
#display(df_top_cust.head(50))

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 14, Finished, Available, Finished)

**Column pruning: add year and month columns and keep only the columns needed downstream**

In [13]:
# Keep only the columns needed downstream, in a fixed order, deriving year and month
# from order_date along the way (customer_name is dropped; first/last name replace it).
df = df.select(
    "sales_order_no",
    "sales_order_line_no",
    "first_name",
    "last_name",
    "order_date",
    year(col("order_date")).alias("year"),
    month(col("order_date")).alias("month"),
    "email",
    "item",
    "quantity",
    "unit_price",
    "tax",
)
#display(df.head(10))

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 15, Finished, Available, Finished)

**Write the DataFrame to Files/Silver/ as Parquet, partitioned by year and month**

In [None]:
# Persist the pruned DataFrame as Parquet under Files/Silver/, laid out in one
# sub-folder per year=/month= value; existing data at that path is replaced.
df.write.parquet(
    "Files/Silver/",
    mode="overwrite",
    partitionBy=["year", "month"],
)

**Read the 2021 data back from the partitioned Parquet files**

In [15]:
# Read the Silver Parquet dataset and keep only the 2021 partitions. Filtering on the
# partition column (rather than loading the year=2021/ sub-folder directly) lets Spark
# prune to the year=2021 directories while still keeping `year` as a column.
# Named df_silver_2021 so it does not overwrite the raw-CSV df_2021 that the union cell
# reads; reusing that name makes the union cell fail if it is re-run afterwards.
df_silver_2021 = spark.read.parquet("Files/Silver/")\
                    .filter(col("year") == 2021)
#display(df_silver_2021)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 17, Finished, Available, Finished)

**Write each year's data to a Delta table, naming the tables SalesOrder2019, SalesOrder2020 and SalesOrder2021.**

In [16]:
# Write each year's rows to its own Delta table (SalesOrder2019, SalesOrder2020,
# SalesOrder2021), replacing any previous contents of that table.
# The list is deliberately NOT named `year`: that name is pyspark.sql.functions.year
# (from the star import), which the yearly-sales, monthly-sales and column-pruning cells
# call. Rebinding it to a list makes those cells raise TypeError when re-run.
years = [2019, 2020, 2021]
for y in years:
    df.filter(col("year") == y)\
        .write.format("delta")\
        .option("mergeSchema", "true")\
        .mode("overwrite")\
        .saveAsTable(f"SalesOrder{y}")


StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 18, Finished, Available, Finished)

In [17]:
# Load the SalesOrder2020 table back from the catalog to check the write.
# DataFrameReader.table() resolves the table by name (its format comes from the table's
# metadata), so a preceding .format("delta") has no effect and is omitted.
# Named df_delta_2020 so it does not overwrite the raw-CSV df_2020 that the union cell
# reads; reusing that name makes the union cell fail if it is re-run afterwards.
df_delta_2020 = spark.read.table("SalesOrder2020")
#display(df_delta_2020)

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 19, Finished, Available, Finished)

In [18]:
%%sql
-- Preview the rows of the SalesOrder2020 table through Spark SQL.
select * from SalesOrder2020
-- Alternative: inspect the table's columns and types instead of its rows:
--DESCRIBE TABLE SalesOrder2020

StatementMeta(, a9fc42fb-9b10-446a-91ef-f161366c64e3, 20, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 12 fields>