Dataset Name: **Northwind Traders Database**

Kaggle Link: https://www.kaggle.com/datasets/jealousleopard/northwind

Why this dataset?

- Contains 8 related tables (perfect for joins)
- Medium-sized dataset (realistic for practice)
- Classic relational database structure
- Includes customers, orders, products, employees, etc.

Dataset Tables:
- `Customers` - Customer information
- `Orders` - Order headers
- `OrderDetails` - Order line items
- `Products` - Product information
- `Employees` - Employee data
- `Categories` - Product categories
- `Shippers` - Shipping companies

## Initialize Spark Session and Load Data

In [24]:
from pyspark.sql import SparkSession
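# Explicit imports instead of `import *`. Note that sum, min, and max
# shadow the Python built-ins of the same name in this notebook.
from pyspark.sql.functions import (
    avg, col, count, countDistinct, current_date, datediff,
    lag, max, min, month, rank, sum,
)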
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("NorthwindAnalysis") \
    .getOrCreate()

In [25]:
# Load all tables
customers = spark.read.csv("customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)
order_details = spark.read.csv("order_details.csv", header=True, inferSchema=True)
products = spark.read.csv("products.csv", header=True, inferSchema=True)
employees = spark.read.csv("employees.csv", header=True, inferSchema=True)
categories = spark.read.csv("categories.csv", header=True, inferSchema=True)
shippers = spark.read.csv("shippers.csv", header=True, inferSchema=True)
# suppliers = spark.read.csv("suppliers.csv", header=True, inferSchema=True)

## Transformation Examples with Northwind Data

### Basic Transformations

In [26]:
# Filter - German customers
german_customers = customers.filter(col("Country") == "Germany")

german_customers.show(5)

+----------+--------------------+-------------+--------------------+-----------+-------+
|customerID|         companyName|  contactName|        contactTitle|       city|country|
+----------+--------------------+-------------+--------------------+-----------+-------+
|     ALFKI| Alfreds Futterkiste| Maria Anders|Sales Representative|     Berlin|Germany|
|     BLAUS|Blauer See Delika...|   Hanna Moos|Sales Representative|   Mannheim|Germany|
|     DRACD|Drachenblut Delik...| Sven Ottlieb| Order Administrator|     Aachen|Germany|
|     FRANK|      Frankenversand|Peter Franken|   Marketing Manager|    München|Germany|
|     KOENE|     Königlich Essen|Philip Cramer|     Sales Associate|Brandenburg|Germany|
+----------+--------------------+-------------+--------------------+-----------+-------+
only showing top 5 rows



In [27]:
# Select - Specific columns
customer_contact = customers.select("CustomerID", "CompanyName", "ContactName")

customer_contact.show(5)

+----------+--------------------+------------------+
|CustomerID|         CompanyName|       ContactName|
+----------+--------------------+------------------+
|     ALFKI| Alfreds Futterkiste|      Maria Anders|
|     ANATR|Ana Trujillo Empa...|      Ana Trujillo|
|     ANTON|Antonio Moreno Ta...|    Antonio Moreno|
|     AROUT|     Around the Horn|      Thomas Hardy|
|     BERGS|  Berglunds snabbköp|Christina Berglund|
+----------+--------------------+------------------+
only showing top 5 rows



In [28]:
# WithColumn - Calculate order age
from pyspark.sql.functions import datediff, current_date
orders_with_age = orders.withColumn(
    "OrderAgeDays",
    datediff(current_date(), col("OrderDate"))
)

orders_with_age.show(5)

+-------+----------+----------+----------+------------+-----------+---------+-------+------------+
|orderID|customerID|employeeID| orderDate|requiredDate|shippedDate|shipperID|freight|OrderAgeDays|
+-------+----------+----------+----------+------------+-----------+---------+-------+------------+
|  10248|     VINET|         5|2013-07-04|  2013-08-01| 2013-07-16|        3|  32.38|        4415|
|  10249|     TOMSP|         6|2013-07-05|  2013-08-16| 2013-07-10|        1|  11.61|        4414|
|  10250|     HANAR|         4|2013-07-08|  2013-08-05| 2013-07-12|        2|  65.83|        4411|
|  10251|     VICTE|         3|2013-07-08|  2013-08-05| 2013-07-15|        1|  41.34|        4411|
|  10252|     SUPRD|         4|2013-07-09|  2013-08-06| 2013-07-11|        2|   51.3|        4410|
+-------+----------+----------+----------+------------+-----------+---------+-------+------------+
only showing top 5 rows



In [29]:
# Drop - Remove unnecessary columns
products_clean = products.drop("QuantityPerUnit")
products_clean.show(5)

+---------+--------------------+---------+------------+----------+
|productID|         productName|unitPrice|discontinued|categoryID|
+---------+--------------------+---------+------------+----------+
|        1|                Chai|     18.0|           0|         1|
|        2|               Chang|     19.0|           0|         1|
|        3|       Aniseed Syrup|     10.0|           0|         2|
|        4|Chef Anton's Caju...|     22.0|           0|         2|
|        5|Chef Anton's Gumb...|    21.35|           1|         2|
+---------+--------------------+---------+------------+----------+
only showing top 5 rows



### Aggregations

In [30]:
# Product sales aggregation
product_sales = order_details.groupBy("ProductID") \
    .agg(
        sum("Quantity").alias("TotalUnitsSold"),
        sum(col("Quantity") * col("UnitPrice")).alias("TotalRevenue"),
        avg("UnitPrice").alias("AvgUnitPrice")
    )

product_sales.show(5)

+---------+--------------+------------------+------------------+
|ProductID|TotalUnitsSold|      TotalRevenue|      AvgUnitPrice|
+---------+--------------+------------------+------------------+
|       31|          1397|           16172.5|11.666666666666666|
|       65|           745|14606.999999999998|19.456249999999997|
|       53|           722|           21510.2|30.159999999999982|
|       34|           506|            6678.0|12.968421052631578|
|       28|           640|           26865.6|41.975757575757555|
+---------+--------------+------------------+------------------+
only showing top 5 rows



In [31]:
# Employee order count
employee_performance = orders.groupBy("EmployeeID") \
    .agg(
        count("OrderID").alias("OrderCount"),
        min("OrderDate").alias("FirstOrderDate"),
        max("OrderDate").alias("LastOrderDate")
    )

employee_performance.show(5)

+----------+----------+--------------+-------------+
|EmployeeID|OrderCount|FirstOrderDate|LastOrderDate|
+----------+----------+--------------+-------------+
|         1|       123|    2013-07-17|   2015-05-06|
|         6|        67|    2013-07-05|   2015-04-23|
|         3|       127|    2013-07-08|   2015-04-30|
|         5|        42|    2013-07-04|   2015-04-22|
|         9|        43|    2013-07-12|   2015-04-29|
+----------+----------+--------------+-------------+
only showing top 5 rows



### Join Operations

In [32]:
# Complete order information
complete_orders = orders.join(order_details, "OrderID") \
                      .join(products.withColumnRenamed("unitPrice", "product_unitPrice"), "ProductID") \
                      .join(customers.withColumnRenamed("country", "customer_country"), "CustomerID") \
                      .join(employees.withColumnRenamed("city", "employee_city").withColumnRenamed("country", "employee_country"), "EmployeeID")

complete_orders.show(5)

+----------+----------+---------+-------+----------+------------+-----------+---------+-------+---------+--------+--------+--------------------+----------------+-----------------+------------+----------+--------------------+-------------+------------------+-------+----------------+---------------+--------------------+-------------+----------------+---------+
|employeeID|customerID|productID|orderID| orderDate|requiredDate|shippedDate|shipperID|freight|unitPrice|quantity|discount|         productName| quantityPerUnit|product_unitPrice|discontinued|categoryID|         companyName|  contactName|      contactTitle|   city|customer_country|   employeeName|               title|employee_city|employee_country|reportsTo|
+----------+----------+---------+-------+----------+------------+-----------+---------+-------+---------+--------+--------+--------------------+----------------+-----------------+------------+----------+--------------------+-------------+------------------+-------+----------------+---------------+--------------------+-------------+----------------+---------+
(data rows truncated in this export)

In [33]:
# Products with category names
products_with_categories = products.join(categories, "CategoryID")

products_with_categories.show(5)

+----------+---------+--------------------+-------------------+---------+------------+------------+--------------------+
|categoryID|productID|         productName|    quantityPerUnit|unitPrice|discontinued|categoryName|         description|
+----------+---------+--------------------+-------------------+---------+------------+------------+--------------------+
|         1|        1|                Chai| 10 boxes x 20 bags|     18.0|           0|   Beverages|Soft drinks, coff...|
|         1|        2|               Chang| 24 - 12 oz bottles|     19.0|           0|   Beverages|Soft drinks, coff...|
|         2|        3|       Aniseed Syrup|12 - 550 ml bottles|     10.0|           0|  Condiments|Sweet and savory ...|
|         2|        4|Chef Anton's Caju...|     48 - 6 oz jars|     22.0|           0|  Condiments|Sweet and savory ...|
|         2|        5|Chef Anton's Gumb...|           36 boxes|    21.35|           1|  Condiments|Sweet and savory ...|
+----------+---------+--------------------+-------------------+---------+------------+------------+--------------------+

In [34]:
# Left join to find unsold products
unsold_products = products.join(
    order_details,
    "ProductID",
    "left"
).filter(col("OrderID").isNull())


unsold_products.show(5)

+---------+-----------+---------------+---------+------------+----------+-------+---------+--------+--------+
|productID|productName|quantityPerUnit|unitPrice|discontinued|categoryID|orderID|unitPrice|quantity|discount|
+---------+-----------+---------------+---------+------------+----------+-------+---------+--------+--------+
+---------+-----------+---------------+---------+------------+----------+-------+---------+--------+--------+



### Window Functions

In [36]:
# Customer order ranking
customer_window = Window.partitionBy("CustomerID").orderBy(col("OrderDate").desc())
customer_orders_ranked = orders.withColumn(
    "OrderRank",
    rank().over(customer_window)
)

customer_orders_ranked.show(5)

+-------+----------+----------+----------+------------+-----------+---------+-------+---------+
|orderID|customerID|employeeID| orderDate|requiredDate|shippedDate|shipperID|freight|OrderRank|
+-------+----------+----------+----------+------------+-----------+---------+-------+---------+
|  11011|     ALFKI|         3|2015-04-09|  2015-05-07| 2015-04-13|        1|   1.21|        1|
|  10952|     ALFKI|         1|2015-03-16|  2015-04-27| 2015-03-24|        1|  40.42|        2|
|  10835|     ALFKI|         1|2015-01-15|  2015-02-12| 2015-01-21|        3|  69.53|        3|
|  10702|     ALFKI|         4|2014-10-13|  2014-11-24| 2014-10-21|        1|  23.94|        4|
|  10692|     ALFKI|         4|2014-10-03|  2014-10-31| 2014-10-13|        2|  61.02|        5|
+-------+----------+----------+----------+------------+-----------+---------+-------+---------+
only showing top 5 rows



In [37]:
# Monthly sales growth (grouped by calendar month number, pooled across all years)
monthly_sales = orders.join(order_details, "OrderID") \
    .groupBy(month("OrderDate").alias("Month")) \
    .agg(sum(col("Quantity") * col("UnitPrice")).alias("MonthlySales"))

sales_window = Window.orderBy("Month")
monthly_growth = monthly_sales.withColumn(
    "PrevMonthSales",
    lag("MonthlySales").over(sales_window)
).withColumn(
    "GrowthPct",
    (col("MonthlySales") - col("PrevMonthSales")) / col("PrevMonthSales") * 100
)

monthly_growth.show(5)

+-----+------------------+------------------+-------------------+
|Month|      MonthlySales|    PrevMonthSales|          GrowthPct|
+-----+------------------+------------------+-------------------+
|    1|         167547.52|              NULL|               NULL|
|    2|145769.15000000002|         167547.52|-12.998324296295143|
|    3|         149805.35|145769.15000000002| 2.7688986318435567|
|    4|         190329.95|         149805.35| 27.051503834809644|
|    5|          76722.36|         190329.95|-59.689812349554025|
+-----+------------------+------------------+-------------------+
only showing top 5 rows



## Action Examples

In [38]:
# Show results
complete_orders.show(5)

# Count records
print(f"Total customers: {customers.count()}")
print(f"Total orders: {orders.count()}")

# Collect top products to driver
top_products = product_sales.orderBy(col("TotalRevenue").desc()).take(5)

# Save results
complete_orders.write.mode("overwrite").parquet("output/complete_orders.parquet")
product_sales.write.mode("overwrite").csv("output/product_sales.csv", header=True)

+----------+----------+---------+-------+----------+------------+-----------+---------+-------+---------+--------+--------+--------------------+----------------+-----------------+------------+----------+--------------------+-------------+------------------+-------+----------------+---------------+--------------------+-------------+----------------+---------+
|employeeID|customerID|productID|orderID| orderDate|requiredDate|shippedDate|shipperID|freight|unitPrice|quantity|discount|         productName| quantityPerUnit|product_unitPrice|discontinued|categoryID|         companyName|  contactName|      contactTitle|   city|customer_country|   employeeName|               title|employee_city|employee_country|reportsTo|
+----------+----------+---------+-------+----------+------------+-----------+---------+-------+---------+--------+--------+--------------------+----------------+-----------------+------------+----------+--------------------+-------------+------------------+-------+----------------+---------------+--------------------+-------------+----------------+---------+
(data rows truncated in this export)

## Complex Business Queries

### Top 5 Customers by Revenue

In [39]:
top_customers = complete_orders.groupBy(
    "CustomerID", "CompanyName"
).agg(
    sum(col("Quantity") * col("UnitPrice")).alias("TotalSpent"),
    countDistinct("OrderID").alias("OrderCount")
).orderBy(
    col("TotalSpent").desc()
).limit(5)
top_customers.show()

+----------+--------------------+------------------+----------+
|CustomerID|         CompanyName|        TotalSpent|OrderCount|
+----------+--------------------+------------------+----------+
|     QUICK|          QUICK-Stop|         117483.39|        28|
|     SAVEA|  Save-a-lot Markets|         115673.39|        31|
|     ERNSH|        Ernst Handel|         113236.68|        30|
|     HUNGO|Hungry Owl All-Ni...|57317.390000000014|        19|
|     RATTC|Rattlesnake Canyo...|52245.899999999994|        18|
+----------+--------------------+------------------+----------+



### Employee Sales Performance

In [40]:
employee_sales = complete_orders.groupBy(
    "EmployeeID",
    "employeeName"
).agg(
    sum(col("Quantity") * col("UnitPrice")).alias("TotalSales"),
    countDistinct("OrderID").alias("OrderCount"),
    # Each row of complete_orders is one order line item, so this is the
    # average line-item value, not the average value of a whole order.
    avg(col("Quantity") * col("UnitPrice")).alias("AvgLineItemValue")
).orderBy(
    col("TotalSales").desc()
)
employee_sales.show()

+----------+----------------+------------------+----------+-----------------+
|EmployeeID|    employeeName|        TotalSales|OrderCount| AvgLineItemValue|
+----------+----------------+------------------+----------+-----------------+
|         4|Margaret Peacock|250187.44999999998|       156|595.6844047619047|
|         3| Janet Leverling|          213051.3|       127| 663.711214953271|
|         1|   Nancy Davolio|202143.71000000002|       123|585.9237971014493|
|         2|   Andrew Fuller|         177749.26|        96| 737.548796680498|
|         7|     Robert King|141295.99000000002|        72|802.8181250000001|
|         8|  Laura Callahan|         133301.03|       104|512.6962692307692|
|         9|  Anne Dodsworth| 82963.99999999999|        43|775.3644859813082|
|         6|  Michael Suyama| 78198.09999999999|        67|465.4648809523809|
|         5| Steven Buchanan|          75567.75|        42|645.8782051282051|
+----------+----------------+------------------+----------+-----------------+
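
The `avg(Quantity * UnitPrice)` expression in the cell above averages over the rows of `complete_orders`, and each of those rows is one order line item rather than one order. The sample output is consistent with this: for employee 4, 250187.45 divided by 156 orders is roughly 1603.77 per order, not 595.68. The plain-Python sketch below contrasts the two averages; the line items are invented for illustration and are not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical line items: (orderID, quantity, unit_price)
line_items = [
    (10248, 10, 10.0),
    (10248, 5, 20.0),
    (10249, 4, 25.0),
]

# Average over line items: what avg(Quantity * UnitPrice) computes on joined rows.
line_values = [quantity * price for _, quantity, price in line_items]
avg_line_item_value = sum(line_values) / len(line_values)

# Average over orders: first total each order, then average the totals.
order_totals = defaultdict(float)
for order_id, quantity, price in line_items:
    order_totals[order_id] += quantity * price
avg_order_value = sum(order_totals.values()) / len(order_totals)

print(avg_line_item_value, avg_order_value)
```

In Spark, a per-order average could be obtained by dividing the summed sales by the distinct order count, or by aggregating to one row per order before averaging.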

### Product Category Analysis

In [41]:
category_performance = products_with_categories.join(
    order_details.withColumnRenamed("unitPrice", "order_unitPrice"), "ProductID"
).groupBy(
    "CategoryID", "CategoryName"
).agg(
    sum(col("Quantity") * col("order_unitPrice")).alias("CategoryRevenue"),
    avg(col("order_unitPrice")).alias("AvgProductPrice"),
    countDistinct("ProductID").alias("ProductCount")
)
category_performance.show()

+----------+----------------+------------------+------------------+------------+
|CategoryID|    CategoryName|   CategoryRevenue|   AvgProductPrice|ProductCount|
+----------+----------------+------------------+------------------+------------+
|         7|         Produce|105268.59999999998| 35.19448529411767|           5|
|         4|  Dairy Products|          251330.5|26.983060109289614|          10|
|         8|         Seafood|         141623.09|  19.0629696969697|          12|
|         1|       Beverages|286526.94999999995|29.236757425742578|          12|
|         5|Grains & Cereals|          100726.8| 21.24642857142858|           7|
|         3|     Confections|          177099.1| 22.60269461077844|          13|
|         6|  Meat & Poultry|178188.80000000002|42.874739884393065|           6|
|         2|      Condiments|         113694.75| 21.32083333333333|          12|
+----------+----------------+------------------+------------------+------------+



---

This notebook demonstrates various PySpark transformations and actions applied to the Northwind Traders database. The goal is to showcase common data manipulation techniques using Spark DataFrames.

The dataset tables used include:
- `Customers`
- `Orders`
- `OrderDetails`
- `Products`
- `Employees`
- `Categories`
- `Shippers`

### PySpark Transformations

Transformations are lazy operations that define the data manipulation logic. They do not execute immediately but build a plan that is executed when an action is called.
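
As a rough analogy only (this is plain Python, not how Spark's engine is implemented), generators behave in a similar way: building the pipeline does no work, and rows are processed only when something consumes the result. The `rows` data, the `lazy_filter` helper, and the `log` list are all made up for this sketch.

```python
# Plain-Python analogy for lazy transformations; no Spark required.
log = []  # records when work actually happens

rows = [
    {"customerID": "ALFKI", "country": "Germany"},
    {"customerID": "ANATR", "country": "Mexico"},
    {"customerID": "BLAUS", "country": "Germany"},
]

def lazy_filter(source, predicate):
    """Yield matching rows one at a time, only when a consumer asks for them."""
    for row in source:
        log.append(f"checked {row['customerID']}")
        if predicate(row):
            yield row

# "Transformation": builds the pipeline but does no work yet.
german = lazy_filter(rows, lambda r: r["country"] == "Germany")
work_before_action = len(log)

# "Action": consuming the generator triggers the filtering.
result = list(german)
work_after_action = len(log)

print(work_before_action, work_after_action, [r["customerID"] for r in result])
```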

---

#### Filtering Data

Filtering selects rows based on a condition. Below is an example of filtering the `customers` DataFrame to get only customers from Germany.


#### Selecting Columns

Selecting keeps only the named columns of a DataFrame. Here, we select `CustomerID`, `CompanyName`, and `ContactName` from the `customers` DataFrame.



#### Adding New Columns (`withColumn`)

`withColumn` adds a new column to a DataFrame, or replaces an existing column of the same name. In this example, we calculate the age of each order in days as of the current date.
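
`datediff(end, start)` returns the number of days from `start` to `end`, so `OrderAgeDays` grows by one each day the notebook is re-run. The plain-Python sketch below reproduces that arithmetic with a fixed reference date. The `as_of` value is an assumption: it is the date that yields the 4415 days shown for order 10248 in the sample output, not a date stated anywhere in the notebook.

```python
from datetime import date

def order_age_days(order_date: date, as_of: date) -> int:
    """Days between order_date and as_of, mirroring datediff(as_of, order_date)."""
    return (as_of - order_date).days

as_of = date(2025, 8, 5)  # assumed run date, inferred from the sample output

ages = {
    10248: order_age_days(date(2013, 7, 4), as_of),
    10249: order_age_days(date(2013, 7, 5), as_of),
}
print(ages)
```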



#### Dropping Columns (`drop`)

The `drop` transformation removes the specified columns from a DataFrame. Here, we remove the `QuantityPerUnit` column from the `products` DataFrame.

---

#### Aggregations (`groupBy` and `agg`)

Aggregations are used to group data by one or more columns and then perform aggregate functions (like sum, count, average, min, max) on other columns.

Below is an example of calculating total units sold, total revenue (quantity times unit price, before any discount), and average unit price for each product.



Here, we aggregate the `orders` DataFrame to find the order count, first order date, and last order date for each employee.
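
To make the group-then-aggregate pattern concrete, here is a plain-Python sketch of the per-product computation over a few hand-made line items. The rows are illustrative and not taken from the dataset, and the dictionary layout is just one possible way to hold the result.

```python
from collections import defaultdict

# Hypothetical order line items: (product_id, unit_price, quantity)
line_items = [
    (1, 18.0, 10),
    (1, 14.0, 5),
    (2, 19.0, 2),
]

# groupBy step: collect the line items belonging to each product.
groups = defaultdict(list)
for product_id, unit_price, quantity in line_items:
    groups[product_id].append((unit_price, quantity))

# agg step: reduce each group to one row of aggregates.
product_sales = {}
for product_id, items in groups.items():
    product_sales[product_id] = {
        "TotalUnitsSold": sum(q for _, q in items),
        "TotalRevenue": sum(p * q for p, q in items),
        # Unweighted mean of the prices seen on the line items.
        "AvgUnitPrice": sum(p for p, _ in items) / len(items),
    }

print(product_sales)
```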

---

#### Join Operations

Joins combine data from two or more DataFrames based on related columns. Different types of joins exist, such as inner, outer, left, and right joins.

This example demonstrates joining multiple tables (`orders`, `order_details`, `products`, `customers`, `employees`) to create a comprehensive view of orders.

This code joins `products` with `categories` to add category names to the product information.

This example uses a left join to identify products that have never been ordered: unmatched products have a null `OrderID` in the joined result. Here the result is empty, so every product appears in at least one order.
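
Conceptually, a left join followed by a null filter keeps the left-hand rows whose key never appears on the right. The plain-Python sketch below shows that logic on invented miniature tables. In Spark, the same rows can also be requested directly with `how="left_anti"`, which returns only the left table's columns and so avoids the duplicated `unitPrice` column seen in the output.

```python
# Hypothetical miniature tables, not taken from the dataset.
products = [
    {"productID": 1, "productName": "Chai"},
    {"productID": 2, "productName": "Chang"},
    {"productID": 3, "productName": "Aniseed Syrup"},
]
order_details = [
    {"orderID": 10248, "productID": 1},
    {"orderID": 10249, "productID": 1},
    {"orderID": 10250, "productID": 3},
]

# Keys that have at least one match on the right-hand side.
sold_ids = {detail["productID"] for detail in order_details}

# Anti-join: keep left rows with no match.
unsold = [p for p in products if p["productID"] not in sold_ids]
print(unsold)
```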

---


#### Window Functions

Window functions perform calculations across a set of DataFrame rows that are related to the current row. They are used for tasks like ranking, calculating moving averages, and accessing previous or subsequent rows.

This example uses a window function to rank each customer's orders by order date, from most recent (rank 1) to oldest.
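
`rank()` gives tied rows the same rank and then skips ahead, so two orders placed by one customer on the same date would share a rank. The plain-Python sketch below mimics that behaviour on invented data; the `rank_orders` helper is hypothetical. If exactly one row per position is needed, Spark's `row_number()` is the usual alternative.

```python
from itertools import groupby

# Hypothetical (customerID, orderID, orderDate) rows.
# ISO-formatted dates sort correctly as strings.
orders = [
    ("ALFKI", 1, "2015-04-09"),
    ("ALFKI", 2, "2015-03-16"),
    ("ALFKI", 3, "2015-04-09"),
    ("BLAUS", 4, "2015-01-02"),
]

def rank_orders(rows):
    """Return {orderID: rank}: newest first per customer, ties share a rank."""
    ranks = {}
    by_customer = sorted(rows, key=lambda r: r[0])  # partitionBy
    for _, group in groupby(by_customer, key=lambda r: r[0]):
        ordered = sorted(group, key=lambda r: r[2], reverse=True)  # orderBy desc
        for position, (_, order_id, order_date) in enumerate(ordered, start=1):
            previous = ordered[position - 2] if position > 1 else None
            if previous is not None and previous[2] == order_date:
                ranks[order_id] = ranks[previous[1]]  # tie: reuse the rank
            else:
                ranks[order_id] = position  # gap after ties, as rank() does
    return ranks

ranks = rank_orders(orders)
print(ranks)
```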

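This code calculates sales per calendar month and then uses a window function (`lag`) to fetch the previous month's sales and compute the growth percentage. Because it groups by `month("OrderDate")` alone, each bucket pools the same month number from every year in the data, so the figures compare pooled months rather than consecutive ones.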
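
The lag-and-compare step can be sketched in plain Python. As an assumption that differs from the notebook's query, this version keys each period on `(year, month)` so that consecutive periods are compared; in Spark that would correspond to adding `year("OrderDate")` to the `groupBy` and to the window's `orderBy`. The sales totals and the `growth_table` helper are invented for illustration.

```python
# Hypothetical monthly totals keyed by (year, month).
monthly_sales = {
    (2013, 12): 200.0,
    (2014, 1): 250.0,
    (2014, 2): 125.0,
}

def growth_table(sales):
    """Return (period, sales, prev_sales, growth_pct) rows in chronological order."""
    rows = []
    prev = None  # like lag(): the first period has no previous value
    for period in sorted(sales):
        current = sales[period]
        pct = None if prev is None else (current - prev) / prev * 100
        rows.append((period, current, prev, pct))
        prev = current
    return rows

growth = growth_table(monthly_sales)
for row in growth:
    print(row)
```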

---

### PySpark Actions

Actions are operations that trigger execution of the transformation plan and either return a result to the driver program or write data to storage.

#### Displaying Results (`show`)

The `show()` action prints the first rows of a DataFrame (20 by default, or `n` when called as `show(n)`).

#### Counting Records (`count`)

The `count()` action returns the number of rows in a DataFrame.

#### Collecting Data (`collect`, `take`)

Actions like `collect()` and `take()` return data from the DataFrame to the driver program. `collect()` brings back all rows (use with caution on large datasets), while `take(n)` brings back the first `n` rows. Here, `take(5)` after sorting by `TotalRevenue` in descending order returns the five highest-revenue products.

#### Saving Data (`write`)

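`DataFrame.write` returns a writer for saving a DataFrame to various formats (e.g., Parquet, CSV, JSON); the save itself runs when a format method such as `parquet()` or `csv()` is called. Spark writes a directory of part files at the given path, and `mode("overwrite")` is used here to replace that output if it already exists.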