In [1]:
# Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Pyspark Project") \
    .master("local[*]") \
    .getOrCreate()

spark

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

### LOAD ALL THE DATA

In [11]:
# Read the products csv file into dataframe

products = spark.read.format("csv").option("header", True).option("inferSchema", True).load("data/tables/products.csv")

In [12]:
# display products dataframe
products.show(truncate=False)

+---------+-------------------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|ProductID|ProductName                    |SupplierID|CategoryID|QuantityPerUnit     |UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+-------------------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|1        |Chai                           |1         |1         |10 boxes x 20 bags  |18.0     |39          |0           |10          |false       |
|2        |Chang                          |1         |1         |24 - 12 oz bottles  |19.0     |17          |40          |25          |false       |
|3        |Aniseed Syrup                  |1         |2         |12 - 550 ml bottles |10.0     |13          |70          |25          |false       |
|4        |Chef Anton's Cajun Seasoning   |2         |2         |48 - 6 oz jars      |22.0     |53        

In [13]:
# Read the orders csv file into dataframe

orders = spark.read.format("csv").option("header", True).option("inferSchema", True).load("data/tables/orders.csv")

In [21]:
# display orders dataframe
orders.show()

+-------+----------+----------+------------+-----------+-------+-------+--------------------+----------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+-------+
|OrderID|CustomerID|EmployeeID|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|     ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry| OrderDate|ProductID|UnitPrice|Quantity|Discount| _Sales|
+-------+----------+----------+------------+-----------+-------+-------+--------------------+----------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+-------+
|  11070|     LEHMS|         2|  17-10-2024|       null|      1|  136.0| Lehmanns Marktstand|    Magazinweg 7|Frankfurt a.M.|      null|         60528|    Germany|19-09-2024|       31|     12.5|      20|     0.0|  250.0|
|  11067|     DRACD|         1|  02-10-2024| 20-09-2024|      2|   7.98|Drachenblut Delik...|    Walserweg 21|      

In [15]:
# Read the customers csv file into dataframe

customers = spark.read.format("csv").option("header", True).option("inferSchema", True).load("data/tables/customers.csv")

In [20]:
customers.show()

+----------+--------------------+------------------+--------------------+--------------------+------------+------+----------+-----------+--------------+--------------+
|CustomerID|         CompanyName|       ContactName|        ContactTitle|             Address|        City|Region|PostalCode|    Country|         Phone|           Fax|
+----------+--------------------+------------------+--------------------+--------------------+------------+------+----------+-----------+--------------+--------------+
|     ALFKI| Alfreds Futterkiste|      Maria Anders|Sales Representative|       Obere Str. 57|      Berlin|  null|     12209|    Germany|   030-0074321|   030-0076545|
|     ANATR|Ana Trujillo Empa...|      Ana Trujillo|               Owner|Avda. de la Const...| México D.F.|  null|     05021|     Mexico|  (5) 555-4729|  (5) 555-3745|
|     ANTON|Antonio Moreno Ta...|    Antonio Moreno|               Owner|     Mataderos  2312| México D.F.|  null|     05023|     Mexico|  (5) 555-3932|        

In [17]:
# Read the employees csv file into dataframe

employees = spark.read.format("csv").option("header", True).option("inferSchema", True).load("data/tables/employees.csv")

In [19]:
employees.show()

+---------------+---------+---------+--------------------+---------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+---------+--------------------+---------+--------------------+----------------+------+
|     EmployeeID| LastName|FirstName|               Title|TitleOfCourtesy|     BirthDate|  HireDate|             Address|                City|              Region|          PostalCode|             Country|     HomePhone|Extension|               Notes|ReportsTo|           PhotoPath|       Full Name|Gender|
+---------------+---------+---------+--------------------+---------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+---------+--------------------+---------+--------------------+----------------+------+
|              1|  Davolio|    Nancy|Sales Representative|            Ms.|    0

In [22]:
# Read the suppliers csv file into dataframe

suppliers = spark.read.format("csv").option("header", True).option("inferSchema", True).load("data/tables/suppliers.csv")

In [24]:
suppliers.show()

