 # Sales order data exploration

 Use the code in this notebook to explore sales order data.

In [50]:
from pyspark.sql.types import *


# Column layout of the order CSV files. The schema is handed to the reader
# explicitly below instead of being inferred from the data.
orderSchema = (
    StructType()
    .add("SalesOrderID", StringType())
    .add("SalesOrderLineNumber", IntegerType())
    .add("SalesOrderDate", StringType())
    .add("CustomerName", StringType())
    .add("CustomerEmailAddress", StringType())
    .add("Item", StringType())
    .add("Quantity", IntegerType())
    .add("UnitPrice", FloatType())
    .add("TaxAmount", FloatType())
)

# The wildcard picks up every CSV file under Files/orders in a single read.
# (To load one file only, point load() at e.g. "Files/orders/2019.csv".)
df = (
    spark.read
    .schema(orderSchema)
    .format("csv")
    .load("Files/orders/*.csv")
)
display(df)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, b8be21ed-ea85-465c-852f-2d42ce4b3c9b)

In [51]:
# Keep only the order date and unit price columns.
# NOTE: despite its name, `Customers` holds no customer fields.
Customers = df.select("SalesOrderDate", "UnitPrice")
display(Customers)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 94fecb85-b9d5-4ef4-854f-fe47e6be7ec5)

In [52]:
# Total number of rows in Customers (duplicate rows are counted too).
row_total = Customers.count()
print(row_total)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 5, Finished, Available)

32718


In [53]:
# Column names of Customers; as the cell's last expression, the list is echoed as output.
Customers.columns

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 6, Finished, Available)

['SalesOrderDate', 'UnitPrice']

In [54]:
# Count the unique rows of Customers; the count is echoed as the cell's last expression.
unique_rows = Customers.distinct()
unique_rows.count()

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 7, Finished, Available)

8298

In [55]:
# Render the de-duplicated rows of Customers.
deduplicated = Customers.distinct()
display(deduplicated)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, 6fb0e77c-052b-4630-8c3e-d68734cbceb3)

In [56]:
# Name and email address for every order line whose Item is "Road-250 Red, 52".
is_road_250_red_52 = df["Item"] == "Road-250 Red, 52"
display(df.where(is_road_250_red_52).select("CustomerName", "CustomerEmailAddress"))

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 69ddf5b5-8415-49dc-b8f2-80b9f08f7f4c)

In [57]:
from pyspark.sql.functions import *
# Two conditions combined with AND:
#   - the item is exactly "Road-250 Red, 52"
#   - the email address contains "@adventure-works.com"
item_matches = col("Item") == "Road-250 Red, 52"
email_matches = col("CustomerEmailAddress").like("%@adventure-works.com%")

filters = df.select("CustomerName", "CustomerEmailAddress").where(item_matches & email_matches)
display(filters)


StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, ff4d3268-c1f8-4740-9911-25202ad31b11)

In [58]:
# Total quantity per item, smallest totals first.
# After the select, Quantity is the only numeric column, so the argument-less
# sum() produces a single aggregate column named "sum(Quantity)".
quantity_by_item = (
    df.select("Item", "Quantity")
    .groupBy("Item")
    .sum()
    .orderBy("sum(Quantity)")
)
display(quantity_by_item)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, 750ec429-b67d-43dc-8bfa-938a21f87076)

In [59]:
# List the column names of df.
column_names = df.columns
print(column_names)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 12, Finished, Available)

['SalesOrderID', 'SalesOrderLineNumber', 'SalesOrderDate', 'CustomerName', 'CustomerEmailAddress', 'Item', 'Quantity', 'UnitPrice', 'TaxAmount']


In [60]:
# Distinct years found in SalesOrderDate.
order_years = df.select(year("SalesOrderDate")).distinct()
display(order_years)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, ed998993-22e2-4f3c-bef4-018d5f5489b7)

In [61]:
# Pair each row's order year (aliased "Year") with its unit price.
order_year = year("SalesOrderDate").alias("Year")
display(df.select(order_year, "UnitPrice"))

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 85b313aa-faf2-4279-9d59-6b078d46008d)

In [62]:
# Number of rows per order year, in ascending year order.
yearlySales = (
    df.select(year("SalesOrderDate").alias("Year"))
    .groupBy("Year")
    .count()
    .orderBy("Year")
)
display(yearlySales)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 973c98f4-3d0b-4314-9dcd-829ad05c2349)

In [63]:
# Build the transformed orders frame in one pass:
#   - Year / Month / Day are derived from SalesOrderDate
#   - FirstName / LastName are the first and second space-separated tokens of
#     CustomerName (any further tokens are not carried over)
#   - the final select keeps and reorders columns; CustomerName itself is dropped
name_parts = split(col("CustomerName"), " ")

