Data preprocessing for the global sales data used in the Udemy course linked below:<br>
https://www.udemy.com/course/the-complete-power-bi-practical-course

In [23]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell in this notebook works with
spark = (
    SparkSession.builder
    .appName("GlobalSalesDataProcessing")
    .getOrCreate()
)

# Load the internet-sales fact table: the first CSV line holds the column names and
# Spark infers each column's type from the data.
# NOTE(review): cells containing the literal text 'NULL' keep their column typed as string.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('../input_data/udemy_ms_powerbi/factInternetSales.csv')
)

# Preview the first rows
df.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+----+--------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|     29|      6|      98|     19|  36|     100|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+----+--------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43

60398

In [13]:
# Rename the 6 currency-key columns prior to unpivot.
# In the CSV header these amount columns are named by the bare currency key ('29', '6', ...);
# a 'ck_' prefix gives them names that are easy to reference in the unpivot step.
ck_29, ck_6, ck_98, ck_19, ck_36, ck_100 = 'ck_29', 'ck_6', 'ck_98', 'ck_19', 'ck_36', 'ck_100'

# Original header name -> new column name, applied in this order
currency_column_renames = {
    '29': ck_29,
    '6': ck_6,
    '98': ck_98,
    '19': ck_19,
    '36': ck_36,
    '100': ck_100,
}
for source_name, target_name in currency_column_renames.items():
    df = df.withColumnRenamed(source_name, target_name)
df.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+-----+--------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|  ck_29|   ck_6|   ck_98|  ck_19|ck_36|  ck_100|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-------+-------+--------+-------+-----+--------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         S

In [15]:
# Unpivot to create the 2 columns: Currency Key & Sales Amount.
# Every input row becomes one output row per currency key; the amount comes from that
# currency's ck_<key> column.
from pyspark.sql import functions as F

# Fact-table columns carried through the unpivot unchanged
fact_columns = [
    "ProductKey",
    "OrderDate",
    "DueDate",
    "ShipDate",
    "CustomerKey",
    "PromotionKey",
    "SalesTerritoryKey",
    "SalesOrderNumber",
    "SalesOrderLineNumber",
    "RevisionNumber",
    "OrderQuantity",
    "UnitPrice",
    "ExtendedAmount",
    "UnitPriceDiscountPct",
    "DiscountAmount",
    "ProductStandardCost",
    "TotalProductCost",
]

# Currency keys whose amounts live in the renamed ck_<key> columns (order = output row order)
currency_keys = [29, 6, 98, 19, 36, 100]

# One (Currency_Key, Sales_Amount) struct per currency: a literal key paired with its amount column
currency_structs = [
    F.struct(F.lit(key).alias("Currency_Key"), F.col(f"ck_{key}").alias("Sales_Amount"))
    for key in currency_keys
]

df_unpivoted = (
    df
    # explode turns the array of structs into one row per currency
    .select(*fact_columns, F.explode(F.array(*currency_structs)).alias("Currency_Amount"))
    # flatten the struct back into two top-level columns
    .select(*fact_columns, "Currency_Amount.Currency_Key", "Currency_Amount.Sales_Amount")
    .withColumnRenamed("Currency_Key", "Currency Key")
    .withColumnRenamed("Sales_Amount", "Sales Amount")
)

df_unpivoted.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+------------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Currency Key|Sales Amount|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+------------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43697|                   1|             1|            1|  3578.27

In [26]:
# Keep only the unpivoted rows that carry a real sales amount.
# The source CSV marks a missing amount with the literal text 'NULL'. Depending on how the file
# is read, such a cell arrives either as the string 'NULL' or as a true null, so exclude both.
# Comparing on a string cast keeps this filter valid even if "Sales Amount" is ever typed as a
# number: comparing a numeric column directly with 'NULL' would turn the literal into null and
# silently drop every row (or may raise an error when Spark's ANSI mode is enabled).
sales_amount = F.col("Sales Amount")
df_fact_internet_sales = df_unpivoted.filter(
    sales_amount.isNotNull() & (sales_amount.cast("string") != F.lit('NULL'))
)
df_fact_internet_sales.count()

60339

In [28]:
# Load the currency dimension table (CurrencyKey, CurrencyAlternateKey, CurrencyName,
# Exchange Rate) into a dataframe, using the header row for names and inferring types
df_dim_currency = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('../input_data/udemy_ms_powerbi/dimCurrency.csv')
)
df_dim_currency.show()

+-----------+--------------------+--------------------+-------------+
|CurrencyKey|CurrencyAlternateKey|        CurrencyName|Exchange Rate|
+-----------+--------------------+--------------------+-------------+
|          6|                 AUD|   Australian Dollar|         1.62|
|         19|                 CAD|     Canadian Dollar|          1.5|
|         29|                 DEM|       Deutsche Mark|          1.0|
|         36|                 EUR|                EURO|          1.0|
|         98|                 GBP|United Kingdom Pound|         0.83|
|        100|                 USD|           US Dollar|         1.08|
+-----------+--------------------+--------------------+-------------+



In [30]:
# Merge fact internet sales & dim currency: attach each sale's currency code and exchange rate.
# A left join keeps every sales row; a key missing from the dimension yields null code/rate.
fact_currency_key = F.col("f.Currency Key")
dim_currency_key = F.col("d.CurrencyKey")

df_result = (
    df_fact_internet_sales.alias("f")
    .join(df_dim_currency.alias("d"), fact_currency_key == dim_currency_key, how="left")
    # Both key columns and the descriptive currency name are not needed downstream
    .drop("Currency Key", "CurrencyKey", "CurrencyName")
    .withColumnRenamed("CurrencyAlternateKey", "Currency Code")
)
df_result.show()

+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+-------------+-------------+
|ProductKey| OrderDate|   DueDate| ShipDate|CustomerKey|PromotionKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Sales Amount|Currency Code|Exchange Rate|
+----------+----------+----------+---------+-----------+------------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+------------+-------------+-------------+
|       310|29/12/2010|10/01/2011|5/01/2011|      21768|           1|                6|         SO43697|              