+----------+--------------------+--------------------+--------------------+--------------------+-----------+--------+----------+---------+---------------+---------------+--------------------+
|SupplierID|         CompanyName|         ContactName|        ContactTitle|             Address|       City|  Region|PostalCode|  Country|          Phone|            Fax|            HomePage|
+----------+--------------------+--------------------+--------------------+--------------------+-----------+--------+----------+---------+---------------+---------------+--------------------+
|         1|      Exotic Liquids|    Charlotte Cooper|  Purchasing Manager|      49 Gilbert St.|     London|    null|   EC1 4SD|       UK| (171) 555-2222|           null|                null|
|         2|New Orleans Cajun...|       Shelley Burke| Order Administrator|      P.O. Box 78934|New Orleans|      LA|     70117|      USA| (100) 555-4822|           null|         #CAJUN.HTM#|
|         3|Grandma Kelly's H...|       

# *Questions*

### 1. How many current products cost less than $20?

In [30]:
from pyspark.sql.functions import col

products.where((col("UnitPrice") < 20) & (col("Discontinued") == "false")).show()

+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|ProductID|         ProductName|SupplierID|CategoryID|     QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|        1|                Chai|         1|         1|  10 boxes x 20 bags|     18.0|          39|           0|          10|       false|
|        2|               Chang|         1|         1|  24 - 12 oz bottles|     19.0|          17|          40|          25|       false|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles|     10.0|          13|          70|          25|       false|
|       13|               Konbu|         6|         8|            2 kg box|      6.0|          24|           0|           5|       false|
|       15|        Genen Shouyu|  

In [31]:
products.where("UnitPrice < 20 AND Discontinued = 'false'").count()

37

### 2. Which product is most expensive?

In [36]:
products.sort(desc("UnitPrice")).show(1)

+---------+-------------+----------+----------+------------------+---------+------------+------------+------------+------------+
|ProductID|  ProductName|SupplierID|CategoryID|   QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+-------------+----------+----------+------------------+---------+------------+------------+------------+------------+
|       38|Côte de Blaye|        18|         1|12 - 75 cl bottles|    263.5|          17|           0|          15|       false|
+---------+-------------+----------+----------+------------------+---------+------------+------------+------------+------------+
only showing top 1 row



### 3. What is the average unit price for our products?

In [40]:
products.agg(round(avg("UnitPrice"), 2).alias("AvgUnitPrice")).show()

+------------+
|AvgUnitPrice|
+------------+
|       28.87|
+------------+



### 4. How many products are above the average unit price?

In [43]:
 # get the average unit price
avg_unit_price = products.agg(round(avg("UnitPrice"), 2).alias("AvgUnitPrice")).collect()[0]["AvgUnitPrice"]

# products above the average unit price
products.where(f"UnitPrice > {avg_unit_price}").show()

+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|ProductID|         ProductName|SupplierID|CategoryID|     QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|        7|Uncle Bob's Organ...|         3|         7|     12 - 1 lb pkgs.|     30.0|          15|           0|          10|       false|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars|     40.0|           6|           0|           0|       false|
|        9|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.|     97.0|          29|           0|           0|        true|
|       10|               Ikura|         4|         8|    12 - 200 ml jars|     31.0|          31|           0|           0|       false|
|       12|Queso Manchego La...|  

In [44]:
products.where(f"UnitPrice > {avg_unit_price}").count()

25

### 5. How many products costs between $15 & $25 (inclusive)?

In [45]:
products.where("UnitPrice >= 15 AND UnitPrice <= 25").show()

+---------+--------------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|ProductID|         ProductName|SupplierID|CategoryID|    QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+--------------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|        1|                Chai|         1|         1| 10 boxes x 20 bags|     18.0|          39|           0|          10|       false|
|        2|               Chang|         1|         1| 24 - 12 oz bottles|     19.0|          17|          40|          25|       false|
|        4|Chef Anton's Caju...|         2|         2|     48 - 6 oz jars|     22.0|          53|           0|           0|       false|
|        5|Chef Anton's Gumb...|         2|         2|           36 boxes|    21.35|           0|           0|           0|        true|
|        6|Grandma's Boysenb...|         

In [46]:
products.where("UnitPrice >= 15 AND UnitPrice <= 25").count()