transformed_df = (
    df
    .withColumn("Year", year(col("SalesOrderDate")))
    .withColumn("Month", month(col("SalesOrderDate")))
    .withColumn("Day", dayofmonth(col("SalesOrderDate")))
    .withColumn("FirstName", name_parts.getItem(0))
    .withColumn("LastName", name_parts.getItem(1))
    .select(
        "SalesOrderID", "SalesOrderLineNumber", "SalesOrderDate",
        "Year", "Month", "Day",
        "FirstName", "LastName", "CustomerEmailAddress",
        "Item", "Quantity", "UnitPrice", "TaxAmount",
    )
)

# Display the last five orders: tail(5) returns the final five rows, not the first five
display(transformed_df.tail(5))

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, 19a13d6a-82a1-4ae2-b1f6-abec7b08d876)

In [64]:
# Persist the transformed orders as Parquet; "overwrite" replaces whatever
# already exists at the target path.
transformed_writer = transformed_df.write.mode("overwrite")
transformed_writer.parquet('Files/transformed_data/orders')
print("Transformed data saved!")

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 17, Finished, Available)

Transformed data saved!


In [65]:
# Load the Parquet files stored under Files/transformed_data/orders.
orders_df = spark.read.parquet("Files/transformed_data/orders")
display(orders_df)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 18, Finished, Available)

SynapseWidget(Synapse.DataFrame, 26bf3f92-a729-4e06-8d4e-5552affc7622)

In [66]:
# Write orders_df as Parquet partitioned by Year, then Month, then Day;
# "overwrite" replaces any existing output at the target path.
(
    orders_df.write
    .mode("overwrite")
    .partitionBy("Year", "Month", "Day")
    .parquet("Files/partitioned_data")
)
print("Transformed data saved!")

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 19, Finished, Available)

Transformed data saved!


In [67]:
# Load only the Year=2019/Month=10 partition folder.
# NOTE: this rebinds orders_df to that single partition's data.
orders_df = spark.read.parquet("Files/partitioned_data/Year=2019/Month=10")

date_and_price = orders_df.select("SalesOrderDate", "UnitPrice")
display(date_and_price)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 20, Finished, Available)

SynapseWidget(Synapse.DataFrame, 343f73b0-e298-408c-b8dc-61c4689a31be)

In [68]:
# Load only the Year=2019/Month=11 partition folder.
# NOTE: this rebinds orders_df to that single partition's data.
orders_df = spark.read.parquet("Files/partitioned_data/Year=2019/Month=11")

date_and_price = orders_df.select("SalesOrderDate", "UnitPrice")
display(date_and_price)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 21, Finished, Available)

SynapseWidget(Synapse.DataFrame, a051f656-a31f-4ca8-b2c7-93cd67cb4b53)

In [69]:
# Load every 2021 partition: the wildcards match all Month= and Day= folders
# beneath Year=2021.
orders_2021_df = spark.read.parquet("Files/partitioned_data/Year=2021/Month=*/Day=*")
display(orders_2021_df)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 22, Finished, Available)

SynapseWidget(Synapse.DataFrame, c2b393bc-e869-4b1c-bd86-a60ba1f084d8)

In [70]:
 # Create a new table
# Save df as the Delta table "salesorders".
# mode("overwrite") keeps this cell re-runnable: with no explicit mode the
# writer defaults to error-if-exists, so a second run (or Restart & Run All)
# would fail once the table has been created. This also matches the other
# writes in this notebook, which all use overwrite.
df.write.format("delta").mode("overwrite").saveAsTable("salesorders")

# Get the table description
spark.sql("DESCRIBE EXTENDED salesorders").show(truncate=False)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 23, Finished, Available)

+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                            |comment|
+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
|SalesOrderID                |string                                                                                                                               |       |
|SalesOrderLineNumber        |int                                                                                                                                  |       |
|SalesOrderDate              |string                                                                                                   

In [71]:
# Pull at most 1000 rows from the salesorders table, addressed through the
# spark_lakehouse_lab qualifier.
# NOTE: this rebinds df to the query result.
sample_query = "SELECT * FROM spark_lakehouse_lab.salesorders LIMIT 1000"
df = spark.sql(sample_query)
display(df)

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 24, Finished, Available)

SynapseWidget(Synapse.DataFrame, f2fb585e-810e-44a9-8cc9-40ea00cf38bf)

In [72]:
%%sql
-- Gross revenue per order year: (unit price x quantity) plus tax, summed over all rows of that year.
SELECT
    YEAR(SalesOrderDate) AS OrderYear,
    SUM(UnitPrice * Quantity + TaxAmount) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(SalesOrderDate)
ORDER BY OrderYear;

StatementMeta(, b5ad5919-e247-416b-85fb-8eba43f6041d, 25, Finished, Available)

<Spark SQL result set with 3 rows and 2 fields>