In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, round

In [None]:
# Reuse the active SparkSession if one exists; otherwise create one
# registered under the application name "Spark Example".
spark = (
    SparkSession.builder
    .appName("Spark Example")
    .getOrCreate()
)

In [None]:
# Bare expression: shows the SparkSession object as this cell's output.
spark

In [None]:
# Azure Blob Storage connection settings.
# Replace the placeholder values with the real storage account name and key.
# NOTE(review): do not commit a real access key in the notebook; prefer loading
# it from a secret store or an environment variable.
blob_account_name = "BLOB_ACCOUNT_NAME"
# Container that holds the data; this is the single place to change it.
blob_container_name = "sparkcontainer"
# NOTE(review): data_folder is not referenced by the visible cells, which read
# from "raw_data/" - confirm whether it is still needed.
data_folder = "raw"
blob_key = "BLOB_KEY"

# Root URI of the container, built from the settings above instead of a
# hard-coded container name.
wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/"
# Register the account key in the Spark configuration for this storage account.
spark.conf.set(f"fs.azure.account.key.{blob_account_name}.blob.core.windows.net", blob_key)


### Read Data

In [None]:
# Product dimension: comma-separated CSV with a header row; column types are
# inferred by Spark while reading.
product_path = wasbs_path + "raw_data/dimproduct.csv"
product = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(product_path)
)

# Quick check of the load: row count and inferred schema.
print(f"Number of rows: {product.count()}")
product.printSchema()

Number of rows: 2517
root
 |-- ProductKey: integer (nullable = true)
 |-- ProductLabel: integer (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- ProductDescription: string (nullable = true)
 |-- ProductSubcategoryKey: string (nullable = true)
 |-- Manufacturer: string (nullable = true)
 |-- BrandName: string (nullable = true)
 |-- ClassID: string (nullable = true)
 |-- ClassName: string (nullable = true)
 |-- StyleID: string (nullable = true)
 |-- StyleName: string (nullable = true)
 |-- ColorID: string (nullable = true)
 |-- ColorName: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeRange: string (nullable = true)
 |-- SizeUnitMeasureID: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- WeightUnitMeasureID: string (nullable = true)
 |-- UnitOfMeasureID: string (nullable = true)
 |-- UnitOfMeasureName: string (nullable = true)
 |-- StockTypeID: string (nullable = true)
 |-- StockTypeName: string (nullable = true)
 |-- UnitCost: s

In [None]:
# Store dimension: comma-separated CSV with a header row; column types are
# inferred by Spark while reading.
stores_path = wasbs_path + "raw_data/dimstores.csv"
store = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stores_path)
)

# Quick check of the load: row count and inferred schema.
print(f"Number of rows: {store.count()}")
store.printSchema()