25

### 6. What is the average number of products (not quantity) per order?

In [47]:
orders.show()

+-------+----------+----------+------------+-----------+-------+-------+--------------------+----------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+-------+
|OrderID|CustomerID|EmployeeID|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|     ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry| OrderDate|ProductID|UnitPrice|Quantity|Discount| _Sales|
+-------+----------+----------+------------+-----------+-------+-------+--------------------+----------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+-------+
|  11070|     LEHMS|         2|  17-10-2024|       null|      1|  136.0| Lehmanns Marktstand|    Magazinweg 7|Frankfurt a.M.|      null|         60528|    Germany|19-09-2024|       31|     12.5|      20|     0.0|  250.0|
|  11067|     DRACD|         1|  02-10-2024| 20-09-2024|      2|   7.98|Drachenblut Delik...|    Walserweg 21|      

In [57]:
# how many products were bougth per order
# what is the average number of products per order

product_count = orders.groupBy("OrderID").agg(count("ProductID").alias("ProductCount"))
product_count.agg(round(avg("ProductCount"), 0).alias("AvgProductCount")).show()

+---------------+
|AvgProductCount|
+---------------+
|            3.0|
+---------------+



### 7. What is order value in $ of open orders? ( Not shipped yet)

In [59]:
orders.show(5)

+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|OrderID|CustomerID|EmployeeID|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|   ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry| OrderDate|ProductID|UnitPrice|Quantity|Discount|_Sales|
+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|  11070|     LEHMS|         2|  17-10-2024|       null|      1|  136.0| Lehmanns Marktstand|  Magazinweg 7|Frankfurt a.M.|      null|         60528|    Germany|19-09-2024|       31|     12.5|      20|     0.0| 250.0|
|  11067|     DRACD|         1|  02-10-2024| 20-09-2024|      2|   7.98|Drachenblut Delik...|  Walserweg 21|        Aachen|     

In [65]:
# filter for orders not shipped yet
# calculate the order value of those orders

orders_not_shipped = orders.filter(col("ShippedDate").isNull())

orders_not_shipped.agg(round(sum(col("_Sales")), 0).alias("OrderValue")).show()

+----------+
|OrderValue|
+----------+
|   27444.0|
+----------+



### 8. How many orders are 'single item' (only one product ordered)?

In [66]:
product_count.show()

+-------+------------+
|OrderID|ProductCount|
+-------+------------+
|  10817|           4|
|  10623|           5|
|  10362|           3|
|  11033|           2|
|  10703|           3|
|  10462|           2|
|  10914|           1|
|  10862|           2|
|  11025|           2|
|  10798|           2|
|  10708|           2|
|  10745|           4|
|  10468|           2|
|  10768|           4|
|  10788|           2|
|  10632|           2|
|  10527|           2|
|  10840|           2|
|  10567|           3|
|  10395|           3|
+-------+------------+
only showing top 20 rows



In [67]:
product_count.where("ProductCount = 1").show()

+-------+------------+
|OrderID|ProductCount|
+-------+------------+
|  10914|           1|
|  10815|           1|
|  11014|           1|
|  10947|           1|
|  10898|           1|
|  11057|           1|
|  10371|           1|
|  10422|           1|
|  10994|           1|
|  10295|           1|
|  10732|           1|
|  10881|           1|
|  10950|           1|
|  10433|           1|
|  10683|           1|
|  10336|           1|
|  10589|           1|
|  10586|           1|
|  10376|           1|
|  11050|           1|
+-------+------------+
only showing top 20 rows



In [68]:
product_count.where("ProductCount = 1").count()

137

### 9. Average Sales per Transaction (orderID) for "Romero y Tomillo"

In [69]:
orders.show(5)

+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|OrderID|CustomerID|EmployeeID|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|   ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry| OrderDate|ProductID|UnitPrice|Quantity|Discount|_Sales|
+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|  11070|     LEHMS|         2|  17-10-2024|       null|      1|  136.0| Lehmanns Marktstand|  Magazinweg 7|Frankfurt a.M.|      null|         60528|    Germany|19-09-2024|       31|     12.5|      20|     0.0| 250.0|
|  11067|     DRACD|         1|  02-10-2024| 20-09-2024|      2|   7.98|Drachenblut Delik...|  Walserweg 21|        Aachen|     

In [86]:
# filter out the ship name 
romero = orders.filter(col("ShipName").like("Romero%"))

# groupby on orderid and find the sales amount per order (_sale)
order_value = romero.groupBy("OrderID").agg(round(sum("_Sales"),0).alias("OrderValue"))

# average order value
order_value.agg(avg("OrderValue").alias("AvgOrderValue")).show()

+-------------+
|AvgOrderValue|
+-------------+
|        293.6|
+-------------+



### 10. How many days since "North/South" last purchase?

In [92]:
customers.filter(col("CompanyName") == 'North/South').show()

+----------+-----------+--------------+---------------+--------------------+------+------+----------+-------+--------------+--------------+
|CustomerID|CompanyName|   ContactName|   ContactTitle|             Address|  City|Region|PostalCode|Country|         Phone|           Fax|
+----------+-----------+--------------+---------------+--------------------+------+------+----------+-------+--------------+--------------+
|     NORTS|North/South|Simon Crowther|Sales Associate|South House 300 Q...|London|  null|   SW7 1RZ|     UK|(171) 555-7733|(171) 555-2530|
+----------+-----------+--------------+---------------+--------------------+------+------+----------+-------+--------------+--------------+



In [96]:
# join the orders and customers by customer_id
# get the customer_id from the customers table and use this customer_id and filter out the orders

customer_id = customers.filter(col("CompanyName") == 'North/South').collect()[0]["CustomerID"]

In [97]:
orders.show(5)

+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|OrderID|CustomerID|EmployeeID|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|   ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry| OrderDate|ProductID|UnitPrice|Quantity|Discount|_Sales|
+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|  11070|     LEHMS|         2|  17-10-2024|       null|      1|  136.0| Lehmanns Marktstand|  Magazinweg 7|Frankfurt a.M.|      null|         60528|    Germany|19-09-2024|       31|     12.5|      20|     0.0| 250.0|
|  11067|     DRACD|         1|  02-10-2024| 20-09-2024|      2|   7.98|Drachenblut Delik...|  Walserweg 21|        Aachen|     

In [141]:
# filter out the orders made by the norts customer_id
norts_order = orders.filter(col("CustomerID") == customer_id)

# sort the order date in desc order and get the latest order date
latest_order = norts_order.sort(desc(col("OrderDate"))).select("CustomerID", to_date(col("OrderDate"), "dd-MM-yyyy").alias("OrderDate"))

# add current date to the dataframe
current_date_added = latest_order.withColumn("CurrentDate", current_date())

# find the differnce between last purchase date & current date (datediff)
current_date_added = current_date_added.withColumn("DiffInDays", datediff(col("CurrentDate"), col("OrderDate")))

current_date_added.show(1)

+----------+----------+-----------+----------+
|CustomerID| OrderDate|CurrentDate|DiffInDays|
+----------+----------+-----------+----------+
|     NORTS|2024-09-13| 2025-07-02|       292|
+----------+----------+-----------+----------+
only showing top 1 row



### 11. How many customers have ordered only once?

In [155]:
orders.select("OrderID", "CustomerID", "ProductID")

# getting the data from product level to customers level
product_level = orders.groupBy("OrderID", "CustomerID").agg(count(col("ProductID")).alias("ProductCount"))

# how many times every customer has ordered 
order_count = product_level.groupBy("CustomerID").agg(count("OrderID").alias("OrderCount"))

# filter customers only where one order was placed
customer_ordered_once = order_count.where("OrderCount = 1")

customer_ordered_once.show()

+----------+----------+
|CustomerID|OrderCount|
+----------+----------+
|     CENTC|         1|
+----------+----------+



### 12. How many new customers in 2023?

In [211]:
# select the relevant columns and get the distinct orders
order_details = orders.select("OrderID", "CustomerID", to_date(col("OrderDate"), "dd-MM-yyyy").alias("OrderDate")).distinct()

# create the window frame and rank the orders by order date in ascending order
window = Window.partitionBy("CustomerID").orderBy("OrderDate")
order_ranks = order_details.withColumn("OrderRank", rank().over(window))