Number of rows: 307
root
 |-- StoreKey: integer (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- StoreStatus: string (nullable = true)
 |-- CreatedDate: timestamp (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [None]:
# Sales fact table: comma-separated CSV with a header row; column types are
# inferred by Spark while reading.
sales_path = wasbs_path + "raw_data/factsales.csv"
sales = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(sales_path)
)

# Quick check of the load: row count and inferred schema.
print(f"Number of rows: {sales.count()}")
sales.printSchema()

Number of rows: 3406089
root
 |-- SalesKey: integer (nullable = true)
 |-- DateKey: timestamp (nullable = true)
 |-- channelKey: integer (nullable = true)
 |-- StoreKey: integer (nullable = true)
 |-- ProductKey: integer (nullable = true)
 |-- PromotionKey: integer (nullable = true)
 |-- CurrencyKey: integer (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- SalesQuantity: integer (nullable = true)
 |-- ReturnQuantity: integer (nullable = true)
 |-- ReturnAmount: double (nullable = true)
 |-- DiscountQuantity: integer (nullable = true)
 |-- DiscountAmount: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- SalesAmount: double (nullable = true)
 |-- ETLLoadID: integer (nullable = true)
 |-- LoadDate: timestamp (nullable = true)
 |-- UpdateDate: timestamp (nullable = true)



In [None]:
# Print the number of rows in the store DataFrame (triggers a Spark count job).
print(f"Number of rows: {store.count()}")

Number of rows: 307


### Clean Data

In [None]:
# Preview three store columns; show() prints only the first 20 rows by default
(
    store
    .select("StoreKey", "StoreName", "StoreStatus")
    .show()
)

+--------+--------------------+-----------+
|StoreKey|           StoreName|StoreStatus|
+--------+--------------------+-----------+
|       1|Contoso Seattle N...|         On|
|       2|Contoso Seattle N...|         On|
|       3|Contoso Kennewick...|         On|
|       4|Contoso Bellevue ...|         On|
|       5|Contoso Redmond S...|         On|
|       6|Contoso Yakima Store|         On|
|       7|Contoso Granger S...|         On|
|       8|Contoso Sunnyside...|         On|
|       9|Contoso Toppenish...|         On|
|      10|Contoso Wapato Store|         On|
|      11|Contoso Cle Elum ...|         On|
|      12|Contoso North Ben...|        Off|
|      13|Contoso Snoqualmi...|         On|
|      14|Contoso Fall City...|         On|
|      15|Contoso Renton Store|         On|
|      16|Contoso Everett S...|         On|
|      17|Contoso Spokane S...|         On|
|      18|Contoso Veradale ...|         On|
|      19|Contoso Cheney Store|        Off|
|      20|Contoso Englewood...| 

In [None]:
# Rename StoreStatus to Status, then list every column name to confirm
store = store.withColumnRenamed(existing="StoreStatus", new="Status")
store.columns

Out[56]: ['StoreKey', 'StoreType', 'StoreName', 'Status', 'CreatedDate', 'ModifiedDate']

In [None]:
# Number of stores for each distinct Status value
(
    store
    .groupBy("Status")
    .count()
    .show()
)

+------+-----+
|Status|count|
+------+-----+
|    On|  295|
|   Off|   12|
+------+-----+



In [None]:
# Filter and sort: stores whose Status is "Off", highest StoreKey first
(
    store
    .filter(store["Status"] == "Off")
    .sort(store.StoreKey.desc())
    .show()
)

+--------+---------+--------------------+------+--------------------+--------------------+
|StoreKey|StoreType|           StoreName|Status|         CreatedDate|        ModifiedDate|
+--------+---------+--------------------+------+--------------------+--------------------+
|     184|    Store|Contoso Alexandri...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|     162|    Store|Contoso Pittsfiel...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|     148|    Store|Contoso Worcester...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|     129|    Store|Contoso Trenton N...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|     119|    Store|Contoso Buffalo S...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|     112|    Store|Contoso Key West ...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|      84|    Store|Contoso Humble Store|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|
|      62|    Store|Contoso Stoughton...|   Off|2022-11-02 17:18:...|2022-11-02 17:18:...|

In [None]:
# Columns removed from the sales fact table before the derived columns are added
columns_to_drop = [
    'PromotionKey',
    'CurrencyKey',
    'ReturnQuantity',
    'ReturnAmount',
    'DiscountQuantity',
    'DiscountAmount',
    'TotalCost',
    'SalesAmount',
    'ETLLoadID',
    'LoadDate',
    'UpdateDate',
]
sales = sales.drop(*columns_to_drop)

In [None]:
# Derive per-row totals (rounded to 2 decimals) and the calendar parts of DateKey.
# `round` here is the Spark column function imported at the top of the notebook.
sales = (
    sales
    .withColumn("TotalSales", round(col("UnitPrice") * col("SalesQuantity"), 2))
    .withColumn("TotalCost", round(col("UnitCost") * col("SalesQuantity"), 2))
    .withColumn("Year", year(col("DateKey")))
    .withColumn("Month", month(col("DateKey")))
)
sales.show()

+--------+-------------------+----------+--------+----------+--------+---------+-------------+----------+---------+----+-----+
|SalesKey|            DateKey|channelKey|StoreKey|ProductKey|UnitCost|UnitPrice|SalesQuantity|TotalSales|TotalCost|Year|Month|
+--------+-------------------+----------+--------+----------+--------+---------+-------------+----------+---------+----+-----+
|       1|2007-01-02 00:00:00|         1|     209|       956|   91.05|    198.0|            8|    1584.0|    728.4|2007|    1|
|       2|2007-02-12 00:00:00|         4|     308|       766|   10.15|     19.9|            4|      79.6|     40.6|2007|    2|
|       3|2008-01-24 00:00:00|         1|     156|      1175|  209.03|    410.0|            9|    3690.0|  1881.27|2008|    1|
|       4|2008-01-13 00:00:00|         2|     306|      1429|   132.9|    289.0|            8|    2312.0|   1063.2|2008|    1|
|       5|2008-01-22 00:00:00|         2|     306|      1133|  144.52|    436.2|           24|   10468.8|  3468

### Join Data

In [None]:
# Only the lookup columns needed from each dimension table
store_lookup = store.select("StoreKey", "StoreName", "StoreType")
product_lookup = product.select("ProductKey", "ProductName")

# Left joins keep every sales row even when a key has no match in a dimension
merged_df = (
    sales
    .join(store_lookup, on="StoreKey", how="left")
    .join(product_lookup, on="ProductKey", how="left")
)

merged_df.show()

+----------+--------+--------+-------------------+----------+--------+---------+-------------+----------+---------+----+-----+--------------------+---------+--------------------+
|ProductKey|StoreKey|SalesKey|            DateKey|channelKey|UnitCost|UnitPrice|SalesQuantity|TotalSales|TotalCost|Year|Month|           StoreName|StoreType|         ProductName|
+----------+--------+--------+-------------------+----------+--------+---------+-------------+----------+---------+----+-----+--------------------+---------+--------------------+
|       956|     209|       1|2007-01-02 00:00:00|         1|   91.05|    198.0|            8|    1584.0|    728.4|2007|    1|Contoso Baildon S...|    Store|A. Datum Point Sh...|
|       766|     308|       2|2007-02-12 00:00:00|         4|   10.15|     19.9|            4|      79.6|     40.6|2007|    2|Contoso North Ame...| Reseller|Contoso Battery c...|
|      1175|     156|       3|2008-01-24 00:00:00|         1|  209.03|    410.0|            9|    3690.0|

In [None]:
# Persist the joined dataset as Parquet, partitioned by Year and then Month.
merged_df_path = wasbs_path + "cleaned_data/"
# Overwrite mode: re-running the notebook replaces the previous export instead
# of failing because the target path already exists (the writer's default).
merged_df.write.mode("overwrite").partitionBy("Year", "Month").parquet(merged_df_path)

In [None]:
# New folder created in Blob Storage with parquet data organized by Year and Month.
# Lists the configured output path (merged_df_path) rather than a hard-coded
# account/container URI, so the listing follows the notebook's storage settings.
display(dbutils.fs.ls(merged_df_path))

path,name,size,modificationTime
wasbs://sparkcontainer@sparktest21.blob.core.windows.net/cleaned_data/Year=2007/,Year=2007/,0,1678562606000
wasbs://sparkcontainer@sparktest21.blob.core.windows.net/cleaned_data/Year=2008/,Year=2008/,0,1678562655000
wasbs://sparkcontainer@sparktest21.blob.core.windows.net/cleaned_data/Year=2009/,Year=2009/,0,1678562700000
wasbs://sparkcontainer@sparktest21.blob.core.windows.net/cleaned_data/_SUCCESS,_SUCCESS,0,1678562747000


### SQL Query

In [None]:
# Expose the joined data to Spark SQL under the view name "salesSQL"
merged_df.createOrReplaceTempView("salesSQL")

# Sanity check: total number of rows visible through the view
row_count_query = "SELECT count(*) AS no_of_rows FROM salesSQL"
spark.sql(row_count_query).show()

+----------+
|no_of_rows|
+----------+
|   3406089|
+----------+



In [None]:
# Aggregate the salesSQL view per StoreType with Spark SQL:
#   SUMTotalSales / SUMTotalCost - summed TotalSales and TotalCost, rounded to 2 decimals
#   Profit - summed sales minus summed cost, rounded to 2 decimals
#   Margin - profit as a percentage of summed sales, rounded to 2 decimals,
#            with '%' appended via the || concatenation operator
store_revenue = spark.sql("""SELECT StoreType, 
                            ROUND(SUM(TotalSales),2) AS SUMTotalSales, 
                            ROUND(SUM(TotalCost), 2) AS SUMTotalCost,
                            ROUND(SUM(TotalSales) - SUM(TotalCost), 2) AS Profit,
                            ROUND((SUM(TotalSales) - SUM(TotalCost))/SUM(TotalSales) * 100, 2) || '%' AS Margin 
                            FROM salesSQL 
                            GROUP BY StoreType""")
# Render the aggregated result with the notebook's display() helper
display(store_revenue)

StoreType,SUMTotalSales,SUMTotalCost,Profit,Margin
Store,7055960442.11,3021055446.07,4034904996.04,57.18%
Reseller,1745555623.95,751114458.98,994441164.97,56.97%
Online,2724649800.69,1174784746.76,1549865053.93,56.88%
Catalog,1091648846.28,469306387.74,622342458.54,57.01%