order_ranks.filter((col("OrderRank") == 1) & (year(col("OrderDate")) == 2023)).count()

57

### 13. How many Lost Customers in 2023?

In [177]:
orders.show(5)

+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|OrderID|CustomerID|EmployeeID|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|   ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry| OrderDate|ProductID|UnitPrice|Quantity|Discount|_Sales|
+-------+----------+----------+------------+-----------+-------+-------+--------------------+--------------+--------------+----------+--------------+-----------+----------+---------+---------+--------+--------+------+
|  11070|     LEHMS|         2|  17-10-2024|       null|      1|  136.0| Lehmanns Marktstand|  Magazinweg 7|Frankfurt a.M.|      null|         60528|    Germany|19-09-2024|       31|     12.5|      20|     0.0| 250.0|
|  11067|     DRACD|         1|  02-10-2024| 20-09-2024|      2|   7.98|Drachenblut Delik...|  Walserweg 21|        Aachen|     

In [210]:
# select the relevant columns and get the distinct orders
order_details = orders.select("OrderID", "CustomerID", to_date(col("OrderDate"), "dd-MM-yyyy").alias("OrderDate")).distinct()

# create window frame and rank orders in descending order by order date
window = Window.partitionBy("CustomerID").orderBy(desc(col("OrderDate")))
order_ranks = order_details.withColumn("OrderRank", rank().over(window))

# Filter orders where order rank is 1 and year is 2023
order_ranks.filter((col("OrderRank") == 1) & (year(col("OrderDate")) == 2023)).count()

1

### 14. How many customers have never purchased Queso Cabrales?

In [209]:
# join orders and orducts table using inner join
# filter rows where prodname is Queso Cabrales
# select only the distinct customerID

customers_purchased_queso = orders.join(products, orders.ProductID == products.ProductID, how='inner')\
    .filter(col("ProductName") == "Queso Cabrales")\
    .select("CustomerID").distinct()

#perform a left anti join to get customers who did not purchase queso cabrales
customers.join(customers_purchased_queso, on="CustomerID", how="left_anti").select("CustomerID").distinct().count()

59

### 15. How many customers have purchased only Queso Cabrales (per OrderID)?

In [230]:
# customers who ordered only one product
customers_bougth_one_product = orders.groupBy("OrderID", "CustomerID").agg(count("ProductID").alias("ProductCount"))\
    .filter((col("ProductCount")) == 1)\
    .select("CustomerID").distinct() 

# the product should be queso cabrales
customers_bought_queso = orders.join(products, on="ProductID", how="inner")\
    .filter(col("ProductName")=="Queso Cabrales")\
    .select(orders["CustomerID"]).distinct()
# customers have purchased only Queso Cabrales (per OrderID)
customers_bougth_one_product.join(customers_bought_queso, on="CustomerID", how="inner").count()

28

### 16. How many products are out of stock?

In [233]:
products.filter(col("UnitsInStock")==0).show()

+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|ProductID|         ProductName|SupplierID|CategoryID|     QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|    21.35|           0|           0|           0|        true|
|       17|        Alice Mutton|         7|         6|      20 - 1 kg tins|     39.0|           0|           0|           0|        true|
|       29|Thüringer Rostbra...|        12|         6|50 bags x 30 sausgs.|   123.79|           0|           0|           0|        true|
|       31|   Gorgonzola Telino|        14|         4|     12 - 100 g pkgs|     12.5|           0|          70|          20|       false|
|       53|       Perth Pasties|  

### 17. How many products need to be restocked? (based on restock levels)

In [236]:
products.filter(col("UnitsInStock") <= col("ReorderLevel")).count()

22

### 18. How many products on order we need to restock?

In [238]:
products.filter((col("UnitsInStock") < col("UnitsOnOrder")) & (col("UnitsOnOrder") > 0)).count()

14

### 19. What is the stocked value of the discontinued products?

In [242]:
products.filter(col("Discontinued")==True)\
    .withColumn("StockedValue", col("UnitPrice") * col("UnitsInStock"))\
    .agg(sum("StockedValue").alias("TotalStockedValue")).show()

+-----------------+
|TotalStockedValue|
+-----------------+
|           4452.6|
+-----------------+



## 20. Which vendor has the highest stock value?

In [261]:
# select relevant columns
product_level = products.select("ProductID", "ProductName", "SupplierID", "UnitPrice", "UnitsInStock")

# get the stock value at the product level
product_stock_value = product_level.withColumn("StockValue", col("UnitPrice") * col("UnitsInStock"))

# transform the product_level data to supplier level data
highest_supplier = product_stock_value.groupBy("SupplierID").agg(sum("StockValue").alias("SupplierStockValue"))\
    .sort(desc("SupplierStockValue"))

highest_supplier.join(suppliers, on="SupplierID", how="left").show(1)

+----------+------------------+--------------------+--------------+-------------+--------------------+-----+------+----------+-------+---------------+---------------+--------+
|SupplierID|SupplierStockValue|         CompanyName|   ContactName| ContactTitle|             Address| City|Region|PostalCode|Country|          Phone|            Fax|HomePage|
+----------+------------------+--------------------+--------------+-------------+--------------------+-----+------+----------+-------+---------------+---------------+--------+
|        18|            5721.5|Aux joyeux ecclés...|Guylène Nodier|Sales Manager|203, Rue des Fran...|Paris|  null|     75004| France|(1) 03.83.00.68|(1) 03.83.00.62|    null|
+----------+------------------+--------------------+--------------+-------------+--------------------+-----+------+----------+-------+---------------+---------------+--------+
only showing top 1 row



### 21. How many employees are female?

In [264]:
employees.filter(col("Gender")=="Female").count()

4

### 22. How many employees are 60 years old or over?

In [283]:

employees.select("EmployeeID", to_date(col("BirthDate"), "dd-MM-yyyy").alias("BirthDate"))\
    .withColumn("CurrentDate", current_date())\
    .withColumn("Age", (round((datediff(col("CurrentDate"), col("BirthDate"))),2) / 365).cast("int"))\
    .where(col("Age").isNotNull())\
    .filter(col("Age") >= 60).count()


8

### 23. Which employee had the highest sales in 2022?

In [303]:
# filter the records where the order was placed in 2022
orders_2022 = orders.filter(year(to_date(col("OrderDate"), "dd-MM-yyyy")) == 2022)

# transform the order level data to employee level
highest_employee = orders_2022.groupBy("EmployeeID").agg(round(sum("_sales"),2).alias("SalesAmount"))\
    .sort(desc("SalesAmount"))

highest_employee.join(employees, on="EmployeeID", how='left').show(1)

+----------+-----------+--------+---------+--------------------+---------------+----------+----------+--------------------+-------+------+----------+-------+--------------+---------+--------------------+---------+--------------------+----------------+------+
|EmployeeID|SalesAmount|LastName|FirstName|               Title|TitleOfCourtesy| BirthDate|  HireDate|             Address|   City|Region|PostalCode|Country|     HomePhone|Extension|               Notes|ReportsTo|           PhotoPath|       Full Name|Gender|
+----------+-----------+--------+---------+--------------------+---------------+----------+----------+--------------------+-------+------+----------+-------+--------------+---------+--------------------+---------+--------------------+----------------+------+
|         4|    13230.8| Peacock| Margaret|Sales Representative|           Mrs.|19-09-1937|03-05-1993|4110 Old Redmond Rd.|Redmond|    WA|     98052|    USA|(206) 555-8122|     5176|Margaret holds a ...|        2|http://acc

### 24. How many employees sold over $100k in 2023?

In [309]:
# filter the records where the order was placed in 2023
order_2023 = orders.filter(year(to_date(col("OrderDate"), "dd-MM-yyyy")) == 2023)

order_2023.groupBy("EmployeeID").agg(round(sum("_sales"),0).alias("SalesAmount")).sort(desc("SalesAmount"))\
    .filter(col("SalesAmount") > 100000).show()

+----------+-----------+
|EmployeeID|SalesAmount|
+----------+-----------+
|         4|   127890.0|
+----------+-----------+



### 25. How many employees got hired in 1994?

In [311]:
employees.filter(year(to_date(col("HireDate"), "dd-MM-yyyy")) == 1994).count()

3